Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3446
Change subject: [NO ISSUE][REPL] Wait For Dataset IO Before Replica Sync ...................................................................... [NO ISSUE][REPL] Wait For Dataset IO Before Replica Sync - user model changes: no - storage format changes: no - interface changes: yes Details: - Add API in DatasetLifecycleManager to wait for IO on datasets matching a replication strategy, - Before synchronizing the files on a replica, wait for any on-going IO operations on replicated datasets to make sure the set of files won't change while the replica is being synchronized (e.g. a merge operation won't delete a file that we wanted to send to a replica). Change-Id: I01ed5c9379cf7ae249faeef624d5226ea699cf22 --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java 3 files changed, 25 insertions(+), 0 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/46/3446/1 diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index d18b6ab..2dfd32b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -133,4 +133,12 @@ * @throws HyracksDataException */ void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException; + + /** + * Waits for all on-going IO operations on all open datasets that are matching {@code replicationStrategy}. + * + * @param replicationStrategy + * @throws HyracksDataException + */ + void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index d767219..30b27d8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -599,6 +599,15 @@ } } + @Override + public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException { + for (DatasetResource dsr : datasets.values()) { + if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) { + dsr.getDatasetInfo().waitForIO(); + } + } + } + private void closeIndex(IndexInfo indexInfo) throws HyracksDataException { if (indexInfo.isOpen()) { ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker(); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 09f1205..0f0b5bd 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -53,10 +53,12 @@ private void syncFiles() throws IOException { final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica); + waitForReplicatedDatasetsIO(); fileSync.sync(); // flush replicated dataset to generate disk component for any remaining in-memory components final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); appCtx.getDatasetLifecycleManager().flushDataset(replStrategy); + waitForReplicatedDatasetsIO(); // sync any newly generated files fileSync.sync(); } @@ -75,4 +77,10 @@ (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy); } + + private void waitForReplicatedDatasetsIO() throws HyracksDataException { + // wait for IO operations to ensure replicated datasets files won't change during replica sync + final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + appCtx.getDatasetLifecycleManager().waitForIO(replStrategy); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3446 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: stabilization-f69489 Gerrit-MessageType: newchange Gerrit-Change-Id: I01ed5c9379cf7ae249faeef624d5226ea699cf22 Gerrit-Change-Number: 3446 Gerrit-PatchSet: 1 Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>