Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2875
Change subject: [NO ISSUE][TX] Fix Concurrent Access in TransactionContext
......................................................................
[NO ISSUE][TX] Fix Concurrent Access in TransactionContext
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ensure all access to TransactionContext is thread-safe.
Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
---
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
3 files changed, 25 insertions(+), 31 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/75/2875/1
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 95cabf9..cc27cd8 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +50,7 @@
txnState = new AtomicInteger(ITransactionManager.ACTIVE);
isTimeout = false;
isWriteTxn = new AtomicBoolean();
- txnOpTrackers = new HashMap<>();
+ txnOpTrackers = new ConcurrentHashMap<>();
}
@Override
@@ -120,13 +120,11 @@
@Override
public void register(long resourceId, int partition, ILSMIndex index,
IModificationOperationCallback callback,
boolean primaryIndex) {
- synchronized (txnOpTrackers) {
- if (!txnOpTrackers.containsKey(resourceId)) {
- final ITransactionOperationTracker txnOpTracker =
- (ITransactionOperationTracker)
index.getOperationTracker();
- txnOpTrackers.put(resourceId, txnOpTracker);
- txnOpTracker.beforeTransaction(resourceId);
- }
+ if (!txnOpTrackers.containsKey(resourceId)) {
+ final ITransactionOperationTracker txnOpTracker =
+ (ITransactionOperationTracker) index.getOperationTracker();
+ txnOpTrackers.put(resourceId, txnOpTracker);
+ txnOpTracker.beforeTransaction(resourceId);
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 079e99a..49cd82b 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -35,9 +35,9 @@
@ThreadSafe
public class AtomicTransactionContext extends AbstractTransactionContext {
- private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>();
- private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>();
- private final Map<Long, IModificationOperationCallback> callbacks = new
HashMap<>();
+ private final Map<Long, ILSMOperationTracker> opTrackers = new
ConcurrentHashMap<>();
+ private final Map<Long, AtomicInteger> indexPendingOps = new
ConcurrentHashMap<>();
+ private final Map<Long, IModificationOperationCallback> callbacks = new
ConcurrentHashMap<>();
public AtomicTransactionContext(TxnId txnId) {
super(txnId);
@@ -47,12 +47,10 @@
public void register(long resourceId, int partition, ILSMIndex index,
IModificationOperationCallback callback,
boolean primaryIndex) {
super.register(resourceId, partition, index, callback, primaryIndex);
- synchronized (txnOpTrackers) {
- if (primaryIndex && !opTrackers.containsKey(resourceId)) {
- opTrackers.put(resourceId, index.getOperationTracker());
- callbacks.put(resourceId, callback);
- indexPendingOps.put(resourceId, new AtomicInteger(0));
- }
+ if (primaryIndex && !opTrackers.containsKey(resourceId)) {
+ opTrackers.put(resourceId, index.getOperationTracker());
+ callbacks.put(resourceId, callback);
+ indexPendingOps.put(resourceId, new AtomicInteger(0));
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9fcb08b..188bb1b 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
@@ -42,23 +42,21 @@
public EntityLevelTransactionContext(TxnId txnId) {
super(txnId);
- this.primaryIndexTrackers = new HashMap<>();
- this.resourcePendingOps = new HashMap<>();
- this.partitionPendingOps = new HashMap<>();
+ this.primaryIndexTrackers = new ConcurrentHashMap<>();
+ this.resourcePendingOps = new ConcurrentHashMap<>();
+ this.partitionPendingOps = new ConcurrentHashMap<>();
}
@Override
public void register(long resourceId, int partition, ILSMIndex index,
IModificationOperationCallback callback,
boolean primaryIndex) {
super.register(resourceId, partition, index, callback, primaryIndex);
- synchronized (txnOpTrackers) {
- AtomicInteger pendingOps =
partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
- resourcePendingOps.put(resourceId, pendingOps);
- if (primaryIndex) {
- Pair<PrimaryIndexOperationTracker,
IModificationOperationCallback> pair =
- new Pair<>((PrimaryIndexOperationTracker)
index.getOperationTracker(), callback);
- primaryIndexTrackers.put(partition, pair);
- }
+ AtomicInteger pendingOps =
partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+ resourcePendingOps.put(resourceId, pendingOps);
+ if (primaryIndex) {
+ Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>
pair =
+ new Pair<>((PrimaryIndexOperationTracker)
index.getOperationTracker(), callback);
+ primaryIndexTrackers.put(partition, pair);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2875
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>