>From Peeyush Gupta <[email protected]>: Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724 )
Change subject: [NO ISSUE][TX][TEST] Add test for atomic statements ...................................................................... [NO ISSUE][TX][TEST] Add test for atomic statements - user model changes: no - storage format changes: no - interface changes: no Details: - Add test that stops NCs at random times while upserts are taking place on standalone collections. - Fix the issue with rollback and recovery for atomic statements - Add timeout while waiting for AtomicJobComplete and AtomicJobRollback messages - Remove datasets without type spec from checkpoint thread - Some refactoring Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724 Reviewed-by: Peeyush Gupta <[email protected]> Reviewed-by: Murtadha Al Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Peeyush Gupta <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java D asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java 17 files changed, 399 insertions(+), 101 deletions(-) Approvals: Murtadha Al Hubail: Looks good to me, approved Peeyush Gupta: Looks good to me, but someone else must approve; Verified Jenkins: Verified Objections: Anon. E. 
Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java index f6456dc..b4223bc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.app.cc; +import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT; + import java.util.List; import java.util.Map; import java.util.Set; @@ -38,8 +40,11 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.nc.io.IOManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -85,7 +90,8 @@ synchronized (context) { try { - context.wait(); + CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig(); + context.wait(config.getGlobalTxCommitTimeout()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new ACIDException(e); @@ -194,10 +200,12 @@ } synchronized (context) { try { - context.wait(); + CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig(); + context.wait(config.getGlobalTxRollbackTimeout()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new ACIDException(e); + LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId); + ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT); } } txnContextRepository.remove(jobId); 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java index 71a641e..e611d8f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.app.message; +import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT; + import java.util.List; import java.util.Map; @@ -32,6 +34,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.util.ExitUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Message sent from CC to all NCs to rollback an atomic statement/job. 
@@ -43,6 +48,8 @@ private final List<Integer> datasetIds; private final Map<String, ILSMComponentId> componentIdMap; + private static final Logger LOGGER = LogManager.getLogger(); + public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds, Map<String, ILSMComponentId> componentIdMap) { this.jobId = jobId; @@ -57,15 +64,19 @@ datasetLifecycleManager.getIndexCheckpointManagerProvider(); componentIdMap.forEach((k, v) -> { try { - IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k)); + IIndexCheckpointManager checkpointManager = + indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k)); if (checkpointManager.getCheckpointCount() > 0) { IndexCheckpoint checkpoint = checkpointManager.getLatest(); if (checkpoint.getLastComponentId() == v.getMaxId()) { + LOGGER.info("Removing checkpoint for resource {} for component id {}", k, + checkpoint.getLastComponentId()); checkpointManager.deleteLatest(v.getMaxId(), 1); } } - } catch (HyracksDataException e) { - throw new RuntimeException(e); + } catch (Exception e) { + LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId); + ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT); } }); AtomicJobRollbackCompleteMessage message = diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java index 0936512..beaaac0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java @@ -70,6 +70,9 @@ deleteInvalidIndex(appContext, localResourceRepository, resource); } } + for (Integer partition : nodePartitions) { + localResourceRepository.cleanup(partition); + } try { broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null)); } 
catch (Exception e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 5494250..f6eb123 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -61,7 +61,6 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.TxnId; -import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; @@ -158,9 +157,6 @@ @Override public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException { state = SystemState.RECOVERING; - if (appCtx.isCloudDeployment()) { - doMetadataRecovery(); - } LOGGER.info("starting recovery for partitions {}", partitions); long readableSmallestLSN = logMgr.getReadableSmallestLSN(); Checkpoint checkpointObject = checkpointManager.getLatest(); @@ -175,11 +171,6 @@ replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true); } - public synchronized void doMetadataRecovery() { - LOGGER.info("starting recovery for metadata partition {}", StorageConstants.METADATA_PARTITION); - appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL(); - } - public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN, boolean closeOnFlushRedo) throws IOException, ACIDException { try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java index 0071ee0..435419b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java @@ -46,6 +46,10 @@ deleteInvalidMetadataIndexes(localResourceRepository); final Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions(); localResourceRepository.deleteCorruptedResources(); + INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext(); + if (appCtx.isCloudDeployment() && nodePartitions.contains(metadataPartitionId)) { + appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL(); + } for (Integer partition : nodePartitions) { localResourceRepository.cleanup(partition); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index 3f76a31..df2c25d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -112,12 +112,12 @@ LOGGER.info("Starting Global Recovery"); MetadataManager.INSTANCE.init(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + rollbackIncompleteAtomicTransactions(appCtx); if (appCtx.getStorageProperties().isStorageGlobalCleanup()) { int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout(); performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout); } mdTxnCtx = doRecovery(appCtx, mdTxnCtx); - rollbackIncompleteAtomicTransactions(appCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); recoveryCompleted = true; recovering 
= false; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java index 06380fe..60bcbb5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java @@ -91,6 +91,22 @@ } /** + * Creates a dataset without type specification + * @param dataset The name of the dataset + * @param fields The fields composing the primary key + * @param pkAutogenerated Is the primary key autogenerated + * @throws Exception + */ + public static void createDatasetWithoutType(String dataset, Map<String, String> fields, boolean pkAutogenerated) + throws Exception { + StringBuilder stringBuilder = new StringBuilder(""); + fields.forEach((fName, fType) -> stringBuilder.append(fName).append(":").append(fType).append(",")); + stringBuilder.deleteCharAt(stringBuilder.length() - 1); + TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + " PRIMARY KEY (" + stringBuilder + ")" + + (pkAutogenerated ? 
"AUTOGENERATED;" : ";"), OUTPUT_FORMAT); + } + + /** * Creates a secondary primary index * @param dataset the name of the dataset * @param indexName the name of the index @@ -128,6 +144,22 @@ } /** + * Creates a single insert statement with multiple records containing name field + * @param dataset The name of the dataset + * @param count Number of records in the insert statement + * @throws Exception + */ + public static void insertBulkData(String dataset, long count) throws Exception { + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < count; i++) { + stringBuilder.append("{\"name\": \"name_" + i + "\"},"); + } + stringBuilder.deleteCharAt(stringBuilder.length() - 1); + TEST_EXECUTOR.executeSqlppUpdateOrDdl("INSERT INTO " + dataset + "([" + stringBuilder + "]);", + TestCaseContext.OutputFormat.CLEAN_JSON); + } + + /** * Gets the number of records in dataset {@code dataset} * * @param datasetName diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java new file mode 100644 index 0000000..0d915de --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.atomic_statements; + +import static org.apache.hyracks.util.file.FileUtil.joinPath; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Random; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.api.common.LocalCloudUtil; +import org.apache.asterix.common.TestDataUtil; +import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class AtomicMetadataTransactionWithoutWALTest { + public static final String RESOURCES_PATH = joinPath(System.getProperty("user.dir"), "src", "test", "resources"); + public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf"); + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + private static final TestExecutor TEST_EXECUTOR = new TestExecutor(); + private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String DATASET_NAME_PREFIX = "ds_"; + private static final int 
NUM_DATASETS = 500; + private static final int NUM_RECOVERIES = 10; + + @Before + public void setUp() throws Exception { + boolean cleanStart = Boolean.getBoolean("cleanup.start"); + LocalCloudUtil.startS3CloudEnvironment(cleanStart); + integrationUtil.setGracefulShutdown(false); + integrationUtil.init(true, CONFIG_FILE); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + private void createDatasets() throws Exception { + String datasetName; + for (int i = 0; i < NUM_DATASETS; i++) { + datasetName = DATASET_NAME_PREFIX + i; + TestDataUtil.createDatasetWithoutType(datasetName, Map.of("id", "uuid"), true); + } + } + + @Test + public void testAtomicityWithFailures() throws Exception { + final String leftJoinQuery = "SELECT VALUE COUNT(*) FROM Metadata.`Dataset` ds LEFT JOIN Metadata.`Index` i " + + "ON ds.DatasetName=i.DatasetName AND i.IsPrimary=true WHERE ds.DatasetId!=0 AND i.DatasetName IS MISSING;"; + final String rightJoinQuery = "SELECT VALUE COUNT(*) FROM Metadata.`Dataset` ds RIGHT JOIN Metadata.`Index` i " + + "ON ds.DatasetName=i.DatasetName AND i.IsPrimary=true WHERE ds.DatasetId!=0 AND ds.DatasetName IS MISSING;"; + for (int i = 0; i <= NUM_RECOVERIES; i++) { + Thread thread = new Thread(() -> { + try { + createDatasets(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + Random rnd = new Random(); + Thread.sleep(rnd.nextInt(100) + 10); + integrationUtil.deinit(true); + integrationUtil.init(true, CONFIG_FILE); + + Assert.assertEquals(0, runCountQuery(leftJoinQuery)); + Assert.assertEquals(0, runCountQuery(rightJoinQuery)); + } + } + + private int runCountQuery(String query) throws Exception { + InputStream responseStream = TEST_EXECUTOR.executeQueryService(query, + TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), OUTPUT_FORMAT, StandardCharsets.UTF_8); + ObjectNode response = OBJECT_MAPPER.readValue(responseStream, ObjectNode.class); + JsonNode result = response.get("results"); 
+ Assert.assertEquals(1, result.size()); + return result.get(0).asInt(); + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java new file mode 100644 index 0000000..5219e011 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.atomic_statements; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.common.TestDataUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class AtomicStatementsTest { + private static final String TEST_CONFIG_FILE_NAME = "cc.conf"; + private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "src" + + File.separator + "main" + File.separator + "resources"; + private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME; + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + + private static final String DATASET_NAME_PREFIX = "ds_"; + private static final int NUM_DATASETS = 5; + private static final int BATCH_SIZE = 100; + private static final int NUM_UPSERTS = 100; + private static final int NUM_RECOVERIES = 10; + + @Before + public void setUp() throws Exception { + integrationUtil.setGracefulShutdown(false); + integrationUtil.init(true, TEST_CONFIG_FILE_PATH); + createDatasets(); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + private void createDatasets() throws Exception { + String datasetName; + for (int i = 0; i < NUM_DATASETS; i++) { + datasetName = DATASET_NAME_PREFIX + i; + TestDataUtil.createDatasetWithoutType(datasetName, Map.of("id", "uuid"), true); + TestDataUtil.createSecondaryBTreeIndex(datasetName, datasetName + "_sidx", "name:string"); + } + } + + private Thread insertRecords(String dataset) { + Thread thread = new Thread(() -> { + try { + for (int i = 0; i < NUM_UPSERTS; i++) { + TestDataUtil.insertBulkData(dataset, BATCH_SIZE); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); 
+ thread.start(); + return thread; + } + + @Test + public void testAtomicityWithFailures() throws Exception { + for (int i = 0; i <= NUM_RECOVERIES; i++) { + List<Thread> threads = new ArrayList<>(); + for (int j = 0; j < NUM_DATASETS; j++) { + threads.add(insertRecords(DATASET_NAME_PREFIX + j)); + } + Random rnd = new Random(); + Thread.sleep(rnd.nextInt(2000) + 500); + integrationUtil.deinit(false); + integrationUtil.init(false, TEST_CONFIG_FILE_PATH); + + for (int j = 0; j < NUM_DATASETS; j++) { + final long countAfterRecovery = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j); + Assert.assertEquals(0, countAfterRecovery % BATCH_SIZE); + } + } + } + + @Test + public void testAtomicityWithoutFailures() throws Exception { + List<Thread> threads = new ArrayList<>(); + for (int j = 0; j < NUM_DATASETS; j++) { + threads.add(insertRecords(DATASET_NAME_PREFIX + j)); + threads.add(insertRecords(DATASET_NAME_PREFIX + j)); + } + for (Thread thread : threads) { + thread.join(); + } + for (int j = 0; j < NUM_DATASETS; j++) { + long count = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j); + Assert.assertEquals(2 * NUM_UPSERTS * BATCH_SIZE, count); + } + + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index 7e38b8b..34bc587 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -279,7 +279,7 @@ for (FlushOperation flush : lastFlushOperation.values()) { FileReference target = flush.getTarget(); Map<String, Object> map = flush.getParameters(); - final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID); + final LSMComponentId id = (LSMComponentId) 
map.get(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID); final ResourceReference ref = ResourceReference.of(target.getAbsolutePath()); final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd(); indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId()); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index be935d0..014bc5c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -37,6 +37,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.external.IDataSourceAdapter; +import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory; import org.apache.asterix.common.utils.StorageConstants; @@ -403,8 +404,9 @@ : new SecondaryIndexOperationTrackerFactory(datasetId); ILSMComponentIdGeneratorFactory idGeneratorProvider = new DatasetLSMComponentIdGeneratorFactory(datasetId); DatasetInfoProvider datasetInfoProvider = new DatasetInfoProvider(datasetId); - ILSMIOOperationCallbackFactory ioOpCallbackFactory = - new LSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider); + ILSMIOOperationCallbackFactory ioOpCallbackFactory = appContext.isCloudDeployment() + ? 
new AtomicLSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider) + : new LSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider); ILSMPageWriteCallbackFactory pageWriteCallbackFactory = new LSMIndexPageWriteCallbackFactory(); IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider(); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index f09248f..460f393 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -139,7 +139,8 @@ return lsmIndex -> { if (lsmIndex.isPrimaryIndex()) { PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker(); - return currentTime - opTracker.getLastFlushTime() >= datasetCheckpointIntervalNanos; + return !lsmIndex.isAtomic() + && currentTime - opTracker.getLastFlushTime() >= datasetCheckpointIntervalNanos; } return false; }; @@ -148,7 +149,7 @@ private Predicate<ILSMIndex> newLaggingDatasetPredicate(long checkpointTargetLSN) { return lsmIndex -> { final LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback(); - return ioCallback.getPersistenceLsn() < checkpointTargetLSN; + return !lsmIndex.isAtomic() && ioCallback.getPersistenceLsn() < checkpointTargetLSN; }; } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java index aa698be..151d9ef 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.transaction.management.service.transaction; +import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT; + import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; @@ -44,14 +46,19 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; +import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.annotations.ThreadSafe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; @ThreadSafe public class AtomicNoWALTransactionContext extends AtomicTransactionContext { + private static final Logger LOGGER = LogManager.getLogger(); private final INcApplicationContext appCtx; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -111,9 +118,13 @@ } try { commit(); - } catch (Exception e) { - rollback(resourceMap); - throw new ACIDException(e); + } catch (HyracksDataException e) { + try { + rollback(resourceMap); + } catch (Exception ex) { + LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", txnId); + ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT); + } } finally { deleteLogFile(); } @@ -122,17 +133,25 
@@ private void persistLogFile(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap) throws HyracksDataException, JsonProcessingException { - IIOManager ioManager = appCtx.getIoManager(); + IIOManager ioManager = appCtx.getPersistenceIoManager(); FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION, String.format("%s.log", txnId)).toString()); - MetadataAtomicTransactionLog txnLog = new MetadataAtomicTransactionLog(txnId, datasetIds, - appCtx.getServiceContext().getNodeId(), resourceMap); - ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes()); + ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter() + .writeValueAsString(toJson(datasetIds, resourceMap)).getBytes()); + } + + private ObjectNode toJson(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap) { + ObjectNode jsonNode = OBJECT_MAPPER.createObjectNode(); + jsonNode.put("txnId", txnId.getId()); + jsonNode.putPOJO("datasetIds", datasetIds); + jsonNode.put("nodeId", appCtx.getServiceContext().getNodeId()); + jsonNode.putPOJO("resourceMap", resourceMap); + return jsonNode; } public void deleteLogFile() { - IIOManager ioManager = appCtx.getIoManager(); + IIOManager ioManager = appCtx.getPersistenceIoManager(); try { FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION, @@ -172,10 +191,13 @@ datasetLifecycleManager.getIndexCheckpointManagerProvider(); resourceMap.forEach((k, v) -> { try { - IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k)); + IIndexCheckpointManager checkpointManager = + indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k)); if (checkpointManager.getCheckpointCount() > 0) { IndexCheckpoint 
checkpoint = checkpointManager.getLatest(); if (checkpoint.getLastComponentId() == v.getMaxId()) { + LOGGER.info("Removing checkpoint for resource {} for component id {}", k, + checkpoint.getLastComponentId()); checkpointManager.deleteLatest(v.getMaxId(), 1); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java deleted file mode 100644 index 7b3af3f..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.transaction.management.service.transaction; - -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.transactions.TxnId; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -@JsonIgnoreProperties(ignoreUnknown = true) -public class MetadataAtomicTransactionLog { - - private TxnId txnId; - private List<Integer> datasetIds; - private String nodeId; - private Map<String, ILSMComponentId> resourceMap; - - @JsonCreator - public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId, - @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeId") String nodeId, - @JsonProperty("resourceMap") Map<String, ILSMComponentId> resourceMap) { - this.txnId = txnId; - this.datasetIds = datasetIds; - this.nodeId = nodeId; - this.resourceMap = resourceMap; - } - - public TxnId getTxnId() { - return txnId; - } - - public List<Integer> getDatasetIds() { - return datasetIds; - } - - public String getNodeId() { - return nodeId; - } - - public Map<String, ILSMComponentId> getResourceMap() { - return resourceMap; - } -} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index ea7fcb8..4c7f53b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.file.Paths; +import 
java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -41,12 +43,16 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; @ThreadSafe public class TransactionManager implements ITransactionManager, ILifeCycleComponent { @@ -196,7 +202,7 @@ @Override public void rollbackMetadataTransactionsWithoutWAL() { - IIOManager ioManager = txnSubsystem.getApplicationContext().getIoManager(); + IIOManager ioManager = txnSubsystem.getApplicationContext().getPersistenceIoManager(); try { Set<FileReference> txnLogFileRefs = ioManager.list(ioManager.resolve(Paths @@ -205,15 +211,30 @@ .toString())); ObjectMapper objectMapper = new ObjectMapper(); for (FileReference txnLogFileRef : txnLogFileRefs) { - MetadataAtomicTransactionLog atomicTransactionLog = objectMapper.readValue( - new String(ioManager.readAllBytes(txnLogFileRef)), MetadataAtomicTransactionLog.class); - AtomicNoWALTransactionContext context = new AtomicNoWALTransactionContext( - atomicTransactionLog.getTxnId(), txnSubsystem.getApplicationContext()); - context.rollback(atomicTransactionLog.getResourceMap()); + ObjectNode atomicTransactionLog = + objectMapper.readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class); + TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asInt()); + JsonNode jsonNode = atomicTransactionLog.get("resourceMap"); 
+ Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode); + AtomicNoWALTransactionContext context = + new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext()); + context.rollback(resourceMap); context.deleteLogFile(); } } catch (Exception e) { throw new ACIDException(e); } } + + private Map<String, ILSMComponentId> getResourceMapFromJson(JsonNode jsonNode) { + Map<String, ILSMComponentId> resourceMap = new HashMap<>(); + for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext();) { + String resourcePath = it.next(); + JsonNode componentIdNode = jsonNode.get(resourcePath); + ILSMComponentId componentId = + new LSMComponentId(componentIdNode.get("minId").asLong(), componentIdNode.get("maxId").asLong()); + resourceMap.put(resourcePath, componentId); + } + return resourceMap; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java index 6904e3c..18825da 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java @@ -89,7 +89,9 @@ STRING, appConfig -> FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "global-txn-log"), - ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/global-txn-log"); + ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/global-txn-log"), + GLOBAL_TXN_COMMIT_TIMEOUT(LONG, 600000L), + GLOBAL_TXN_ROLLBACK_TIMEOUT(LONG, 600000L); private final IOptionType parser; private Object defaultValue; @@ -213,6 +215,10 @@ return "Path to HTTP basic credentials"; case GLOBAL_TXN_LOG_DIR: return "Directory to store global 
transaction logs"; + case GLOBAL_TXN_COMMIT_TIMEOUT: + return "Timeout for Commit"; + case GLOBAL_TXN_ROLLBACK_TIMEOUT: + return "Timeout for Rollback"; default: throw new IllegalStateException("NYI: " + this); } @@ -492,4 +498,11 @@ return getAppConfig().getString(Option.GLOBAL_TXN_LOG_DIR); } + public long getGlobalTxCommitTimeout() { + return getAppConfig().getLong(Option.GLOBAL_TXN_COMMIT_TIMEOUT); + } + + public long getGlobalTxRollbackTimeout() { + return getAppConfig().getLong(Option.GLOBAL_TXN_ROLLBACK_TIMEOUT); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index 8f8e8f6..0f03562 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -58,6 +58,7 @@ public static final int EC_ACTIVE_RECOVERY_FAILURE = 20; public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22; public static final int EC_INCONSISTENT_STORAGE_REFERENCES = 23; + public static final int EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT = 24; public static final int EC_IMMEDIATE_HALT = 33; public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44; public static final int EC_IO_SCHEDULER_FAILED = 55; -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b Gerrit-Change-Number: 17724 Gerrit-PatchSet: 9 Gerrit-Owner: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Al Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-MessageType: merged
