Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-1969][STO] Ignore corrupted checkpoints ......................................................................
[ASTERIXDB-1969][STO] Ignore corrupted checkpoints - user model changes: no - storage format changes: no - interface changes: no Details: - Ignore and delete corrupted checkpoint files. - In case all checkpoint files are corrupted, force full recovery. - Add test to check the new behavior of CheckpointManager. - Remove unused recovery manager method. Change-Id: Ied8a188501b63a0d339e6391cac684e3378f4c37 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1871 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java 4 files changed, 95 insertions(+), 48 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 275b055..10af9ff 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -158,27 +158,6 @@ return state; } - //This method is used only when replication is disabled. - @Override - public void startRecovery(boolean synchronous) throws IOException, ACIDException { - state = SystemState.RECOVERING; - LOGGER.log(Level.INFO, "starting recovery ..."); - - long readableSmallestLSN = logMgr.getReadableSmallestLSN(); - Checkpoint checkpointObject = checkpointManager.getLatest(); - long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn(); - if (lowWaterMarkLSN < readableSmallestLSN) { - lowWaterMarkLSN = readableSmallestLSN; - } - - //delete any recovery files from previous failed recovery attempts - deleteRecoveryTemporaryFiles(); - - //get active partitions on this node - Set<Integer> activePartitions = localResourceRepository.getNodeOrignalPartitions(); - replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), lowWaterMarkLSN); - } - @Override public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException { state = SystemState.RECOVERING; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index d2bf3d3..370205b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -19,6 +19,7 @@ package org.apache.asterix.test.logging; import java.io.File; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -32,10 +33,12 @@ import org.apache.asterix.common.configuration.AsterixConfiguration; import org.apache.asterix.common.configuration.Property; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -47,6 +50,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.test.common.TestHelper; import org.apache.asterix.transaction.management.service.logging.LogManager; +import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -208,4 +212,50 @@ Assert.fail(e.getMessage()); } } + + @Test + public void testCorruptedCheckpointFiles() { + try { + TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false); + nc.init(); + final ITransactionSubsystem txnSubsystem = nc.getTransactionSubsystem(); + final AbstractCheckpointManager checkpointManager = + (AbstractCheckpointManager) txnSubsystem.getCheckpointManager(); + // Make a checkpoint with the current minFirstLSN + final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); + checkpointManager.tryCheckpoint(minFirstLSN); + // Get the just created checkpoint + final Checkpoint validCheckpoint = checkpointManager.getLatest(); + // Make sure the valid checkout wouldn't force full recovery + Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN); + // Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint + Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1); + File corruptedCheckpoint = corruptedCheckpointPath.toFile(); + corruptedCheckpoint.createNewFile(); + // Make sure the corrupted checkpoint file was created + Assert.assertTrue(corruptedCheckpoint.exists()); + // Try to get the latest checkpoint again + Checkpoint cpAfterCorruption = checkpointManager.getLatest(); + // Make sure the valid checkpoint was returned + Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp()); + // Make sure the corrupted checkpoint file was deleted + Assert.assertFalse(corruptedCheckpoint.exists()); + // Corrupt the valid checkpoint by replacing its content + final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp()); + File validCheckpointFile = validCheckpointPath.toFile(); + Assert.assertTrue(validCheckpointFile.exists()); + // Delete the valid checkpoint file and create it as an empty file + validCheckpointFile.delete(); + validCheckpointFile.createNewFile(); + // Make sure the returned checkpoint (the forged checkpoint) will enforce full recovery + Checkpoint forgedCheckpoint = checkpointManager.getLatest(); + Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN); + // Make sure the forged checkpoint recovery will start from the first available log + final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN(); + Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index 84e1019..7965aa5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -33,7 +33,7 @@ */ public interface IRecoveryManager { - public enum SystemState { + enum SystemState { BOOTSTRAPPING, // The first time the NC is bootstrapped. PERMANENT_DATA_LOSS, // No checkpoint files found on NC and it is not BOOTSTRAPPING (data loss). RECOVERING, // Recovery process is on-going. @@ -41,7 +41,10 @@ CORRUPTED // Some txn logs need to be replayed (need to perform recover). } - public class ResourceType { + class ResourceType { + private ResourceType() { + } + public static final byte LSM_BTREE = 0; public static final byte LSM_RTREE = 1; public static final byte LSM_INVERTED_INDEX = 2; @@ -61,38 +64,25 @@ SystemState getSystemState() throws ACIDException; /** - * Initiates a crash recovery. - * - * @param synchronous - * indicates if the recovery is to be done in a synchronous - * manner. In asynchronous mode, the recovery will happen as part - * of a separate thread. - * @return SystemState the state of the system (@see SystemState) post - * recovery. - * @throws ACIDException - */ - public void startRecovery(boolean synchronous) throws IOException, ACIDException; - - /** * Rolls back a transaction. * * @param txnContext * the transaction context associated with the transaction * @throws ACIDException */ - public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException; + void rollbackTransaction(ITransactionContext txnContext) throws ACIDException; /** * @return min first LSN of the open indexes (including remote indexes if replication is enabled) * @throws HyracksDataException */ - public long getMinFirstLSN() throws HyracksDataException; + long getMinFirstLSN() throws HyracksDataException; /** * @return min first LSN of the open indexes * @throws HyracksDataException */ - public long getLocalMinFirstLSN() throws HyracksDataException; + long getLocalMinFirstLSN() throws HyracksDataException; /** * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN} @@ -102,7 +92,7 @@ * @throws IOException * @throws ACIDException */ - public void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) + void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) throws IOException, ACIDException; /** @@ -114,12 +104,12 @@ * @throws IOException * if the file for the specified {@code jobId} with the {@code fileName} already exists */ - public File createJobRecoveryFile(int jobId, String fileName) throws IOException; + File createJobRecoveryFile(int jobId, String fileName) throws IOException; /** * Deletes all temporary recovery files */ - public void deleteRecoveryTemporaryFiles(); + void deleteRecoveryTemporaryFiles(); /** * Performs the local recovery process on {@code partitions} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java index 8d3e0a7..24d316b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java @@ -88,13 +88,26 @@ List<Checkpoint> checkpointObjectList = new ArrayList<>(); for (File file : checkpoints) { try { - LOGGER.log(Level.WARNING, "Reading snapshot file: " + file.getAbsolutePath()); + LOGGER.log(Level.WARNING, "Reading checkpoint file: " + file.getAbsolutePath()); String jsonString = new String(Files.readAllBytes(Paths.get(file.getAbsolutePath()))); checkpointObjectList.add(Checkpoint.fromJson(jsonString)); } catch (IOException e) { - throw new ACIDException("Failed to read a checkpoint file", e); + // ignore corrupted checkpoint file + LOGGER.log(Level.WARNING, "Failed to read checkpoint file: " + file.getAbsolutePath(), e); + file.delete(); + LOGGER.log(Level.INFO, "Deleted corrupted checkpoint file: " + file.getAbsolutePath()); } } + /** + * If all checkpoint files are corrupted, we have no option but to try to perform recovery. + * We will forge a checkpoint that forces recovery to start from the beginning of the log. + * This shouldn't happen unless a hardware corruption happens. + */ + if (checkpointObjectList.isEmpty()) { + LOGGER.severe("All checkpoint files are corrupted. Forcing recovery from the beginning of the log"); + checkpointObjectList.add(forgeForceRecoveryCheckpoint()); + } + // Sort checkpointObjects in descending order by timeStamp to find out the most recent one. Collections.sort(checkpointObjectList); @@ -125,6 +138,11 @@ // Nothing to dump } + public Path getCheckpointPath(long checkpointTimestamp) { + return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + Long + .toString(checkpointTimestamp)); + } + protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException { ILogManager logMgr = txnSubsystem.getLogManager(); ITransactionManager txnMgr = txnSubsystem.getTransactionManager(); @@ -134,14 +152,24 @@ cleanup(); } + protected Checkpoint forgeForceRecoveryCheckpoint() { + /** + * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from + * the first available log. + * We set the storage version to the current version. If there is a version mismatch, it will be detected + * during recovery. + */ + return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false, + StorageConstants.VERSION); + } + private void persist(Checkpoint checkpoint) throws HyracksDataException { - // Construct checkpoint file name - String fileName = checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX - + Long.toString(checkpoint.getTimeStamp()); + // Get checkpoint file path + Path path = getCheckpointPath(checkpoint.getTimeStamp()); // Write checkpoint file to disk - Path path = Paths.get(fileName); try (BufferedWriter writer = Files.newBufferedWriter(path)) { writer.write(checkpoint.asJson()); + writer.flush(); } catch (IOException e) { throw new HyracksDataException("Failed to write checkpoint to disk", e); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1871 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ied8a188501b63a0d339e6391cac684e3378f4c37 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
