[NO ISSUE][REPL] Ignore LSNs of Partially Replicated Indexes

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- When determining low watermark, ignore LSN of replicated indexes with no checkpoints.
- Guard logs in case of unexpected min LSN read failures.
- Ensure only one replica is synchronized at a time to prevent possible merge operations from deleting files being synchronized to another replica concurrently.
- Ensure index metadata files are replicated first to allow replicas to find any existing files in case of re-synchronization.
- Ensure replication channel is closed on replication failures.

Change-Id: I9ca08da29bdd8fc4406f2df7e6eb32601caf9388
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2534
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f3784bb3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f3784bb3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f3784bb3

Branch: refs/heads/release-0.9.4-pre-rc
Commit: f3784bb3e5c5a62ee244adab80bb70b1b811f255
Parents: f7c7059
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Tue Mar 27 21:28:14 2018 +0300
Committer: Michael Blow <mb...@apache.org>
Committed: Tue Mar 27 13:53:37 2018 -0700

----------------------------------------------------------------------
 .../apache/asterix/app/nc/RecoveryManager.java  | 15 +++++++++++++--
 .../apache/asterix/app/nc/ReplicaManager.java   |  6 ++++++
 .../asterix/common/storage/IReplicaManager.java |  8 ++++++++
 .../replication/api/PartitionReplica.java       | 19 +++++++++++++------
 .../sync/ReplicaFilesSynchronizer.java          | 12 ++++++++++++
 .../replication/sync/ReplicaSynchronizer.java   | 13 ++++++++-----
 6 files changed, 60 insertions(+), 13 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 4b14a9c..d4e652d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -50,6 +50,7 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.ICheckpointManager; @@ -93,6 +94,7 @@ import org.apache.logging.log4j.Logger; public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { public static final boolean IS_DEBUG_MODE = false; + private static final long SMALLEST_POSSIBLE_LSN = 0; private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(); private final ITransactionSubsystem txnSubsystem; private final LogManager logMgr; @@ -499,8 +501,17 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { return dsResource.getPartition() == partition; }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); for (DatasetResourceReference indexRef : partitionResources) { - long remoteIndexMaxLSN = 
idxCheckpointMgrProvider.get(indexRef).getLowWatermark(); - minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); + try { + final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef); + if (idxCheckpointMgr.getCheckpointCount() > 0) { + long remoteIndexMaxLSN = idxCheckpointMgrProvider.get(indexRef).getLowWatermark(); + minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); + } + } catch (Exception e) { + LOGGER.warn("Failed to get min LSN of resource {}", indexRef, e); + // ensure no logs will be deleted in case of unexpected failures + return SMALLEST_POSSIBLE_LSN; + } } } return minRemoteLSN; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java index c821c56..5c5ce93 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java @@ -57,6 +57,7 @@ public class ReplicaManager implements IReplicaManager { * current replicas */ private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>(); + private final Object replicaSyncLock = new Object(); public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) { this.appCtx = appCtx; @@ -126,6 +127,11 @@ public class ReplicaManager implements IReplicaManager { partitions.remove(partition); } + @Override + public Object getReplicaSyncLock() { + return replicaSyncLock; + } + private void closePartitionResources(int partition) throws HyracksDataException { final PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java index b2deb1e..1b8ec53 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java @@ -71,4 +71,12 @@ public interface IReplicaManager { * @throws HyracksDataException */ void release(int partition) throws HyracksDataException; + + /** + * A lock that can be used to ensure a single replica is being synchronized at a time + * by this {@link IReplicaManager} + * + * @return the synchronization lock + */ + Object getReplicaSyncLock(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index bfac451..5c324b1 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -109,13 +109,12 @@ public class PartitionReplica implements IPartitionReplica { public synchronized void close() { try { - if (sc != null && sc.isOpen()) { - ReplicationProtocol.sendGoodbye(sc); - sc.close(); - sc = null; + if (sc != null) { + sendGoodBye(); + 
NetworkUtil.closeQuietly(sc); } - } catch (IOException e) { - LOGGER.warn("Failed to close channel", e); + } finally { + sc = null; } } @@ -166,4 +165,12 @@ public class PartitionReplica implements IPartitionReplica { LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status); this.status = status; } + + private void sendGoodBye() { + try { + ReplicationProtocol.sendGoodbye(sc); + } catch (IOException e) { + LOGGER.warn("Failed to send good bye to {}", this, e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index fae6ed6..0d97a7a 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -18,9 +18,12 @@ */ package org.apache.asterix.replication.sync; +import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -40,6 +43,13 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc */ public class ReplicaFilesSynchronizer { + private static final Comparator<String> REPLICATED_FILES_COMPARATOR = (file, anotherFile) -> { + if (file.endsWith(METADATA_FILE_NAME) && !anotherFile.endsWith(METADATA_FILE_NAME)) { + return -1; + } + return 
file.compareTo(anotherFile); + }; + private final PartitionReplica replica; private final INcApplicationContext appCtx; @@ -79,6 +89,8 @@ public class ReplicaFilesSynchronizer { private void replicateMissingFiles(List<String> files) { final FileSynchronizer sync = new FileSynchronizer(appCtx, replica); + // sort files to ensure index metadata files are replicated first + files.sort(REPLICATED_FILES_COMPARATOR); files.forEach(sync::replicate); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 9f397d2..ef85977 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -22,9 +22,9 @@ import java.io.IOException; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.replication.messaging.ReplicationProtocol; -import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; +import org.apache.asterix.replication.messaging.ReplicationProtocol; /** * Performs the steps required to ensure any newly added replica @@ -41,9 +41,12 @@ public class ReplicaSynchronizer { } public void sync() throws IOException { - syncFiles(); - checkpointReplicaIndexes(); - appCtx.getReplicationManager().register(replica); + final Object syncLock = 
appCtx.getReplicaManager().getReplicaSyncLock(); + synchronized (syncLock) { + syncFiles(); + checkpointReplicaIndexes(); + appCtx.getReplicationManager().register(replica); + } } private void syncFiles() throws IOException {