HDDS-365. Implement flushStateMachineData for containerStateMachine. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2651e2c4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2651e2c4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2651e2c4 Branch: refs/heads/HDFS-12943 Commit: 2651e2c43d0825912669a87afc256bad9f1ea6ed Parents: 7ed458b Author: Mukul Kumar Singh <msi...@apache.org> Authored: Wed Aug 29 07:57:57 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Wed Aug 29 07:58:30 2018 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 2 +- .../transport/server/XceiverServerGrpc.java | 2 +- .../server/ratis/ContainerStateMachine.java | 18 ++++++++++++++++++ hadoop-project/pom.xml | 2 +- 4 files changed, 21 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2651e2c4/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index e2416c2..1622ddb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -87,7 +87,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { } LOG.debug("Connecting to server Port : " + leader.getIpAddress()); channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port) - .usePlaintext(true) + .usePlaintext() .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) .build(); asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2651e2c4/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index f4f3f6f..4dc232d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -77,7 +77,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { datanodeDetails.setPort( DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port)); server = ((NettyServerBuilder) ServerBuilder.forPort(port)) - .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) + .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) .addService(new GrpcXceiverService(dispatcher)) .build(); storageContainer = dispatcher; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2651e2c4/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index ede87f4..68d6d5b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; /** A {@link org.apache.ratis.statemachine.StateMachine} for containers. * @@ -316,6 +317,23 @@ public class ContainerStateMachine extends BaseStateMachine { return LogEntryProto.newBuilder().setSmLogEntry(log).build(); } + /** + * Returns the combined future of all the writeChunks till the given log + * index. The Raft log worker will wait for the stateMachineData to complete + * flush as well. + * + * @param index log index till which the stateMachine data needs to be flushed + * @return Combined future of all writeChunks till the log index given. + */ + @Override + public CompletableFuture<Void> flushStateMachineData(long index) { + List<CompletableFuture<Message>> futureList = + writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index) + .map(x -> x.getValue()).collect(Collectors.toList()); + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + futureList.toArray(new CompletableFuture[futureList.size()])); + return combinedFuture; + } /* * This api is used by the leader while appending logs to the follower * This allows the leader to read the state machine data from the http://git-wip-us.apache.org/repos/asf/hadoop/blob/2651e2c4/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index b45b495..18b907f 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -97,7 +97,7 @@ <ldap-api.version>1.0.0-M33</ldap-api.version> <!-- Apache Ratis version --> - <ratis.version>0.3.0-e4a016f-SNAPSHOT</ratis.version> + <ratis.version>0.3.0-e6fd494-SNAPSHOT</ratis.version> <jcache.version>1.0-alpha-1</jcache.version> <ehcache.version>3.3.1</ehcache.version> <hikari.version>2.4.12</hikari.version> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org