/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
// Reconstructed from the "new file" diff of
// asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
// (revision 10a3f21d).
package org.apache.asterix.test.dataflow;

import java.io.File;
import java.lang.reflect.Field;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IndexCheckpoint;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
import org.apache.asterix.test.common.TestHelper;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.util.SingleThreadEventProcessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
import org.apache.hyracks.storage.am.common.datagen.IFieldValueGenerator;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runners.Parameterized;

/**
 * Verifies that the latest checkpoint of a secondary index records the same last component id as the
 * primary index, both when the secondary index is empty while the dataset is flushed and when the
 * secondary index is bulk loaded after the primary index already has zero, one or two disk components.
 *
 * <p>All index operations (insert, flush, bulk load) are funnelled through a single {@link Actor} thread;
 * the test thread synchronizes with it via {@link #ensureDone(Actor)}.
 */
public class CheckpointInSecondaryIndexTest {
    static final int REPREAT_TEST_COUNT = 1;

    /**
     * Parameter provider producing {@link #REPREAT_TEST_COUNT} empty parameter sets.
     *
     * <p>NOTE(review): no {@code @RunWith(Parameterized.class)} is present on this class, so this provider
     * appears to be unused by the default runner - confirm before relying on it to repeat the tests.
     */
    @Parameterized.Parameters
    public static List<Object[]> data() {
        return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
    }

    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
    private static final ARecordType META_TYPE = null;
    private static final GenerationFunction[] META_GEN_FUNCTION = null;
    private static final boolean[] UNIQUE_META_FIELDS = null;
    private static final int[] KEY_INDEXES = { 0 };
    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
    private static final int RECORDS_PER_COMPONENT = 500;
    private static final int DATASET_ID = 101;
    private static final String DATAVERSE_NAME = "TestDV";
    private static final String DATASET_NAME = "TestDS";
    private static final String INDEX_NAME = "TestIdx";
    private static final String DATA_TYPE_NAME = "DUMMY";
    private static final String NODE_GROUP_NAME = "DEFAULT";
    private static final IndexType INDEX_TYPE = IndexType.BTREE;
    private static final IFieldValueGenerator[] SECONDARY_INDEX_VALUE_GENERATOR =
            { new AInt64ValueGenerator(), new AInt32ValueGenerator() };
    // the secondary index is built on the second record field ("value")
    private static final List<List<String>> INDEX_FIELD_NAMES =
            Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
    private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
    private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
    private static TestNodeController nc;
    private static NCAppRuntimeContext ncAppCtx;
    private static IDatasetLifecycleManager dsLifecycleMgr;
    private static Dataset dataset;
    private static Index secondaryIndex;
    private static ITransactionContext txnCtx;
    private static TestLsmBtree primaryLsmBtree;
    private static TestLsmBtree secondaryLsmBtree;
    private static PrimaryIndexInfo primaryIndexInfo;
    private static IHyracksTaskContext taskCtx;
    private static IIndexDataflowHelper primaryIndexDataflowHelper;
    private static IIndexDataflowHelper secondaryIndexDataflowHelper;
    private static LSMInsertDeleteOperatorNodePushable insertOp;
    private static LSMIndexBulkLoadOperatorNodePushable indexLoadOp;
    private static IHyracksTaskContext loadTaskCtx;
    private static SecondaryIndexInfo secondaryIndexInfo;
    private static Actor actor;

    /** Deletes leftover instance files and starts the test node controller shared by all tests. */
    @BeforeClass
    public static void setUp() throws Exception {
        System.out.println("SetUp: ");
        TestHelper.deleteExistingInstanceFiles();
        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
                + File.separator + "resources" + File.separator + "cc-multipart.conf";
        nc = new TestNodeController(configPath, false);
        nc.init();
        ncAppCtx = nc.getAppRuntimeContext();
        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
    }

    /** Shuts the test node controller down and deletes the instance files it created. */
    @AfterClass
    public static void tearDown() throws Exception {
        System.out.println("TearDown");
        nc.deInit();
        TestHelper.deleteExistingInstanceFiles();
    }

    /**
     * Creates the dataset's primary index, begins a transaction, builds an insert pipeline that does not
     * include the secondary index, and starts the {@link Actor} with the insert operator opened.
     */
    @Before
    public void createIndex() throws Exception {
        List<List<String>> partitioningKeys = new ArrayList<>();
        partitioningKeys.add(Collections.singletonList("key"));
        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
                        partitioningKeys, null, null, null, false, null),
                null, DatasetType.INTERNAL, DATASET_ID, 0);
        secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
                INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
        // reset the per-test state left behind by the previous test
        taskCtx = null;
        primaryIndexDataflowHelper = null;
        secondaryIndexDataflowHelper = null;
        primaryLsmBtree = null;
        insertOp = null;
        JobId jobId = nc.newJobId();
        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
        actor = null;
        taskCtx = nc.createTestContext(jobId, 0, false);
        primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
                KEY_INDEXES, KEY_INDICATORS_LIST, 0);
        IndexDataflowHelperFactory iHelperFactory =
                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
        primaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
        // open/close only to obtain the index instance
        primaryIndexDataflowHelper.open();
        primaryLsmBtree = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance();
        primaryIndexDataflowHelper.close();
        // This pipeline skips the secondary index
        insertOp = nc.getInsertPipeline(taskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                KEY_INDICATORS_LIST, storageManager, null).getLeft();
        actor = new Actor("player");
        // allow all operations
        StorageTestUtils.allowAllOps(primaryLsmBtree);
        actor.add(new Request(Request.Action.INSERT_OPEN));
    }

    /**
     * Closes the insert operator, commits the transaction and destroys the indexes created by the test.
     * The actor thread is stopped even when one of those steps fails, so it cannot leak into later tests.
     */
    @After
    public void destroyIndex() throws Exception {
        try {
            Request close = new Request(Request.Action.INSERT_CLOSE);
            actor.add(close);
            close.await();
            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
            // only tests that called createSecondaryIndex() have a secondary helper
            if (secondaryIndexDataflowHelper != null) {
                secondaryIndexDataflowHelper.destroy();
            }
            primaryIndexDataflowHelper.destroy();
        } finally {
            if (actor != null) {
                actor.stop();
            }
        }
    }

    /**
     * Flushing the dataset while the secondary index is empty must still advance the secondary index's
     * checkpoint to the id of the primary component that was flushed.
     */
    @Test
    public void testCheckpointUpdatedWhenSecondaryIsEmpty() throws Exception {
        try {
            // create the secondary index; the insert pipeline skips it, so it receives no records
            createSecondaryIndex();
            insertRecords();
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            // and that secondary index is empty
            Assert.assertTrue(secondaryLsmBtree.isCurrentMutableComponentEmpty());
            flushDataset();
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure secondary doesn't have a component
            Assert.assertEquals(0, secondaryLsmBtree.getDiskComponents().size());
            // ensure that the current memory component indexes match
            Assert.assertEquals(primaryLsmBtree.getCurrentMemoryComponentIndex(),
                    secondaryLsmBtree.getCurrentMemoryComponentIndex());
            // ensure both checkpoint files have the same component id as the last flushed component id
            long expectedComponentId = firstPrimaryDiskComponentId().getMinId();
            IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
            IIndexCheckpointManager secondaryCheckpointManager = getCheckpointManager(secondaryLsmBtree);
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
            Assert.assertEquals(expectedComponentId, latestSecondaryCheckpoint.getLastComponentId());
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Bulk loading a secondary index when the primary index has one disk component must checkpoint the
     * secondary index with that component's id and leave the primary checkpoint unchanged.
     */
    @Test
    public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsSingleComponent() throws Exception {
        try {
            // insert one batch into the primary index
            insertRecords();
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            flushDataset();
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure the primary checkpoint has the same component id as the last flushed component id
            long expectedComponentId = firstPrimaryDiskComponentId().getMinId();
            IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
            Assert.assertEquals(expectedComponentId, primaryCheckpointManager.getLatest().getLastComponentId());
            bulkLoadSecondaryIndex(true);
            // the bulk load must not have moved the primary checkpoint ...
            Assert.assertEquals(expectedComponentId, primaryCheckpointManager.getLatest().getLastComponentId());
            // ... and the secondary checkpoint must match it
            IIndexCheckpointManager secondaryCheckpointManager = getCheckpointManager(secondaryLsmBtree);
            Assert.assertEquals(expectedComponentId, secondaryCheckpointManager.getLatest().getLastComponentId());
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Same as the single-component case, but the primary index is flushed twice before the bulk load; the
     * expected id is then the max id of the first entry in the primary's disk component list.
     */
    @Test
    public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsTwoComponents() throws Exception {
        try {
            // insert the first batch into the primary index
            insertRecords();
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            flushDataset();
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure the primary checkpoint has the same component id as the last flushed component id
            long expectedComponentId = firstPrimaryDiskComponentId().getMinId();
            IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
            Assert.assertEquals(expectedComponentId, primaryCheckpointManager.getLatest().getLastComponentId());
            // insert and flush a second batch so that the primary has two disk components
            insertRecords();
            flushDataset();
            Assert.assertEquals(2, primaryLsmBtree.getDiskComponents().size());
            // from now on both checkpoints are expected to carry the max id of the first listed component
            expectedComponentId = firstPrimaryDiskComponentId().getMaxId();
            bulkLoadSecondaryIndex(true);
            Assert.assertEquals(expectedComponentId, primaryCheckpointManager.getLatest().getLastComponentId());
            IIndexCheckpointManager secondaryCheckpointManager = getCheckpointManager(secondaryLsmBtree);
            Assert.assertEquals(expectedComponentId, secondaryCheckpointManager.getLatest().getLastComponentId());
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Bulk loading nothing into a secondary index while the primary index has no disk component must
     * leave both indexes with the same last component id in their checkpoints.
     */
    @Test
    public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsEmpty() throws Exception {
        try {
            // ensure primary has no component
            Assert.assertEquals(0, primaryLsmBtree.getDiskComponents().size());
            IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
            // open and close the load operator without feeding it any record
            bulkLoadSecondaryIndex(false);
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            IIndexCheckpointManager secondaryCheckpointManager = getCheckpointManager(secondaryLsmBtree);
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(),
                    latestSecondaryCheckpoint.getLastComponentId());
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Bulk loading nothing into a secondary index while the primary index has one disk component must
     * checkpoint the secondary index with that component's id.
     */
    @Test
    public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsNotEmpty() throws Exception {
        try {
            // insert one batch into the primary index
            insertRecords();
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            flushDataset();
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure the primary checkpoint has the same component id as the last flushed component id
            long expectedComponentId = firstPrimaryDiskComponentId().getMinId();
            IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
            Assert.assertEquals(expectedComponentId, primaryCheckpointManager.getLatest().getLastComponentId());
            // open and close the load operator without feeding it any record
            bulkLoadSecondaryIndex(false);
            Assert.assertEquals(expectedComponentId, primaryCheckpointManager.getLatest().getLastComponentId());
            IIndexCheckpointManager secondaryCheckpointManager = getCheckpointManager(secondaryLsmBtree);
            Assert.assertEquals(expectedComponentId, secondaryCheckpointManager.getLatest().getLastComponentId());
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Creates the secondary index on disk and caches its index instance in {@link #secondaryLsmBtree}.
     * The helper is opened and closed only to obtain that instance.
     */
    private void createSecondaryIndex()
            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
        // deliberately a local: the static secondaryIndexInfo field is only set by bulkLoadSecondaryIndex
        SecondaryIndexInfo createdIndexInfo =
                nc.createSecondaryIndex(primaryIndexInfo, secondaryIndex, storageManager, 0);
        IndexDataflowHelperFactory iHelperFactory =
                new IndexDataflowHelperFactory(nc.getStorageManager(), createdIndexInfo.getFileSplitProvider());
        secondaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
        secondaryIndexDataflowHelper.open();
        secondaryLsmBtree = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance();
        secondaryIndexDataflowHelper.close();
    }

    /** Asks the actor to insert {@link #RECORDS_PER_COMPONENT} records and waits until it has done so. */
    private void insertRecords() throws InterruptedException {
        actor.add(new Request(Request.Action.INSERT_PATCH));
        ensureDone(actor);
    }

    /** Asks the actor to flush the dataset and waits until the request has been handled. */
    private void flushDataset() throws InterruptedException {
        actor.add(new Request(Request.Action.FLUSH_DATASET));
        ensureDone(actor);
    }

    /** Returns the id of the first entry in the primary index's disk component list. */
    private static LSMComponentId firstPrimaryDiskComponentId() throws Exception {
        ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
        return (LSMComponentId) primaryDiskComponent.getId();
    }

    /**
     * Looks up the checkpoint manager of the given index.
     *
     * <p>The index's file manager is a private field of {@link AbstractLSMIndex}, so it is read through
     * reflection; the resource reference is derived from the path of its next flush file.
     *
     * @param index the LSM index whose checkpoint manager is wanted
     * @return the checkpoint manager registered for the index's resource
     */
    private IIndexCheckpointManager getCheckpointManager(TestLsmBtree index) throws Exception {
        Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
        fileManagerField.setAccessible(true);
        ILSMIndexFileManager fileManager = (ILSMIndexFileManager) fileManagerField.get(index);
        final ResourceReference ref = ResourceReference
                .of(fileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
        return getIndexCheckpointManagerProvider().get(ref);
    }

    /**
     * Creates the secondary index and runs a bulk load on it through the actor, waiting for completion.
     *
     * @param loadRecords when {@code true}, {@link #RECORDS_PER_COMPONENT} generated tuples are pushed into
     *            the load operator; when {@code false}, the operator is only opened and closed
     */
    private void bulkLoadSecondaryIndex(boolean loadRecords) throws Exception {
        createSecondaryIndex();
        JobId jobId = nc.newJobId();
        loadTaskCtx = nc.createTestContext(jobId, 0, false);
        Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
                nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                        KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
        indexLoadOp = infoAndOp.getRight();
        secondaryIndexInfo = infoAndOp.getLeft();
        actor.add(new Request(Request.Action.LOAD_OPEN));
        if (loadRecords) {
            actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
        }
        actor.add(new Request(Request.Action.LOAD_CLOSE));
        ensureDone(actor);
    }

    protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
        return ncAppCtx.getIndexCheckpointManagerProvider();
    }

    /**
     * Blocks until the actor has handled a no-op request submitted after all requests added so far.
     */
    private void ensureDone(Actor actor) throws InterruptedException {
        Request req = new Request(Request.Action.DUMMY);
        actor.add(req);
        req.await();
    }

    /** A unit of work for the {@link Actor}, with a completion flag the submitter can wait on. */
    private static class Request {
        enum Action {
            DUMMY,
            INSERT_OPEN,
            LOAD_OPEN,
            INSERT_PATCH,
            INDEX_LOAD_PATCH,
            FLUSH_DATASET,
            INSERT_CLOSE,
            LOAD_CLOSE,
        }

        private final Action action;
        private volatile boolean done;

        public Request(Action action) {
            this.action = action;
            done = false;
        }

        /** Marks the request as handled and wakes up every thread blocked in {@link #await()}. */
        synchronized void complete() {
            done = true;
            notifyAll();
        }

        /** Blocks the caller until {@link #complete()} has been called. */
        synchronized void await() throws InterruptedException {
            // loop guards against spurious wake-ups
            while (!done) {
                wait();
            }
        }
    }

    /**
     * Event processor that executes {@link Request}s against the insert and bulk-load operators held in
     * the enclosing test's static fields.
     */
    public class Actor extends SingleThreadEventProcessor<Request> {
        private final RecordTupleGenerator primaryInsertTupleGenerator;
        private final FrameTupleAppender tupleAppender;

        public Actor(String name) throws HyracksDataException {
            super(name);
            primaryInsertTupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
            tupleAppender = new FrameTupleAppender(new VSizeFrame(taskCtx));
        }

        /**
         * Executes one request. The request is always marked complete, even on failure, so that a test
         * thread waiting on it is never left blocked.
         */
        @Override
        protected void handle(Request req) throws Exception {
            try {
                switch (req.action) {
                    case FLUSH_DATASET:
                        // push any buffered tuples into the insert operator before flushing
                        if (tupleAppender.getTupleCount() > 0) {
                            tupleAppender.write(insertOp, true);
                        }
                        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
                        break;
                    case INSERT_CLOSE:
                        insertOp.close();
                        break;
                    case INSERT_OPEN:
                        insertOp.open();
                        break;
                    case LOAD_OPEN:
                        indexLoadOp.open();
                        break;
                    case LOAD_CLOSE:
                        indexLoadOp.close();
                        break;
                    case INSERT_PATCH:
                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                            ITupleReference tuple = primaryInsertTupleGenerator.next();
                            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
                        }
                        if (tupleAppender.getTupleCount() > 0) {
                            tupleAppender.write(insertOp, true);
                        }
                        StorageTestUtils.waitForOperations(primaryLsmBtree);
                        break;
                    case INDEX_LOAD_PATCH:
                        TupleGenerator secondaryLoadTupleGenerator =
                                new TupleGenerator(SECONDARY_INDEX_VALUE_GENERATOR, secondaryIndexInfo.getSerdes(), 0);
                        FrameTupleAppender secondaryTupleAppender = new FrameTupleAppender(new VSizeFrame(loadTaskCtx));
                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                            ITupleReference tuple = secondaryLoadTupleGenerator.next();
                            DataflowUtils.addTupleToFrame(secondaryTupleAppender, tuple, indexLoadOp);
                        }
                        if (secondaryTupleAppender.getTupleCount() > 0) {
                            secondaryTupleAppender.write(indexLoadOp, true);
                        }
                        break;
                    default:
                        // DUMMY: nothing to do, the request only serves as a synchronization point
                        break;
                }
            } catch (Throwable th) {
                // print here as well: the failure happens off the test thread
                th.printStackTrace();
                throw th;
            } finally {
                req.complete();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java index a33bda1..017c59f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java @@ -27,7 +27,7 @@ import java.util.function.Predicate; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; -import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; @@ -122,7 +122,7 @@ public class ComponentRollbackTest { // allow all operations StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -190,7 +190,7 @@ public class ComponentRollbackTest { // allow all operations StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = 
new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -272,7 +272,7 @@ public class ComponentRollbackTest { StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearSearchCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -356,7 +356,7 @@ public class ComponentRollbackTest { // allow all operations StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -411,7 +411,7 @@ public class ComponentRollbackTest { StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearMergeCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -474,7 +474,7 @@ public class ComponentRollbackTest { // allow all operations StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -533,7 +533,7 @@ public class 
ComponentRollbackTest { // allow all operations StorageTestUtils.allowAllOps(lsmBtree); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -594,7 +594,7 @@ public class ComponentRollbackTest { StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearMergeCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { @@ -664,7 +664,7 @@ public class ComponentRollbackTest { StorageTestUtils.allowAllOps(lsmBtree); lsmBtree.clearMergeCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java index eb16cf4..b618727 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java +++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java @@ -22,7 +22,7 @@ import java.io.File; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; -import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; @@ -118,7 +118,7 @@ public class IoCallbackFailureTest { throws Exception { NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext(); IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager(); - TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); + RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator(); ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); boolean failed = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java index e4623fd..79e6368 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java @@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore; import org.apache.asterix.app.bootstrap.TestNodeController; import 
org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo; -import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -92,7 +92,7 @@ public class LSMFlushRecoveryTest { private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers; private static ITransactionContext txnCtx; private static LSMInsertDeleteOperatorNodePushable[] insertOps; - private static TupleGenerator tupleGenerator; + private static RecordTupleGenerator tupleGenerator; private static final int NUM_PARTITIONS = 2; private static final int PARTITION_0 = 0; @@ -478,6 +478,8 @@ public class LSMFlushRecoveryTest { ILSMMemoryComponent primaryMemComponent = primaryIndexes[partitionIndex].getCurrentMemoryComponent(); ILSMMemoryComponent secondaryMemComponent = secondaryIndexes[partitionIndex].getCurrentMemoryComponent(); Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId()); + Assert.assertEquals(primaryIndexes[partitionIndex].getCurrentMemoryComponentIndex(), + secondaryIndexes[partitionIndex].getCurrentMemoryComponentIndex()); } List<ILSMDiskComponent> primaryDiskComponents = primaryIndexes[partitionIndex].getDiskComponents(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index 2121327..e2c99b0 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -26,10 +26,9 @@ import java.util.List; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction; import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter; -import org.apache.asterix.app.data.gen.TupleGenerator; -import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionContext; @@ -37,10 +36,7 @@ import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.file.StorageComponentProvider; -import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; @@ -113,22 +109,20 @@ public class LogMarkerTest { StorageComponentProvider storageManager = new StorageComponentProvider(); List<List<String>> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, - 
NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null), - null, DatasetType.INTERNAL, DATASET_ID, 0); try { - PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, - storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0); + PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, + META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0); JobId jobId = nc.newJobId(); IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true); ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); - LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, - RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft(); + LSMInsertDeleteOperatorNodePushable insertOp = + nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, + KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + RecordTupleGenerator tupleGenerator = + new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, + RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); VSizeFrame frame = new VSizeFrame(ctx); VSizeFrame marker = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); @@ -178,9 +172,9 @@ public class LogMarkerTest { nc.newJobId(); TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE), Collections.emptyList(), Collections.emptyList(), false); - 
IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, - META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, - storageManager); + IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, StorageTestUtils.DATASET, KEY_TYPES, + RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, + KEY_INDICATORS_LIST, storageManager); emptyTupleOp.open(); emptyTupleOp.close(); Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java index 1795c93..a7225a1 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java @@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo; -import org.apache.asterix.app.data.gen.TupleGenerator; -import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; @@ -619,14 +619,14 @@ public class 
MultiPartitionLSMIndexTest { public class Actor extends SingleThreadEventProcessor<Request> { private final int partition; - private final TupleGenerator tupleGenerator; + private final RecordTupleGenerator tupleGenerator; private final VSizeFrame frame; private final FrameTupleAppender tupleAppender; public Actor(String name, int partition) throws HyracksDataException { super(name); this.partition = partition; - tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, + tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); frame = new VSizeFrame(taskCtxs[partition]); tupleAppender = new FrameTupleAppender(frame); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java index 72026a2..61c1fb2 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java @@ -26,8 +26,8 @@ import java.util.List; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter; -import org.apache.asterix.app.data.gen.TupleGenerator; -import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; +import 
org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; @@ -161,8 +161,8 @@ public class SearchCursorComponentSwitchTest { // except search lsmBtree.clearSearchCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, + KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); Searcher firstSearcher = null; @@ -207,8 +207,8 @@ public class SearchCursorComponentSwitchTest { // except search lsmBtree.clearSearchCallbacks(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, + KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); Searcher firstSearcher = null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java index d08fc72..589e8b2 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java @@ -33,9 +33,9 @@ import java.util.concurrent.TimeUnit; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction; import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter; -import org.apache.asterix.app.data.gen.TupleGenerator; -import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.context.DatasetInfo; @@ -100,7 +100,7 @@ public class StorageTestUtils { private StorageTestUtils() { } - static void allowAllOps(TestLsmBtree lsmBtree) { + public static void allowAllOps(TestLsmBtree lsmBtree) { lsmBtree.clearModifyCallbacks(); lsmBtree.clearFlushCallbacks(); lsmBtree.clearSearchCallbacks(); @@ -118,6 +118,12 @@ public class StorageTestUtils { KEY_INDICATORS_LIST, partition); } + public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, Dataset dataset, int partition) + throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { + return nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES, + KEY_INDICATORS_LIST, partition); + } + public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { return getInsertPipeline(nc, ctx, null); 
@@ -131,13 +137,27 @@ public class StorageTestUtils { } public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx, + Dataset dataset, Index secondaryIndex, IndexOperation op) + throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { + return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, + KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft(); + } + + public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx, Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft(); } - public static TupleGenerator getTupleGenerator() { - return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION, + public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx, + Dataset dataset, Index secondaryIndex) + throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { + return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, + KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft(); + } + + public static RecordTupleGenerator getTupleGenerator() { + return new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); } @@ -146,6 +166,11 @@ public class StorageTestUtils { searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords); } + public static void searchAndAssertCount(TestNodeController nc, Dataset dataset, int partition, int numOfRecords) + throws HyracksDataException, AlgebricksException { + 
searchAndAssertCount(nc, partition, dataset, STORAGE_MANAGER, numOfRecords); + } + public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager, int numOfRecords) throws HyracksDataException, AlgebricksException { @@ -182,6 +207,11 @@ public class StorageTestUtils { flushPartition(dslLifecycleMgr, lsmBtree, DATASET, async); } + public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree, + boolean async) throws Exception { + flushPartition(dslLifecycleMgr, lsmBtree, dataset, async); + } + public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset, boolean async) throws Exception { waitForOperations(lsmBtree); @@ -211,6 +241,11 @@ public class StorageTestUtils { flush(dsLifecycleMgr, lsmBtree, DATASET, async); } + public static void flush(IDatasetLifecycleManager dsLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree, + boolean async) throws Exception { + flush(dsLifecycleMgr, lsmBtree, dataset, async); + } + public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset, boolean async) throws Exception { waitForOperations(lsmBtree); @@ -240,6 +275,11 @@ public class StorageTestUtils { this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords); } + public Searcher(TestNodeController nc, Dataset dataset, int partition, TestLsmBtree lsmBtree, + int numOfRecords) { + this(nc, partition, dataset, STORAGE_MANAGER, lsmBtree, numOfRecords); + } + public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager, TestLsmBtree lsmBtree, int numOfRecords) { lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java index 20875a3..bcf68b5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java @@ -21,6 +21,7 @@ package org.apache.asterix.test.dataflow; import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -28,9 +29,14 @@ import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory; +import org.apache.asterix.transaction.management.runtime.CommitRuntime; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.common.IResourceFactory; @@ -48,6 +54,19 @@ public class TestDataset extends Dataset { } @Override + public IPushRuntimeFactory 
getCommitRuntimeFactory(MetadataProvider metadataProvider, + int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException { + return new IPushRuntimeFactory() { + @Override + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()), + getDatasetId(), primaryKeyFieldPermutation, true, + ctx.getTaskAttemptId().getTaskId().getPartition(), true) }; + } + }; + } + + @Override public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties) throws AlgebricksException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java index 7a3e475..bee2f8d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.Semaphore; import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo; -import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; @@ -68,7 +68,7 @@ public class 
TransactionAbortTest { private static IHyracksTaskContext abortCtx; private static ITransactionContext abortTxnCtx; private static LSMInsertDeleteOperatorNodePushable abortOp; - private static TupleGenerator tupleGenerator; + private static RecordTupleGenerator tupleGenerator; @Rule public TestRule watcher = new TestMethodTracer(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 418282e..3634bf1 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -18,6 +18,10 @@ */ package org.apache.asterix.test.logging; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + import java.io.File; import java.nio.file.Path; import java.util.ArrayList; @@ -26,10 +30,9 @@ import java.util.Collections; import java.util.List; import org.apache.asterix.app.bootstrap.TestNodeController; -import org.apache.asterix.app.data.gen.TupleGenerator; -import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.app.data.gen.RecordTupleGenerator; +import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction; import org.apache.asterix.app.nc.RecoveryManager; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.Checkpoint; @@ -43,14 +46,12 @@ import 
org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.file.StorageComponentProvider; -import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.test.common.TestHelper; +import org.apache.asterix.test.dataflow.StorageTestUtils; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager; import org.apache.asterix.transaction.management.service.transaction.TransactionManager; @@ -60,17 +61,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.stubbing.Answer; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; - public class CheckpointingTest { private static final String TEST_CONFIG_FILE_NAME = "cc-small-txn-log-partition.conf"; @@ -116,23 +112,21 @@ public class CheckpointingTest { nc.init(); List<List<String>> partitioningKeys = new ArrayList<>(); partitioningKeys.add(Collections.singletonList("key")); - Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, 
NODE_GROUP_NAME, - NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, - partitioningKeys, null, null, null, false, null), - null, DatasetType.INTERNAL, DATASET_ID, 0); try { - nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES, - KEY_INDICATOR_LIST, 0); + nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, + KEY_INDEXES, KEY_INDICATOR_LIST, 0); JobId jobId = nc.newJobId(); IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false); ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); // Prepare insert operation - LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, - RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); + LSMInsertDeleteOperatorNodePushable insertOp = + nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, + KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + RecordTupleGenerator tupleGenerator = + new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, + RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); @@ -197,8 +191,9 @@ public class CheckpointingTest { nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2), new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); // Prepare insert operation - LSMInsertDeleteOperatorNodePushable insertOp2 = 
nc.getInsertPipeline(ctx2, dataset, KEY_TYPES, - RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); + LSMInsertDeleteOperatorNodePushable insertOp2 = + nc.getInsertPipeline(ctx2, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, + KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); insertOp2.open(); VSizeFrame frame2 = new VSizeFrame(ctx2); FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2); @@ -220,6 +215,7 @@ public class CheckpointingTest { } } Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread th, Throwable ex) { threadException = true; exception = ex; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java index c7ae2df..62c882d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.util.ExternalDataConstants; @@ -107,6 +106,10 @@ public class ExecutionTestUtil { tearDown(cleanup, integrationUtil, true); } + public static void tearDown(boolean cleanup, boolean stopHdfs) throws Exception { + tearDown(cleanup, 
integrationUtil, stopHdfs); + } + public static void tearDown(boolean cleanup, AsterixHyracksIntegrationUtil integrationUtil, boolean stopHdfs) throws Exception { // validateBufferCacheState(); <-- Commented out until bug is fixed --> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 9eb6259..b6581ec 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -27,6 +27,7 @@ import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.IReplicaManager; +import org.apache.asterix.common.transactions.IRecoveryManagerFactory; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.INCServiceContext; @@ -65,7 +66,8 @@ public interface INcApplicationContext extends IApplicationContext { IResourceIdFactory getResourceIdFactory(); - void initialize(boolean initialRun) throws IOException, AlgebricksException; + void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun) + throws IOException, AlgebricksException; void setShuttingdown(boolean b); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index f4d764a..6e2e320 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -55,7 +55,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> { this.setRegistered(false); this.setMemoryAllocated(false); this.logManager = logManager; - waitLog.setLogType(LogType.WAIT); + waitLog.setLogType(LogType.WAIT_FOR_FLUSHES); waitLog.computeAndSetLogSize(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index e5d18cf..50a4bef 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -23,9 +23,7 @@ import static org.apache.asterix.common.metadata.MetadataIndexImmutablePropertie import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,7 +32,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.IDatasetMemoryManager; import 
org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.dataflow.DatasetLocalResource; -import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.replication.IReplicationStrategy; @@ -42,19 +40,16 @@ import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; -import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; -import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.ILocalResourceRepository; @@ -70,7 +65,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC private final ILocalResourceRepository resourceRepository; private final IDatasetMemoryManager memoryManager; private final ILogManager logManager; - private final LogRecord logRecord; + private final LogRecord waitLog; private final int numPartitions; private volatile boolean stopped = false; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; @@ -84,7 +79,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC this.memoryManager = memoryManager; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; this.numPartitions = numPartitions; - logRecord = new LogRecord(); + waitLog = new LogRecord(); + waitLog.setLogType(LogType.WAIT_FOR_FLUSHES); + waitLog.computeAndSetLogSize(); } @Override @@ -371,7 +368,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC @Override public synchronized void flushAllDatasets() throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { - flushDatasetOpenIndexes(dsr, false); + if (dsr.getDatasetInfo().isOpen()) { + flushDatasetOpenIndexes(dsr, false); + } } } @@ -423,77 +422,48 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC */ private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException { DatasetInfo dsInfo = dsr.getDatasetInfo(); + if (!dsInfo.isOpen()) { + throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed"); + } if (dsInfo.isExternal()) { // no memory components for external dataset return; } + // ensure all in-flight flushes gets scheduled + logManager.log(waitLog); for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) { // flush each partition one by one if (primaryOpTracker.getNumActiveOperations() > 0) { throw new IllegalStateException( "flushDatasetOpenIndexes is called on a dataset with currently active operations"); } - int 
partition = primaryOpTracker.getPartition(); - Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition); - ILSMIndex flushIndex = null; - for (ILSMIndex lsmIndex : indexes) { - if (!lsmIndex.isCurrentMutableComponentEmpty()) { - flushIndex = lsmIndex; - break; - } - } - if (flushIndex == null) { - // all open indexes are empty, nothing to flush - continue; - } - LSMComponentId componentId = (LSMComponentId) flushIndex.getCurrentMemoryComponent().getId(); - ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition); - idGenerator.refresh(); - if (dsInfo.isDurable()) { - synchronized (logRecord) { - TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), partition, - componentId.getMinId(), componentId.getMaxId(), null); - try { - logManager.log(logRecord); - } catch (ACIDException e) { - throw new HyracksDataException("could not write flush log while closing dataset", e); - } - - try { - //notification will come from LogBuffer class (notifyFlushTerminator) - logRecord.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); - } - } - } - long flushLsn = logRecord.getLSN(); - ILSMComponentId nextComponentId = idGenerator.getId(); - Map<String, Object> flushMap = new HashMap<>(); - flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); - flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId); - for (ILSMIndex index : indexes) { - ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); - accessor.getOpContext().setParameters(flushMap); - accessor.scheduleFlush(); - } - if (!asyncFlush) { - // Wait for the above flush op. 
- dsInfo.waitForIO(); + primaryOpTracker.setFlushOnExit(true); + primaryOpTracker.flushIfNeeded(); + } + // ensure requested flushes were scheduled + logManager.log(waitLog); + if (!asyncFlush) { + List<FlushOperation> flushes = new ArrayList<>(); + for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) { + flushes.addAll(primaryOpTracker.getScheduledFlushes()); } + LSMIndexUtil.waitFor(flushes); } } private void closeDataset(DatasetResource dsr) throws HyracksDataException { // First wait for any ongoing IO operations DatasetInfo dsInfo = dsr.getDatasetInfo(); - dsInfo.waitForIO(); try { flushDatasetOpenIndexes(dsr, false); } catch (Exception e) { throw HyracksDataException.create(e); } + // wait for merges that were scheduled due to the above flush + // ideally, we shouldn't need this since merges should still work. + // They don't need a special memory budget but there is a problem + // for some merge policies that need to access dataset info (correlated prefix) + dsInfo.waitForIO(); for (IndexInfo iInfo : dsInfo.getIndexes().values()) { closeIndex(iInfo); } @@ -505,7 +475,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public synchronized void closeAllDatasets() throws HyracksDataException { ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); for (DatasetResource dsr : openDatasets) { - closeDataset(dsr); + if (dsr.isOpen()) { + closeDataset(dsr); + } } } @@ -612,7 +584,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC @Override public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { - if (replicationStrategy.isMatch(dsr.getDatasetID())) { + if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) { flushDatasetOpenIndexes(dsr, false); } }