This is an automated email from the ASF dual-hosted git repository. aengineer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 91cc197 HDDS-1757. Use ExecutorService in OzoneManagerStateMachine. (#1048) 91cc197 is described below commit 91cc19722796877f134fd04f60229ac47a1bd6e0 Author: Bharat Viswanadham <bha...@apache.org> AuthorDate: Tue Jul 2 16:02:27 2019 -0700 HDDS-1757. Use ExecutorService in OzoneManagerStateMachine. (#1048) --- .../ozone/om/ratis/OzoneManagerStateMachine.java | 43 +++++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 1bd5a70..d9eb0b9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -19,11 +19,16 @@ package org.apache.hadoop.ozone.om.ratis; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ServiceException; import java.io.IOException; import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.ozone.container.common.transport.server.ratis .ContainerStateMachine; import org.apache.hadoop.ozone.om.OzoneManager; @@ -38,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler; import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import 
org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; @@ -70,6 +76,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private RaftGroupId raftGroupId; private long lastAppliedIndex = 0; private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; + private final ExecutorService executorService; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { this.omRatisServer = ratisServer; @@ -79,6 +86,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine { this::updateLastAppliedIndex); this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager, ozoneManagerDoubleBuffer); + ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build(); + this.executorService = HadoopExecutors.newSingleThreadExecutor(build); } /** @@ -132,8 +142,36 @@ public class OzoneManagerStateMachine extends BaseStateMachine { OMRequest request = OMRatisHelper.convertByteStringToOMRequest( trx.getStateMachineLogEntry().getLogData()); long trxLogIndex = trx.getLogEntry().getIndex(); - CompletableFuture<Message> future = CompletableFuture - .supplyAsync(() -> runCommand(request, trxLogIndex)); + // In the current approach we have one single global executor + // with a single thread. Right now this is being done for correctness: as + // applyTransaction will be run on multiple OMs, we want to execute the + // transactions in the same order on all OMs, otherwise there is a + // chance that OM replicas can be out of sync. + // TODO: In this way we are making all applyTransactions in + // OM serial order. Revisit this in future to use multiple executors for + // volume/bucket. 
+ + // Reason for not immediately implementing executor per volume is, if + // one executor's operations are slow, we cannot update the + // lastAppliedIndex in OzoneManager StateMachine, even if another + // executor has completed transactions with higher ids. + + // We have 300 transactions, and for each volume we have 150 + // transactions. Volume1 transactions 0 - 149 and Volume2 transactions 150 - + // 299. + // Example: Executor1 - Volume1 - 100 (current completed transaction) + // Example: Executor2 - Volume2 - 299 (current completed transaction) + + // Now we have applied transactions of 0 - 100 and 150 - 299. We + // cannot update lastAppliedIndex to 299. We need to update it to 100, + // since 101 - 149 are not applied. When OM restarts it will + // applyTransactions from lastAppliedIndex. + // We can update the lastAppliedIndex to 100, and update it to 299, + // only after completing 101 - 149. In the initial stage, we are starting + // with single global executor. Will revisit this when needed. + + CompletableFuture<Message> future = CompletableFuture.supplyAsync( + () -> runCommand(request, trxLogIndex), executorService); return future; } catch (IOException e) { return completeExceptionally(e); @@ -301,6 +339,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { public void stop() { ozoneManagerDoubleBuffer.stop(); + HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org