This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch ozone-0.4.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 9b3c034695d55c1edd7b5874b6929781fe35318e
Author: Mukul Kumar Singh <msi...@apache.org>
AuthorDate: Sun Jul 14 10:53:51 2019 +0530

    HDDS-1766. ContainerStateMachine is unable to increment lastAppliedTermIndex. Contributed by Mukul Kumar Singh. (#1072)
    
    (cherry picked from commit 0976f6fc30ed8bb774d823f09c58cea54be05ae7)
---
 .../server/ratis/ContainerStateMachine.java        | 40 ++++++++++++----------
 1 file changed, 21 insertions(+), 19 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f4a8008..87826e6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -195,17 +196,16 @@ public class ContainerStateMachine extends BaseStateMachine {
     if (snapshot == null) {
       TermIndex empty =
           TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
-      LOG.info(
-          "The snapshot info is null." + "Setting the last applied index to:"
-              + empty);
+      LOG.info("{}: The snapshot info is null. Setting the last applied index" 
+
+              "to:{}", gid, empty);
       setLastAppliedTermIndex(empty);
-      return RaftLog.INVALID_LOG_INDEX;
+      return empty.getIndex();
     }
 
     final File snapshotFile = snapshot.getFile().getPath().toFile();
     final TermIndex last =
         SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-    LOG.info("Setting the last applied index to " + last);
+    LOG.info("{}: Setting the last applied index to {}", gid, last);
     setLastAppliedTermIndex(last);
 
     // initialize the dispatcher with snapshot so that it build the missing
@@ -241,18 +241,20 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
-    LOG.info("Taking snapshot at termIndex:" + ti);
+    long startTime = Time.monotonicNow();
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
-      LOG.info("Taking a snapshot to file {}", snapshotFile);
+      LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
       try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
         persistContainerSet(fos);
       } catch (IOException ioe) {
-        LOG.warn("Failed to write snapshot file \"" + snapshotFile
-            + "\", last applied index=" + ti);
+        LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
+            snapshotFile);
         throw ioe;
       }
+      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
+          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
       return ti.getIndex();
     }
     return -1;
@@ -326,7 +328,7 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto, DispatcherContext context) {
-    LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
+    LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
         requestProto.getCmdType(), requestProto.getContainerID(),
         requestProto.getPipelineID(), requestProto.getTraceID());
     if (isBlockTokenEnabled) {
@@ -344,7 +346,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     }
     ContainerCommandResponseProto response =
         dispatcher.dispatch(requestProto, context);
-    LOG.trace("response {}", response);
+    LOG.trace("{}: response {}", gid, response);
     return response;
   }
 
@@ -384,18 +386,18 @@ public class ContainerStateMachine extends BaseStateMachine {
         .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
 
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
-    LOG.debug("writeChunk writeStateMachineData : blockId " + 
write.getBlockID()
-        + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
-        .getChunkName());
+    LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
+        write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+        + write.getChunkData().getChunkName());
     // Remove the future once it finishes execution from the
     // writeChunkFutureMap.
     writeChunkFuture.thenApply(r -> {
       metrics.incNumBytesWrittenCount(
           requestProto.getWriteChunk().getChunkData().getLen());
       writeChunkFutureMap.remove(entryIndex);
-      LOG.debug("writeChunk writeStateMachineData  completed: blockId " + write
-          .getBlockID() + " logIndex " + entryIndex + " chunkName " + write
-          .getChunkData().getChunkName());
+      LOG.debug(gid + ": writeChunk writeStateMachineData  completed: blockId" 
+
+           write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+          + write.getChunkData().getChunkName());
       return r;
     });
     return writeChunkFuture;
@@ -554,12 +556,12 @@ public class ContainerStateMachine extends BaseStateMachine {
       }
     } catch (Exception e) {
       metrics.incNumReadStateMachineFails();
-      LOG.error("unable to read stateMachineData:" + e);
+      LOG.error("{} unable to read stateMachineData:", gid, e);
       return completeExceptionally(e);
     }
   }
 
-  private void updateLastApplied() {
+  private synchronized void updateLastApplied() {
     Long appliedTerm = null;
     long appliedIndex = -1;
     for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to