>From Murtadha Al Hubail <[email protected]>: Murtadha Al Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633 )
Change subject: [ASTERIXDB-3220][REPL] Allow waiting for IO on specific dataset partition ...................................................................... [ASTERIXDB-3220][REPL] Allow waiting for IO on specific dataset partition - user model changes: no - storage format changes: no - interface changes: yes Details: - Add API to wait for IO on a specific dataset partition. - When waiting for a partition replica IO ops to finish, only wait for the replica partition rather than all partitions. Change-Id: I90f311f602b3c8526556f64d7b25672981fac320 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java 9 files changed, 79 insertions(+), 16 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Jenkins: Verified; Verified Anon. E. Moose #1000171: diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 1c2a047..c7eee21 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -161,12 +161,14 @@ void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException; /** - * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}. + * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy} and + * {@code partition}. * * @param replicationStrategy + * @param partition * @throws HyracksDataException */ - void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException; + void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException; /** * @return the current datasets io stats diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java index 5964bb4..7d3dba4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java @@ -33,17 +33,19 @@ private static final Logger LOGGER = LogManager.getLogger(); protected final int datasetID; protected final DatasetInfo dsInfo; + protected final int partition; - public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) { + public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int partition) { this.datasetID = datasetID; this.dsInfo = dsInfo; + this.partition = partition; } @Override public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback) throws HyracksDataException { if (opType == LSMOperationType.REPLICATE) { - dsInfo.declareActiveIOOperation(REPLICATE); + dsInfo.declareActiveIOOperation(REPLICATE, partition); } } @@ -59,7 +61,7 @@ public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback) throws HyracksDataException { if (opType == LSMOperationType.REPLICATE) { - dsInfo.undeclareActiveIOOperation(REPLICATE); + dsInfo.undeclareActiveIOOperation(REPLICATE, partition); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java index d15d9be..87a3c2f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java @@ -33,12 +33,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; + public class DatasetInfo extends Info implements Comparable<DatasetInfo> { private static final Logger LOGGER = LogManager.getLogger(); // partition -> index private final Map<Integer, Set<IndexInfo>> partitionIndexes; // resourceID -> index private final Map<Long, IndexInfo> indexes; + private final Int2IntMap partitionPendingIO; private final int datasetID; private final ILogManager logManager; private final LogRecord waitLog = new LogRecord(); @@ -54,6 +58,7 @@ public DatasetInfo(int datasetID, ILogManager logManager) { this.partitionIndexes = new HashMap<>(); this.indexes = new HashMap<>(); + this.partitionPendingIO = new Int2IntOpenHashMap(); this.setLastAccess(-1); this.datasetID = datasetID; this.setRegistered(false); @@ -74,7 +79,8 @@ setLastAccess(System.currentTimeMillis()); } - public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) { + public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) { + partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) + 1); numActiveIOOps++; switch (opType) { case FLUSH: @@ -91,7 +97,8 @@ } } - public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) { + public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) { + partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) - 1); numActiveIOOps--; switch (opType) { case FLUSH: @@ -253,6 +260,26 @@ } } + public void waitForIO(int partition) throws HyracksDataException { + logManager.log(waitLog); + synchronized (this) { + while (partitionPendingIO.getOrDefault(partition, 0) > 0) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + if (partitionPendingIO.getOrDefault(partition, 0) < 0) { + LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this, + partition); + throw new IllegalStateException( + "Number of IO operations cannot be negative: " + this + ", partition " + partition); + } + } + } + public synchronized int getPendingFlushes() { return pendingFlushes; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 117b4fc..4fc9dd6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -552,10 +552,10 @@ } @Override - public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException { + public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) { - dsr.getDatasetInfo().waitForIO(); + dsr.getDatasetInfo().waitForIO(partition); } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index b0d8e02..a4ad7cf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -59,7 +59,6 @@ @NotThreadSafe public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener { private static final Logger LOGGER = LogManager.getLogger(); - private final int partition; // Number of active operations on an ILSMIndex instance. private final AtomicInteger numActiveOperations; private final ILogManager logManager; @@ -71,8 +70,7 @@ public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo, ILSMComponentIdGenerator idGenerator) { - super(datasetID, dsInfo); - this.partition = partition; + super(datasetID, dsInfo, partition); this.logManager = logManager; this.numActiveOperations = new AtomicInteger(); this.idGenerator = idGenerator; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index 1189b51..f56e5c0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -74,6 +74,7 @@ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; protected final DatasetInfo dsInfo; protected final ILSMIndex lsmIndex; + private final int partition; private long firstLsnForCurrentMemoryComponent = 0L; private long persistenceLsn = 0L; private int pendingFlushes = 0; @@ -84,6 +85,7 @@ this.dsInfo = dsInfo; this.lsmIndex = lsmIndex; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; + this.partition = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum(); componentIds.add(componentId); } @@ -259,7 +261,7 @@ @Override public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException { - dsInfo.declareActiveIOOperation(operation.getIOOpertionType()); + dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), partition); if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) { pendingFlushes++; FlushOperation flush = (FlushOperation) operation; @@ -282,7 +284,7 @@ pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN); } } - dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType()); + dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), partition); } public synchronized boolean hasPendingFlush() { diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java index db0911b..33d513f 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java @@ -77,6 +77,7 @@ String indexId = "mockIndexId"; Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); + Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath()); DatasetInfo dsInfo = new DatasetInfo(101, null); LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), @@ -140,6 +141,7 @@ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); + Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath()); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), @@ -161,6 +163,7 @@ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); + Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath()); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), @@ -221,4 +224,8 @@ Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any()); return indexCheckpointManagerProvider; } + + private static String getIndexPath() { + return "storage/partition_0/dataverse/dataset/0/index"; + } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 459ff01..68ccd54 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -103,6 +103,6 @@ private void waitForReplicatedDatasetsIO() throws HyracksDataException { // wait for IO operations to ensure replicated datasets files won't change during replica sync final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); - appCtx.getDatasetLifecycleManager().waitForIO(replStrategy); + appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, replica.getIdentifier().getPartition()); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java index a104ae3..827b713 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java @@ -21,6 +21,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.context.BaseOperationTracker; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IJsonSerializable; @@ -46,7 +47,8 @@ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) { IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager(); - return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId)); + int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath()); + return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId), partition); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: I90f311f602b3c8526556f64d7b25672981fac320 Gerrit-Change-Number: 17633 Gerrit-PatchSet: 4 Gerrit-Owner: Murtadha Al Hubail <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Al Hubail <[email protected]> Gerrit-MessageType: merged
