linyiqun commented on a change in pull request #1613:
URL: https://github.com/apache/ozone/pull/1613#discussion_r529604167



##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
##########
@@ -0,0 +1,108 @@
+package org.apache.hadoop.ozone.om.request.upgrade;
+
+import org.apache.hadoop.hdds.ratis.RatisUpgradeUtils;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * OM Request used to flush all transactions to disk, take a DB snapshot, and
+ * purge the logs, leaving Ratis in a clean state without unapplied log
+ * entries. This prepares the OM for upgrades/downgrades so that no request
+ * in the log is applied to the database in the old version of the code in one
+ * OM, and the new version of the code on another OM.
+ */
+public class OMPrepareRequest extends OMClientRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPrepareRequest.class);
+
+  // Allow double buffer this many seconds to flush all transactions before
+  // returning an error to the caller.
+  private static final long DOUBLE_BUFFER_FLUSH_TIMEOUT_SECONDS = 5 * 60;
+  // Time between checks to see if double buffer finished flushing.
+  private static final long DOUBLE_BUFFER_FLUSH_CHECK_SECONDS = 1;
+
+  public OMPrepareRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    LOG.info("Received prepare request with log index {}", 
transactionLogIndex);
+
+    OMResponse.Builder responseBuilder =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    responseBuilder.setCmdType(Type.Prepare);
+    OMClientResponse response = null;
+
+    try {
+      // Create response.
+      PrepareResponse omResponse = PrepareResponse.newBuilder()
+              .setTxnID(transactionLogIndex)
+              .build();
+      responseBuilder.setPrepareForUpgradeResponse(omResponse);
+      response = new OMPrepareResponse(responseBuilder.build());
+
+      // Add response to double buffer before clearing logs.
+      // This guarantees the log index of this request will be the same as
+      // the snapshot index in the prepared state.
+      ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex);
+
+      OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
+      OzoneManagerStateMachine omStateMachine =
+          omRatisServer.getOmStateMachine();
+      RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer();
+      RaftServerImpl serverImpl =
+          server.getImpl(omRatisServer.getRaftGroup().getGroupId());
+
+      // Wait for outstanding double buffer entries to flush to disk,
+      // so they will not be purged from the log before being persisted to
+      // the DB.
+      RatisUpgradeUtils.waitForAllTxnsApplied(omStateMachine, serverImpl,
+          DOUBLE_BUFFER_FLUSH_TIMEOUT_SECONDS,
+          DOUBLE_BUFFER_FLUSH_CHECK_SECONDS);

Review comment:
       The transaction entry added to the double buffer is flushed to the DB 
asynchronously. Shall we pass transactionLogIndex to double-check and make 
sure that the previous txns have been applied?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to