From Peeyush Gupta <[email protected]>:
Peeyush Gupta has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19372 )
Change subject: WIP: ignore and log failure to read atomic txn log files
......................................................................
WIP: ignore and log failure to read atomic txn log files
Change-Id: I5a2e3849cb6fe4a78e0499fb2591f7e734908d04
---
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
2 files changed, 56 insertions(+), 13 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/72/19372/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index a5c9867..d509ba6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -186,9 +186,25 @@
public void rollback() throws Exception {
Set<FileReference> txnLogFileRefs =
ioManager.list(ioManager.resolve(StorageConstants.GLOBAL_TXN_DIR_NAME));
for (FileReference txnLogFileRef : txnLogFileRefs) {
- IGlobalTransactionContext context = new
GlobalTransactionContext(txnLogFileRef, ioManager);
- txnContextRepository.put(context.getJobId(), context);
- sendJobRollbackMessages(context);
+ try {
+ IGlobalTransactionContext context = new
GlobalTransactionContext(txnLogFileRef, ioManager);
+ txnContextRepository.put(context.getJobId(), context);
+ sendJobRollbackMessages(context);
+ } catch (Exception e) {
+ LOGGER.error("Error rolling back transaction for {}",
txnLogFileRef, e);
+ cleanup(txnLogFileRef);
+ }
+ }
+ }
+
+ private void cleanup(FileReference resourceFile) {
+ if (resourceFile.getFile().exists()) {
+ try {
+ ioManager.delete(resourceFile);
+ } catch (Throwable th) {
+ LOGGER.error("Error cleaning up corrupted resource {}",
resourceFile, th);
+
ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
+ }
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 4c7f53b..3663e83 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -61,6 +62,7 @@
private final ITransactionSubsystem txnSubsystem;
private final Map<TxnId, ITransactionContext> txnCtxRepository = new
ConcurrentHashMap<>();
private final AtomicLong maxTxnId = new AtomicLong(0);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public TransactionManager(ITransactionSubsystem provider) {
this.txnSubsystem = provider;
@@ -209,23 +211,39 @@
.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
StorageConstants.PARTITION_DIR_PREFIX +
StorageConstants.METADATA_PARTITION)
.toString()));
- ObjectMapper objectMapper = new ObjectMapper();
for (FileReference txnLogFileRef : txnLogFileRefs) {
- ObjectNode atomicTransactionLog =
- objectMapper.readValue(new
String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
- TxnId txnId = new
TxnId(atomicTransactionLog.get("txnId").asInt());
- JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
- Map<String, ILSMComponentId> resourceMap =
getResourceMapFromJson(jsonNode);
- AtomicNoWALTransactionContext context =
- new AtomicNoWALTransactionContext(txnId,
txnSubsystem.getApplicationContext());
- context.rollback(resourceMap);
- context.deleteLogFile();
+ try {
+ ObjectNode atomicTransactionLog = OBJECT_MAPPER
+ .readValue(new
String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
+ TxnId txnId = new
TxnId(atomicTransactionLog.get("txnId").asInt());
+ JsonNode jsonNode =
atomicTransactionLog.get("resourceMap");
+ Map<String, ILSMComponentId> resourceMap =
getResourceMapFromJson(jsonNode);
+ AtomicNoWALTransactionContext context =
+ new AtomicNoWALTransactionContext(txnId,
txnSubsystem.getApplicationContext());
+ context.rollback(resourceMap);
+ context.deleteLogFile();
+ } catch (Exception e) {
+ LOGGER.error("Error rolling back atomic statement for {}",
txnLogFileRef, e);
+ cleanup(txnLogFileRef);
+ }
}
} catch (Exception e) {
throw new ACIDException(e);
}
}
+ private void cleanup(FileReference resourceFile) {
+ IIOManager ioManager =
txnSubsystem.getApplicationContext().getPersistenceIoManager();
+ if (resourceFile.getFile().exists()) {
+ try {
+ ioManager.delete(resourceFile);
+ } catch (Throwable th) {
+ LOGGER.error("Error cleaning up corrupted resource {}",
resourceFile, th);
+
ExitUtil.halt(ExitUtil.EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES);
+ }
+ }
+ }
+
private Map<String, ILSMComponentId> getResourceMapFromJson(JsonNode
jsonNode) {
Map<String, ILSMComponentId> resourceMap = new HashMap<>();
for (Iterator<String> it = jsonNode.fieldNames(); it.hasNext();) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19372
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I5a2e3849cb6fe4a78e0499fb2591f7e734908d04
Gerrit-Change-Number: 19372
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange