Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][TX] Fix Concurrent Access in TransactionContext
......................................................................

[NO ISSUE][TX] Fix Concurrent Access in TransactionContext

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure all access to TransactionContext is thread safe.

Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2875
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
3 files changed, 16 insertions(+), 18 deletions(-)

Approvals: Anon. E. 
Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, but someone else must approve diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java index 95cabf9..a0944ea 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java @@ -36,7 +36,7 @@ public abstract class AbstractTransactionContext implements ITransactionContext { protected final TxnId txnId; - protected final Map<Long, ITransactionOperationTracker> txnOpTrackers; + private final Map<Long, ITransactionOperationTracker> txnOpTrackers; private final AtomicLong firstLSN; private final AtomicLong lastLSN; private final AtomicInteger txnState; diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java index 079e99a..083c26b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.transaction.management.service.transaction; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.exceptions.ACIDException; @@ -35,9 +35,9 @@ @ThreadSafe public class AtomicTransactionContext extends AbstractTransactionContext { - private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>(); - private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>(); - private final Map<Long, IModificationOperationCallback> callbacks = new HashMap<>(); + private final Map<Long, ILSMOperationTracker> opTrackers = new ConcurrentHashMap<>(); + private final Map<Long, AtomicInteger> indexPendingOps = new ConcurrentHashMap<>(); + private final Map<Long, IModificationOperationCallback> callbacks = new ConcurrentHashMap<>(); public AtomicTransactionContext(TxnId txnId) { super(txnId); @@ -47,7 +47,7 @@ public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex) { super.register(resourceId, partition, index, callback, primaryIndex); - synchronized (txnOpTrackers) { + synchronized (opTrackers) { if (primaryIndex && !opTrackers.containsKey(resourceId)) { opTrackers.put(resourceId, index.getOperationTracker()); callbacks.put(resourceId, callback); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java index 9fcb08b..188bb1b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.transaction.management.service.transaction; -import java.util.HashMap; import java.util.Map; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; @@ -42,23 +42,21 @@ public EntityLevelTransactionContext(TxnId txnId) { super(txnId); - this.primaryIndexTrackers = new HashMap<>(); - this.resourcePendingOps = new HashMap<>(); - this.partitionPendingOps = new HashMap<>(); + this.primaryIndexTrackers = new ConcurrentHashMap<>(); + this.resourcePendingOps = new ConcurrentHashMap<>(); + this.partitionPendingOps = new ConcurrentHashMap<>(); } @Override public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex) { super.register(resourceId, partition, index, callback, primaryIndex); - synchronized (txnOpTrackers) { - AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0)); - resourcePendingOps.put(resourceId, pendingOps); - if (primaryIndex) { - Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair = - new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback); - primaryIndexTrackers.put(partition, pair); - } + AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0)); + resourcePendingOps.put(resourceId, pendingOps); + if (primaryIndex) { + Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair = + new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback); + primaryIndexTrackers.put(partition, pair); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2875 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
