This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0976f6f  HDDS-1766. ContainerStateMachine is unable to increment 
lastAppliedTermIndex. Contributed by  Mukul Kumar Singh. (#1072)
0976f6f is described below

commit 0976f6fc30ed8bb774d823f09c58cea54be05ae7
Author: Mukul Kumar Singh <msi...@apache.org>
AuthorDate: Sun Jul 14 10:53:51 2019 +0530

    HDDS-1766. ContainerStateMachine is unable to increment 
lastAppliedTermIndex. Contributed by  Mukul Kumar Singh. (#1072)
---
 .../server/ratis/ContainerStateMachine.java        | 39 +++++++++++-----------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f3e4391..7e4d481 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -197,17 +197,16 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     if (snapshot == null) {
       TermIndex empty =
           TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
-      LOG.info(
-          "The snapshot info is null." + "Setting the last applied index to:"
-              + empty);
+      LOG.info("{}: The snapshot info is null. Setting the last applied index" 
+
+              "to:{}", gid, empty);
       setLastAppliedTermIndex(empty);
-      return RaftLog.INVALID_LOG_INDEX;
+      return empty.getIndex();
     }
 
     final File snapshotFile = snapshot.getFile().getPath().toFile();
     final TermIndex last =
         SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-    LOG.info("Setting the last applied index to " + last);
+    LOG.info("{}: Setting the last applied index to {}", gid, last);
     setLastAppliedTermIndex(last);
 
     // initialize the dispatcher with snapshot so that it build the missing
@@ -243,18 +242,20 @@ public class ContainerStateMachine extends 
BaseStateMachine {
   @Override
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
-    LOG.info("Taking snapshot at termIndex:" + ti);
+    long startTime = Time.monotonicNow();
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
-      LOG.info("Taking a snapshot to file {}", snapshotFile);
+      LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
       try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
         persistContainerSet(fos);
       } catch (IOException ioe) {
-        LOG.warn("Failed to write snapshot file \"" + snapshotFile
-            + "\", last applied index=" + ti);
+        LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
+            snapshotFile);
         throw ioe;
       }
+      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
+          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
       return ti.getIndex();
     }
     return -1;
@@ -337,7 +338,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
 
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto, DispatcherContext context) {
-    LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
+    LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
         requestProto.getCmdType(), requestProto.getContainerID(),
         requestProto.getPipelineID(), requestProto.getTraceID());
     if (isBlockTokenEnabled) {
@@ -355,7 +356,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     }
     ContainerCommandResponseProto response =
         dispatcher.dispatch(requestProto, context);
-    LOG.trace("response {}", response);
+    LOG.trace("{}: response {}", gid, response);
     return response;
   }
 
@@ -395,18 +396,18 @@ public class ContainerStateMachine extends 
BaseStateMachine {
         .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
 
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
-    LOG.debug("writeChunk writeStateMachineData : blockId " + 
write.getBlockID()
-        + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
-        .getChunkName());
+    LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
+        write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+        + write.getChunkData().getChunkName());
     // Remove the future once it finishes execution from the
     // writeChunkFutureMap.
     writeChunkFuture.thenApply(r -> {
       metrics.incNumBytesWrittenCount(
           requestProto.getWriteChunk().getChunkData().getLen());
       writeChunkFutureMap.remove(entryIndex);
-      LOG.debug("writeChunk writeStateMachineData  completed: blockId " + write
-          .getBlockID() + " logIndex " + entryIndex + " chunkName " + write
-          .getChunkData().getChunkName());
+      LOG.debug(gid + ": writeChunk writeStateMachineData  completed: blockId" 
+
+           write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+          + write.getChunkData().getChunkName());
       return r;
     });
     return writeChunkFuture;
@@ -564,12 +565,12 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       }
     } catch (Exception e) {
       metrics.incNumReadStateMachineFails();
-      LOG.error("unable to read stateMachineData:" + e);
+      LOG.error("{} unable to read stateMachineData:", gid, e);
       return completeExceptionally(e);
     }
   }
 
-  private void updateLastApplied() {
+  private synchronized void updateLastApplied() {
     Long appliedTerm = null;
     long appliedIndex = -1;
     for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to