This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 91cc197  HDDS-1757. Use ExecutorService in OzoneManagerStateMachine. 
(#1048)
91cc197 is described below

commit 91cc19722796877f134fd04f60229ac47a1bd6e0
Author: Bharat Viswanadham <bha...@apache.org>
AuthorDate: Tue Jul 2 16:02:27 2019 -0700

    HDDS-1757. Use ExecutorService in OzoneManagerStateMachine. (#1048)
---
 .../ozone/om/ratis/OzoneManagerStateMachine.java   | 43 +++++++++++++++++++++-
 1 file changed, 41 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 1bd5a70..d9eb0b9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -19,11 +19,16 @@ package org.apache.hadoop.ozone.om.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -38,6 +43,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -70,6 +76,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   private RaftGroupId raftGroupId;
   private long lastAppliedIndex = 0;
   private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+  private final ExecutorService executorService;
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
     this.omRatisServer = ratisServer;
@@ -79,6 +86,9 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
             this::updateLastAppliedIndex);
     this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
         ozoneManagerDoubleBuffer);
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
+    this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
   }
 
   /**
@@ -132,8 +142,36 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
       OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
           trx.getStateMachineLogEntry().getLogData());
       long trxLogIndex = trx.getLogEntry().getIndex();
-      CompletableFuture<Message> future = CompletableFuture
-          .supplyAsync(() -> runCommand(request, trxLogIndex));
+      // In the current approach we have a single global executor with one
+      // thread. Right now this is being done for correctness: as
+      // applyTransaction will be run on multiple OMs, we want to execute the
+      // transactions in the same order on all OMs, otherwise there is a
+      // chance that OM replicas can go out of sync.
+      // TODO: In this way we are making all applyTransactions in
+      // OM serial order. Revisit this in future to use multiple executors for
+      // volume/bucket.
+
+      // The reason for not immediately implementing an executor per volume
+      // is that if one executor's operations are slow, we cannot update the
+      // lastAppliedIndex in the OzoneManager StateMachine, even if another
+      // executor has completed transactions with higher ids.
+
+      // Suppose we have 300 transactions, 150 per volume: Volume1 has
+      // transactions 0 - 149 and Volume2 has transactions 150 - 299.
+      // Example: Executor1 - Volume1 - 100 (current completed transaction)
+      // Example: Executor2 - Volume2 - 299 (current completed transaction)
+
+      // Now we have applied transactions 0 - 100 and 150 - 299. We
+      // cannot update lastAppliedIndex to 299; we need to set it to 100,
+      // since 101 - 149 are not yet applied. When an OM restarts it will
+      // apply transactions from lastAppliedIndex.
+      // We can update the lastAppliedIndex to 100 now, and advance it to 299
+      // only after completing 101 - 149. In the initial stage we are starting
+      // with a single global executor. Will revisit this when needed.
+
+      CompletableFuture<Message> future = CompletableFuture.supplyAsync(
+          () -> runCommand(request, trxLogIndex), executorService);
       return future;
     } catch (IOException e) {
       return completeExceptionally(e);
@@ -301,6 +339,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
 
   public void stop() {
     ozoneManagerDoubleBuffer.stop();
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to