From Murtadha Al Hubail <[email protected]>:
Murtadha Al Hubail has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633 )
Change subject: [NO ISSUE][OTH] Allow waiting for IO on specific dataset
partition
......................................................................
[NO ISSUE][OTH] Allow waiting for IO on specific dataset partition
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add API to wait for IO on a specific dataset partition.
- When waiting for a partition replica's IO operations to finish, only
wait for that replica's partition rather than for all partitions.
Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
8 files changed, 68 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/33/17633/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047..c7eee21 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -161,12 +161,14 @@
void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate
partitions) throws HyracksDataException;
/**
- * Waits for all ongoing IO operations on all open datasets that are
matching {@code replicationStrategy}.
+ * Waits for all ongoing IO operations on all open datasets that are
matching {@code replicationStrategy} and
+ * {@code partition}.
*
* @param replicationStrategy
+ * @param partition
* @throws HyracksDataException
*/
- void waitForIO(IReplicationStrategy replicationStrategy) throws
HyracksDataException;
+ void waitForIO(IReplicationStrategy replicationStrategy, int partition)
throws HyracksDataException;
/**
* @return the current datasets io stats
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb4..7d3dba4 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@
private static final Logger LOGGER = LogManager.getLogger();
protected final int datasetID;
protected final DatasetInfo dsInfo;
+ protected final int partition;
- public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+ public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int
partition) {
this.datasetID = datasetID;
this.dsInfo = dsInfo;
+ this.partition = partition;
}
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws
HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.declareActiveIOOperation(REPLICATE);
+ dsInfo.declareActiveIOOperation(REPLICATE, partition);
}
}
@@ -59,7 +61,7 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws
HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.undeclareActiveIOOperation(REPLICATE);
+ dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be..87a3c2f 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private static final Logger LOGGER = LogManager.getLogger();
// partition -> index
private final Map<Integer, Set<IndexInfo>> partitionIndexes;
// resourceID -> index
private final Map<Long, IndexInfo> indexes;
+ private final Int2IntMap partitionPendingIO;
private final int datasetID;
private final ILogManager logManager;
private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@
public DatasetInfo(int datasetID, ILogManager logManager) {
this.partitionIndexes = new HashMap<>();
this.indexes = new HashMap<>();
+ this.partitionPendingIO = new Int2IntOpenHashMap();
this.setLastAccess(-1);
this.datasetID = datasetID;
this.setRegistered(false);
@@ -74,7 +79,8 @@
setLastAccess(System.currentTimeMillis());
}
- public synchronized void
declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+ public synchronized void
declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int
partition) {
+ partitionPendingIO.put(partition,
partitionPendingIO.getOrDefault(partition, 0) + 1);
numActiveIOOps++;
switch (opType) {
case FLUSH:
@@ -91,7 +97,8 @@
}
}
- public synchronized void
undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+ public synchronized void
undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int
partition) {
+ partitionPendingIO.put(partition,
partitionPendingIO.getOrDefault(partition, 0) - 1);
numActiveIOOps--;
switch (opType) {
case FLUSH:
@@ -253,6 +260,26 @@
}
}
+ public void waitForIO(int partition) throws HyracksDataException {
+ logManager.log(waitLog);
+ synchronized (this) {
+ while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
+ LOGGER.error("number of IO operations cannot be negative for
dataset {}, partition {}", this,
+ partition);
+ throw new IllegalStateException(
+ "Number of IO operations cannot be negative: " + this
+ ", partition " + partition);
+ }
+ }
+ }
+
public synchronized int getPendingFlushes() {
return pendingFlushes;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc..4fc9dd6 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -552,10 +552,10 @@
}
@Override
- public void waitForIO(IReplicationStrategy replicationStrategy) throws
HyracksDataException {
+ public void waitForIO(IReplicationStrategy replicationStrategy, int
partition) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.isOpen() &&
replicationStrategy.isMatch(dsr.getDatasetID())) {
- dsr.getDatasetInfo().waitForIO();
+ dsr.getDatasetInfo().waitForIO(partition);
}
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02..a4ad7cf 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,6 @@
@NotThreadSafe
public class PrimaryIndexOperationTracker extends BaseOperationTracker
implements IoOperationCompleteListener {
private static final Logger LOGGER = LogManager.getLogger();
- private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
private final ILogManager logManager;
@@ -71,8 +70,7 @@
public PrimaryIndexOperationTracker(int datasetID, int partition,
ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
- super(datasetID, dsInfo);
- this.partition = partition;
+ super(datasetID, dsInfo, partition);
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51..f56e5c0 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -74,6 +74,7 @@
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
+ private final int partition;
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
@@ -84,6 +85,7 @@
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ this.partition =
ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
componentIds.add(componentId);
}
@@ -259,7 +261,7 @@
@Override
public synchronized void scheduled(ILSMIOOperation operation) throws
HyracksDataException {
- dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+ dsInfo.declareActiveIOOperation(operation.getIOOpertionType(),
partition);
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +284,7 @@
pendingFlushes == 0 ?
firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
}
}
- dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+ dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(),
partition);
}
public synchronized boolean hasPendingFlush() {
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01..68ccd54 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@
private void waitForReplicatedDatasetsIO() throws HyracksDataException {
// wait for IO operations to ensure replicated datasets files won't
change during replica sync
final IReplicationStrategy replStrategy =
appCtx.getReplicationManager().getReplicationStrategy();
- appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+ appCtx.getDatasetLifecycleManager().waitForIO(replStrategy,
replica.getIdentifier().getPartition());
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae3..827b713 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@
public ILSMOperationTracker getOperationTracker(INCServiceContext ctx,
IResource resource) {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext)
ctx.getApplicationContext()).getDatasetLifecycleManager();
- return new BaseOperationTracker(datasetId,
dslcManager.getDatasetInfo(datasetId));
+ int partition =
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return new BaseOperationTracker(datasetId,
dslcManager.getDatasetInfo(datasetId), partition);
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
Gerrit-Change-Number: 17633
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Al Hubail <[email protected]>
Gerrit-MessageType: newchange