Repository: asterixdb Updated Branches: refs/heads/master b82f6dfb8 -> 877a36de4
[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery - user model changes: no - storage format changes: no - interface changes: no Details: - Exclude non-replicated datasets files from delta recovery. - Fix used read buffer for large replication requests. Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2412 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/877a36de Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/877a36de Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/877a36de Branch: refs/heads/master Commit: 877a36de4e7f148bd859e8e5ed49e7d5fe26fb59 Parents: b82f6df Author: Murtadha Hubail <mhub...@apache.org> Authored: Wed Feb 21 04:30:01 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Wed Feb 21 02:24:22 2018 -0800 ---------------------------------------------------------------------- .../messaging/PartitionResourcesListTask.java | 9 +++-- .../messaging/ReplicationProtocol.java | 7 ++-- .../sync/ReplicaFilesSynchronizer.java | 7 ++-- .../PersistentLocalResourceRepository.java | 35 ++++++++++++++------ 4 files changed, 40 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java index 54d3a02..b2b1ad1 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -26,9 +26,10 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -48,8 +49,10 @@ public class PartitionResourcesListTask implements IReplicaTask { final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); localResourceRepository.cleanup(partition); - final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream() - .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); + final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + final List<String> partitionResources = + localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition, partitionResources); ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java index 280a2d4..41e7d9e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java @@ -72,7 +72,7 @@ public class ReplicationProtocol { final ByteBuffer buf = ensureSize(dataBuffer, requestSize); // read request NetworkingUtil.readBytes(socketChannel, buf, requestSize); - return dataBuffer; + return buf; } public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer) @@ -135,6 +135,7 @@ public class ReplicationProtocol { requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); requestBuffer.flip(); NetworkingUtil.transferBufferToChannel(channel, requestBuffer); + channel.socket().getOutputStream().flush(); } catch (IOException e) { throw new ReplicationException(e); } @@ -148,9 +149,9 @@ public class ReplicationProtocol { public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel, ByteBuffer buffer) { try { - ReplicationProtocol.readRequest(socketChannel, buffer); + final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer); final ByteArrayInputStream input = - new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); + new ByteArrayInputStream(requestBuf.array(), requestBuf.position(), requestBuf.limit()); try (DataInputStream dis = new DataInputStream(input)) { switch (type) { case PARTITION_RESOURCES_REQUEST: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index 5658779..fae6ed6 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; @@ -52,8 +53,10 @@ public class ReplicaFilesSynchronizer { final Set<String> replicaFiles = getReplicaFiles(partition); final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); - final Set<String> masterFiles = localResourceRepository.getPartitionIndexesFiles(partition).stream() - .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); + final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + final Set<String> masterFiles = + localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); // find files on master and not on replica final List<String> replicaMissingFiles = masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/877a36de/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index ca22a84..7206382 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -52,6 +52,7 @@ import java.util.stream.Stream; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; @@ -342,18 +343,32 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito }); } - public List<String> getPartitionIndexesFiles(int partition) throws HyracksDataException { - List<String> partitionFiles = new ArrayList<>(); - Set<File> partitionIndexes = getPartitionIndexes(partition); - for (File indexDir : partitionIndexes) { - if (indexDir.isDirectory()) { - File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); - if (indexFiles != null) { - Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add); - } + public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy) + throws HyracksDataException { + final List<String> partitionReplicatedFiles = new ArrayList<>(); + final Set<File> replicatedIndexes = new HashSet<>(); + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile()); + } + } + for (File indexDir : replicatedIndexes) { + partitionReplicatedFiles.addAll(getIndexFiles(indexDir)); + } + return partitionReplicatedFiles; + } + + private List<String> getIndexFiles(File indexDir) { + final List<String> indexFiles = new ArrayList<>(); + if (indexDir.isDirectory()) { + File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); + if (indexFilteredFiles != null) { + Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add); } } - return partitionFiles; + return indexFiles; } private void createStorageRoots() {