szetszwo commented on code in PR #9796:
URL: https://github.com/apache/ozone/pull/9796#discussion_r2849023255


##########
hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto:
##########
@@ -2355,6 +2357,67 @@ message DeleteObjectTaggingRequest {
 message DeleteObjectTaggingResponse {
 }
 
+enum ReadConsistencyProto {
+  // Unknown consistency, the read consistency behavior is decided
+  // by the OM
+  UNKNOWN_READ_CONSISTENCY = 0;

Review Comment:
   It is a good idea to have this as described in 
https://protobuf.dev/best-practices/dos-donts/#unspecified-enum.  How about 
calling it `UNSPECIFIED`?



##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ReadConsistency.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyProto;
+
+/**
+ * Supported read consistency.
+ */
+public enum ReadConsistency {

Review Comment:
   Add static ReadConsistencyHint constants to avoid rebuilding the same proto
   (except LOCAL_LEASE_HINT, which needs its context added):
   ```java
     private static final ReadConsistencyHint DEFAULT_HINT = ...;
     private static final ReadConsistencyHint LOCAL_LEASE_HINT = ...;
     private static final ReadConsistencyHint LINEARIZABLE_LEADER_ONLY_HINT = ...;
     private static final ReadConsistencyHint LINEARIZABLE_ALLOW_FOLLOWER_HINT = ...;
   
     public ReadConsistencyHint getHint() {
       switch (this) {
         case DEFAULT:
           return DEFAULT_HINT;
         case LOCAL_LEASE:
           return LOCAL_LEASE_HINT;
         case LINEARIZABLE_LEADER_ONLY:
           return LINEARIZABLE_LEADER_ONLY_HINT;
         case LINEARIZABLE_ALLOW_FOLLOWER:
           return LINEARIZABLE_ALLOW_FOLLOWER_HINT;
         default:
           return null;
       }
     }
   ```
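
A minimal, self-contained sketch of the caching idea, in case it helps. `Hint` is a hypothetical stand-in for the generated `ReadConsistencyHint` message, and `ReadConsistencySketch` is not the real enum. Storing the hint in an enum field instead of the `switch` above is only one possible arrangement. How `LOCAL_LEASE` gets its context attached is left out here.

```java
/** Hypothetical stand-in for the generated ReadConsistencyHint message. */
final class Hint {
  private final String consistency;

  Hint(String consistency) {
    this.consistency = consistency;
  }

  String getConsistency() {
    return consistency;
  }
}

/** Sketch: each constant builds its hint once, when the enum is loaded. */
enum ReadConsistencySketch {
  DEFAULT,
  LOCAL_LEASE,
  LINEARIZABLE_LEADER_ONLY,
  LINEARIZABLE_ALLOW_FOLLOWER;

  // Built a single time per constant; immutable, so it is safe to share.
  private final Hint hint = new Hint(name());

  Hint getHint() {
    return hint;
  }

  public static void main(String[] args) {
    // Both calls return the same cached instance, so no rebuild happens.
    System.out.println(DEFAULT.getHint() == DEFAULT.getHint());
  }
}
```

Callers can then attach `getHint()` to a request directly, without going through a builder on every call.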
   



##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java:
##########
@@ -213,7 +228,26 @@ public Object invoke(Object proxy, final Method method, 
final Object[] args)
         return method.invoke(this, args);
       }
       OMRequest omRequest = parseOMRequest(args);
-      if (useFollowerRead && OmUtils.shouldSendToFollower(omRequest)) {
+
+      // Apply default consistency hint once, before any routing decision.
+      // In the future, we will support per-request hints which allows client 
(e.g. S3 clients)
+      // to specify a custom request header (e.g. x-ozone-read-consistency) as 
a consistency hint
+      // for read requests.
+      boolean isFollowerReadEligible = useFollowerRead && 
OmUtils.shouldSendToFollower(omRequest);
+      if (!omRequest.hasReadConsistencyHint()) {
+        ReadConsistencyProto defaultReadConsistency = isFollowerReadEligible
+            ? followerReadConsistencyType : leaderReadConsistencyType;
+        if (defaultReadConsistency != null &&
+            defaultReadConsistency != 
ReadConsistencyProto.UNKNOWN_READ_CONSISTENCY) {
+          omRequest = omRequest.toBuilder()
+              .setReadConsistencyHint(ReadConsistencyHint.newBuilder()
+                  .setReadConsistency(defaultReadConsistency).build())
+              .build();

Review Comment:
   Change followerReadConsistencyType and leaderReadConsistencyType to
   ReadConsistencyHint, so the hint does not need to be built again and again.
   
   ```java
         if (!omRequest.hasReadConsistencyHint()) {
           final ReadConsistencyHint defaultReadConsistency = isFollowerReadEligible
               ? followerReadConsistencyType : leaderReadConsistencyType;
           if (defaultReadConsistency != null) {
             omRequest = omRequest.toBuilder()
                 .setReadConsistencyHint(defaultReadConsistency)
                 .build();
             args[1] = omRequest;
           }
         }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java:
##########
@@ -270,12 +383,19 @@ boolean allowFollowerReadLocalLease(Division 
ratisDivision, long leaseLogLimit,
       return false; // lease time expired
     }
 
+    if (leaseLagLimit == -1) {
+      // Allow infinite lag time, which allows unbounded stale reads

Review Comment:
   It should be "infinite log lag", not "infinite lag time".



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java:
##########
@@ -216,44 +221,152 @@ public OMRequest getLastRequestToSubmit() {
 
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
-    // Read from leader or followers using linearizable read
-    if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
-        allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
-      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+    if (request.getCmdType().equals(PrepareStatus)) {
+      // PrepareStatus is an OM request that only target a single OM node.
+      // Therefore, all PrepareStatus requests should be served immediately 
without failover regardless
+      // of the OM node leadership or the read consistency. See 
PrepareSubCommand.
+      // The implementation is not ideal, but exists for compatibility reason.
       return handler.handleReadRequest(request);
-    } 
-    // Get current OM's role
-    RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
-    // === 1. Follower linearizable read ===
-    if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
-      ozoneManager.getMetrics().incNumLinearizableRead();
-      return ozoneManager.getOmExecutionFlow().submit(request, false);
     }
-    // === 2. Leader local read (skip ReadIndex if allowed) ===
-    if (raftServerStatus == LEADER_AND_READY || 
request.getCmdType().equals(PrepareStatus)) {
-      if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
-        ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
-        // leader directly serves local committed data
+
+    if (!request.hasReadConsistencyHint() || 
!request.getReadConsistencyHint().hasReadConsistency() ||
+        request.getReadConsistencyHint().getReadConsistency() == 
UNKNOWN_READ_CONSISTENCY) {
+      // Read from leader or followers using linearizable read
+      if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
+          allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
+        ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
         return handler.handleReadRequest(request);
       }
-      // otherwise use linearizable path when enabled
-      if (omRatisServer.isLinearizableRead()) {
+      // Get current OM's role
+      RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
+      // === 1. Follower linearizable read ===
+      if (raftServerStatus == NOT_LEADER && 
omRatisServer.isLinearizableRead()) {
         ozoneManager.getMetrics().incNumLinearizableRead();
         return ozoneManager.getOmExecutionFlow().submit(request, false);
       }
+      // === 2. Leader local read (skip ReadIndex if allowed) ===
+      if (raftServerStatus == LEADER_AND_READY) {
+        if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+          // leader directly serves local committed data
+          return handler.handleReadRequest(request);
+        }
+        // otherwise use linearizable path when enabled
+        if (omRatisServer.isLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLinearizableRead();
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        }
 
-      // fallback to local read
-      return handler.handleReadRequest(request);
+        // fallback to local read
+        return handler.handleReadRequest(request);
+      } else {
+        throw createLeaderErrorException(raftServerStatus);
+      }
     } else {
-      throw createLeaderErrorException(raftServerStatus);
+      // If read consistency hint is specified, we should try to respect it 
although
+      // there is no guarantee since it depends on the OM node configuration 
(e.g.
+      // whether OM Raft server enables linearizable read).
+      ReadConsistencyHint readConsistencyHint = 
request.getReadConsistencyHint();
+      ReadConsistencyProto readConsistency = 
readConsistencyHint.getReadConsistency();
+      RaftServerStatus raftServerStatus;
+      switch (readConsistency) {
+      case LOCAL_LEASE:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          if (!ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled()) {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+          LocalLeaseContext localLeaseContext = 
readConsistencyHint.getLocalLeaseContext();
+          long localLeaseLagLimit = localLeaseContext.hasLagLimit() ?
+              localLeaseContext.getLagLimit() : 
ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit();
+          long localLeaseLeaseTimeMs = localLeaseContext.hasLeaseTimeMs() ?
+              localLeaseContext.getLeaseTimeMs() : 
ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs();
+          if (allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              localLeaseLagLimit, localLeaseLeaseTimeMs)) {
+            ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+            return handler.handleReadRequest(request);
+          }
+          // The LocalLease lag is too high, trigger failover
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          // Although local lease does not apply for leader (since leader is 
always up-to-date)
+          // We still add the local lease metrics for compatibility reasons
+          ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+          return handler.handleReadRequest(request);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_LEADER_ONLY:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            // If linearizable read is not enabled, fallback to leader read
+            return handler.handleReadRequest(request);
+          }
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_ALLOW_FOLLOWER:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case NOT_LEADER:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+        case LEADER_AND_READY:
+          if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+            // leader directly serves local committed data
+            return handler.handleReadRequest(request);
+          }
+
+          // If the Raft server read option is not LINEARIZABLE, this will
+          // use leader read
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+          }
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case DEFAULT:
+      default:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_READY:
+          return handler.handleReadRequest(request);
+        case LEADER_AND_NOT_READY:
+        case NOT_LEADER:
+          throw createLeaderErrorException(raftServerStatus);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      }
     }
   }
 
-  boolean allowFollowerReadLocalLease(Division ratisDivision, long 
leaseLogLimit, long leaseTimeMsLimit) {
+  boolean allowFollowerReadLocalLease(Division ratisDivision, long 
leaseLagLimit, long leaseTimeMsLimit) {

Review Comment:
   Since there are two types of lag, time lag and log lag, the original name 
`leaseLogLimit` is better.
   
   BTW, we should also rename the conf
   - `ozone.om.follower.read.local.lease.lag.limit`
   
   to
   - `ozone.om.follower.read.local.lease.log.limit`
   



##########
hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java:
##########
@@ -46,6 +52,8 @@ public class Hadoop27RpcTransport implements OmTransport {
   private final OzoneManagerProtocolPB rpcProxy;
 
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider;
+  private final boolean followerReadEnabled;
+  private final ReadConsistencyProto defaultLeaderReadConsistency;

Review Comment:
   Use ReadConsistencyHint instead of ReadConsistencyProto.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java:
##########
@@ -216,44 +221,152 @@ public OMRequest getLastRequestToSubmit() {
 
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
-    // Read from leader or followers using linearizable read
-    if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
-        allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
-      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+    if (request.getCmdType().equals(PrepareStatus)) {
+      // PrepareStatus is an OM request that only target a single OM node.
+      // Therefore, all PrepareStatus requests should be served immediately 
without failover regardless
+      // of the OM node leadership or the read consistency. See 
PrepareSubCommand.
+      // The implementation is not ideal, but exists for compatibility reason.
       return handler.handleReadRequest(request);
-    } 
-    // Get current OM's role
-    RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
-    // === 1. Follower linearizable read ===
-    if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
-      ozoneManager.getMetrics().incNumLinearizableRead();
-      return ozoneManager.getOmExecutionFlow().submit(request, false);
     }
-    // === 2. Leader local read (skip ReadIndex if allowed) ===
-    if (raftServerStatus == LEADER_AND_READY || 
request.getCmdType().equals(PrepareStatus)) {
-      if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
-        ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
-        // leader directly serves local committed data
+
+    if (!request.hasReadConsistencyHint() || 
!request.getReadConsistencyHint().hasReadConsistency() ||
+        request.getReadConsistencyHint().getReadConsistency() == 
UNKNOWN_READ_CONSISTENCY) {

Review Comment:
   For better readability, add private methods:
   ```java
       if (!request.hasReadConsistencyHint() || !request.getReadConsistencyHint().hasReadConsistency() ||
           request.getReadConsistencyHint().getReadConsistency() == UNKNOWN_READ_CONSISTENCY) {
         return submitReadRequestToOmWithoutHint(request);
       } else {
         // If read consistency hint is specified, we should try to respect it although
         // there is no guarantee since it depends on the OM node configuration (e.g.
         // whether OM Raft server enables linearizable read).
         final ReadConsistencyHint readConsistencyHint = request.getReadConsistencyHint();
         final ReadConsistencyProto readConsistency = readConsistencyHint.getReadConsistency();
         switch (readConsistency) {
           case LOCAL_LEASE:
             return submitReadRequestToOmLocalLease(request, readConsistencyHint.getLocalLeaseContext());
           case LINEARIZABLE_LEADER_ONLY:
             return submitReadRequestToOmLinearizableLeaderOnly(request);
           case LINEARIZABLE_ALLOW_FOLLOWER:
             return submitReadRequestToOmLinearizableAllowFollower(request);
           case DEFAULT:
           default:
             return submitReadRequestToOmDefault(request);
         }
       }
     }
   ```
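
As a rough sketch of what one extracted helper could look like, here is the `DEFAULT` branch of the diff moved into its own method. The types are hypothetical stand-ins: the real method would take an `OMRequest`, return an `OMResponse`, and throw via `createLeaderErrorException` / `createUnknownRaftServerStatusException` rather than `IllegalStateException`.

```java
/** Hypothetical stand-ins; the real method works on OMRequest/OMResponse. */
final class DefaultReadSketch {
  enum RaftServerStatus { NOT_LEADER, LEADER_AND_NOT_READY, LEADER_AND_READY }

  private DefaultReadSketch() { }

  /** Mirrors the DEFAULT branch: only a ready leader serves the read. */
  static String submitReadRequestToOmDefault(RaftServerStatus status, String request) {
    switch (status) {
    case LEADER_AND_READY:
      // Stands in for handler.handleReadRequest(request).
      return "handled:" + request;
    case LEADER_AND_NOT_READY:
    case NOT_LEADER:
      // Stands in for createLeaderErrorException(status).
      throw new IllegalStateException("Not a ready leader: " + status);
    default:
      // Stands in for createUnknownRaftServerStatusException(status).
      throw new IllegalStateException("Unknown Raft server status: " + status);
    }
  }

  public static void main(String[] args) {
    System.out.println(
        submitReadRequestToOmDefault(RaftServerStatus.LEADER_AND_READY, "GetKey"));
  }
}
```

With each consistency level in its own method, the outer `switch` stays a flat dispatch and every branch can be tested in isolation.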



##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java:
##########
@@ -45,11 +51,13 @@ public class Hadoop3OmTransport implements OmTransport {
    */
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
+  private final OzoneManagerProtocolPB rpcProxy;
+
   private final HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> 
omFailoverProxyProvider;
+  private final boolean followerReadEnabled;
+  private final ReadConsistencyProto defaultLeaderReadConsistency;

Review Comment:
   Use ReadConsistencyHint instead of ReadConsistencyProto.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java:
##########
@@ -514,10 +516,29 @@ private RaftClientRequest createRaftRequestImpl(OMRequest 
omRequest, boolean isW
         .setMessage(
             Message.valueOf(
                 OMRatisHelper.convertRequestToByteString(omRequest)))
-        .setType(isWrite ? RaftClientRequest.writeRequestType() : 
RaftClientRequest.readRequestType())
+        .setType(isWrite ? RaftClientRequest.writeRequestType() : 
getRaftReadRequestType(omRequest))
         .build();
   }
 
+  private RaftClientRequest.Type getRaftReadRequestType(OMRequest omRequest) {

Review Comment:
   Add `static`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java:
##########
@@ -216,44 +221,152 @@ public OMRequest getLastRequestToSubmit() {
 
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
-    // Read from leader or followers using linearizable read
-    if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
-        allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
-      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+    if (request.getCmdType().equals(PrepareStatus)) {
+      // PrepareStatus is an OM request that only target a single OM node.
+      // Therefore, all PrepareStatus requests should be served immediately 
without failover regardless
+      // of the OM node leadership or the read consistency. See 
PrepareSubCommand.
+      // The implementation is not ideal, but exists for compatibility reason.
       return handler.handleReadRequest(request);
-    } 
-    // Get current OM's role
-    RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
-    // === 1. Follower linearizable read ===
-    if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
-      ozoneManager.getMetrics().incNumLinearizableRead();
-      return ozoneManager.getOmExecutionFlow().submit(request, false);
     }
-    // === 2. Leader local read (skip ReadIndex if allowed) ===
-    if (raftServerStatus == LEADER_AND_READY || 
request.getCmdType().equals(PrepareStatus)) {
-      if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
-        ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
-        // leader directly serves local committed data
+
+    if (!request.hasReadConsistencyHint() || 
!request.getReadConsistencyHint().hasReadConsistency() ||
+        request.getReadConsistencyHint().getReadConsistency() == 
UNKNOWN_READ_CONSISTENCY) {
+      // Read from leader or followers using linearizable read
+      if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
+          allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
+        ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
         return handler.handleReadRequest(request);
       }
-      // otherwise use linearizable path when enabled
-      if (omRatisServer.isLinearizableRead()) {
+      // Get current OM's role
+      RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
+      // === 1. Follower linearizable read ===
+      if (raftServerStatus == NOT_LEADER && 
omRatisServer.isLinearizableRead()) {
         ozoneManager.getMetrics().incNumLinearizableRead();
         return ozoneManager.getOmExecutionFlow().submit(request, false);
       }
+      // === 2. Leader local read (skip ReadIndex if allowed) ===
+      if (raftServerStatus == LEADER_AND_READY) {
+        if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+          // leader directly serves local committed data
+          return handler.handleReadRequest(request);
+        }
+        // otherwise use linearizable path when enabled
+        if (omRatisServer.isLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLinearizableRead();
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        }
 
-      // fallback to local read
-      return handler.handleReadRequest(request);
+        // fallback to local read
+        return handler.handleReadRequest(request);
+      } else {
+        throw createLeaderErrorException(raftServerStatus);
+      }
     } else {
-      throw createLeaderErrorException(raftServerStatus);
+      // If read consistency hint is specified, we should try to respect it 
although
+      // there is no guarantee since it depends on the OM node configuration 
(e.g.
+      // whether OM Raft server enables linearizable read).
+      ReadConsistencyHint readConsistencyHint = 
request.getReadConsistencyHint();
+      ReadConsistencyProto readConsistency = 
readConsistencyHint.getReadConsistency();
+      RaftServerStatus raftServerStatus;
+      switch (readConsistency) {
+      case LOCAL_LEASE:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          if (!ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled()) {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+          LocalLeaseContext localLeaseContext = 
readConsistencyHint.getLocalLeaseContext();
+          long localLeaseLagLimit = localLeaseContext.hasLagLimit() ?
+              localLeaseContext.getLagLimit() : 
ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit();
+          long localLeaseLeaseTimeMs = localLeaseContext.hasLeaseTimeMs() ?
+              localLeaseContext.getLeaseTimeMs() : 
ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs();
+          if (allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              localLeaseLagLimit, localLeaseLeaseTimeMs)) {
+            ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+            return handler.handleReadRequest(request);
+          }
+          // The LocalLease lag is too high, trigger failover
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          // Although local lease does not apply for leader (since leader is 
always up-to-date)
+          // We still add the local lease metrics for compatibility reasons
+          ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+          return handler.handleReadRequest(request);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_LEADER_ONLY:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            // If linearizable read is not enabled, fallback to leader read
+            return handler.handleReadRequest(request);
+          }
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_ALLOW_FOLLOWER:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case NOT_LEADER:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+        case LEADER_AND_READY:
+          if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+            // leader directly serves local committed data
+            return handler.handleReadRequest(request);
+          }
+
+          // If the Raft server read option is not LINEARIZABLE, this will
+          // use leader read
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+          }
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case DEFAULT:
+      default:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_READY:
+          return handler.handleReadRequest(request);
+        case LEADER_AND_NOT_READY:
+        case NOT_LEADER:
+          throw createLeaderErrorException(raftServerStatus);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      }
     }
   }
 
-  boolean allowFollowerReadLocalLease(Division ratisDivision, long 
leaseLogLimit, long leaseTimeMsLimit) {
+  boolean allowFollowerReadLocalLease(Division ratisDivision, long 
leaseLagLimit, long leaseTimeMsLimit) {
     final DivisionInfo divisionInfo = ratisDivision.getInfo();
     final FollowerInfoProto followerInfo = 
divisionInfo.getRoleInfoProto().getFollowerInfo();
+    if (leaseTimeMsLimit == -1) {
+      leaseTimeMsLimit = Long.MAX_VALUE;
+    }

Review Comment:
   Combine this with the if-condition below:
   ```java
       if (leaseTimeMsLimit >= 0 && leaderInfo.getLastRpcElapsedTimeMs() > leaseTimeMsLimit) {
   ```
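
A minimal sketch of the combined check, assuming a negative limit means "no time limit". `LeaseTimeCheck` and `isLeaseExpired` are hypothetical names for illustration; in the PR the elapsed time would come from the Ratis role info rather than a parameter. Note that this condition treats every negative value as unlimited, not only -1.

```java
/** Illustrative only: the names here are hypothetical, not part of the PR. */
final class LeaseTimeCheck {
  private LeaseTimeCheck() { }

  /**
   * Returns true when the lease should be treated as expired.
   * A negative limit disables the time check entirely, so no
   * Long.MAX_VALUE substitution is needed.
   */
  static boolean isLeaseExpired(long lastRpcElapsedTimeMs, long leaseTimeMsLimit) {
    return leaseTimeMsLimit >= 0 && lastRpcElapsedTimeMs > leaseTimeMsLimit;
  }

  public static void main(String[] args) {
    System.out.println(isLeaseExpired(6_000L, 5_000L)); // elapsed time above the limit
    System.out.println(isLeaseExpired(6_000L, -1L));    // time check disabled
  }
}
```

This keeps the parameter unmodified, which also avoids reassigning a method argument.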



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java:
##########
@@ -216,44 +221,152 @@ public OMRequest getLastRequestToSubmit() {
 
   private OMResponse submitReadRequestToOM(OMRequest request)
       throws ServiceException {
-    // Read from leader or followers using linearizable read
-    if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
-        allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
-            ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
-      ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+    if (request.getCmdType().equals(PrepareStatus)) {
+      // PrepareStatus is an OM request that only target a single OM node.
+      // Therefore, all PrepareStatus requests should be served immediately 
without failover regardless
+      // of the OM node leadership or the read consistency. See 
PrepareSubCommand.
+      // The implementation is not ideal, but exists for compatibility reason.
       return handler.handleReadRequest(request);
-    } 
-    // Get current OM's role
-    RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
-    // === 1. Follower linearizable read ===
-    if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
-      ozoneManager.getMetrics().incNumLinearizableRead();
-      return ozoneManager.getOmExecutionFlow().submit(request, false);
     }
-    // === 2. Leader local read (skip ReadIndex if allowed) ===
-    if (raftServerStatus == LEADER_AND_READY || 
request.getCmdType().equals(PrepareStatus)) {
-      if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
-        ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
-        // leader directly serves local committed data
+
+    if (!request.hasReadConsistencyHint() || 
!request.getReadConsistencyHint().hasReadConsistency() ||
+        request.getReadConsistencyHint().getReadConsistency() == 
UNKNOWN_READ_CONSISTENCY) {
+      // Read from leader or followers using linearizable read
+      if (ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled() &&
+          allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit(),
+              ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs())) {
+        ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
         return handler.handleReadRequest(request);
       }
-      // otherwise use linearizable path when enabled
-      if (omRatisServer.isLinearizableRead()) {
+      // Get current OM's role
+      RaftServerStatus raftServerStatus = omRatisServer.getLeaderStatus();
+      // === 1. Follower linearizable read ===
+      if (raftServerStatus == NOT_LEADER && omRatisServer.isLinearizableRead()) {
         ozoneManager.getMetrics().incNumLinearizableRead();
         return ozoneManager.getOmExecutionFlow().submit(request, false);
       }
+      // === 2. Leader local read (skip ReadIndex if allowed) ===
+      if (raftServerStatus == LEADER_AND_READY) {
+        if (ozoneManager.getConfig().isAllowLeaderSkipLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLeaderSkipLinearizableRead();
+          // leader directly serves local committed data
+          return handler.handleReadRequest(request);
+        }
+        // otherwise use linearizable path when enabled
+        if (omRatisServer.isLinearizableRead()) {
+          ozoneManager.getMetrics().incNumLinearizableRead();
+          return ozoneManager.getOmExecutionFlow().submit(request, false);
+        }
 
-      // fallback to local read
-      return handler.handleReadRequest(request);
+        // fallback to local read
+        return handler.handleReadRequest(request);
+      } else {
+        throw createLeaderErrorException(raftServerStatus);
+      }
     } else {
-      throw createLeaderErrorException(raftServerStatus);
+      // If read consistency hint is specified, we should try to respect it although
+      // there is no guarantee since it depends on the OM node configuration (e.g.
+      // whether OM Raft server enables linearizable read).
+      ReadConsistencyHint readConsistencyHint = request.getReadConsistencyHint();
+      ReadConsistencyProto readConsistency = readConsistencyHint.getReadConsistency();
+      RaftServerStatus raftServerStatus;
+      switch (readConsistency) {
+      case LOCAL_LEASE:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          if (!ozoneManager.getConfig().isFollowerReadLocalLeaseEnabled()) {
+            throw createLeaderErrorException(raftServerStatus);
+          }
+          LocalLeaseContext localLeaseContext = readConsistencyHint.getLocalLeaseContext();
+          long localLeaseLagLimit = localLeaseContext.hasLagLimit() ?
+              localLeaseContext.getLagLimit() : ozoneManager.getConfig().getFollowerReadLocalLeaseLagLimit();
+          long localLeaseLeaseTimeMs = localLeaseContext.hasLeaseTimeMs() ?
+              localLeaseContext.getLeaseTimeMs() : ozoneManager.getConfig().getFollowerReadLocalLeaseTimeMs();
+          if (allowFollowerReadLocalLease(omRatisServer.getServerDivision(),
+              localLeaseLagLimit, localLeaseLeaseTimeMs)) {
+            ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+            return handler.handleReadRequest(request);
+          }
+          // The LocalLease lag is too high, trigger failover
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          // Although local lease does not apply for leader (since leader is always up-to-date)
+          // We still add the local lease metrics for compatibility reasons
+          ozoneManager.getMetrics().incNumFollowerReadLocalLeaseSuccess();
+          return handler.handleReadRequest(request);
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_LEADER_ONLY:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case NOT_LEADER:
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);
+        case LEADER_AND_READY:
+          if (omRatisServer.isLinearizableRead()) {
+            ozoneManager.getMetrics().incNumLinearizableRead();
+            return ozoneManager.getOmExecutionFlow().submit(request, false);
+          } else {
+            // If linearizable read is not enabled, fallback to leader read
+            return handler.handleReadRequest(request);
+          }
+        default:
+          throw createUnknownRaftServerStatusException(raftServerStatus);
+        }
+      case LINEARIZABLE_ALLOW_FOLLOWER:
+        raftServerStatus = omRatisServer.getLeaderStatus();
+        switch (raftServerStatus) {
+        case LEADER_AND_NOT_READY:
+          throw createLeaderErrorException(raftServerStatus);

Review Comment:
   Remove this line, since `LEADER_AND_NOT_READY` and `NOT_LEADER` should be handled the same way.
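
   As a rough illustration of treating the two non-ready states identically, here is a minimal, self-contained sketch. It is not the PR's code: `ReadRoutingSketch`, `Status`, `Route`, and `route` are hypothetical names, and the behavior of the shared branch (linearizable read when enabled, otherwise a leader error that triggers failover) is an assumption modeled on the existing follower path, since the `NOT_LEADER` case is not visible in this hunk.

   ```java
   // Hypothetical stand-in for the LINEARIZABLE_ALLOW_FOLLOWER switch; names are illustrative only.
   final class ReadRoutingSketch {
     enum Status { NOT_LEADER, LEADER_AND_NOT_READY, LEADER_AND_READY }

     enum Route { LOCAL_READ, LINEARIZABLE_READ, FAILOVER }

     static Route route(Status status, boolean linearizableReadEnabled) {
       switch (status) {
       case NOT_LEADER:
       case LEADER_AND_NOT_READY:
         // Both states share one branch, so a not-yet-ready leader behaves like a follower.
         // Assumption: without linearizable read, the request fails over.
         return linearizableReadEnabled ? Route.LINEARIZABLE_READ : Route.FAILOVER;
       case LEADER_AND_READY:
         // Assumption: mirrors LINEARIZABLE_LEADER_ONLY, which falls back to a local leader read.
         return linearizableReadEnabled ? Route.LINEARIZABLE_READ : Route.LOCAL_READ;
       default:
         throw new IllegalStateException("Unknown Raft server status: " + status);
       }
     }

     public static void main(String[] args) {
       // Print the routing decision for every status with linearizable read enabled.
       for (Status status : Status.values()) {
         System.out.println(status + " -> " + route(status, true));
       }
     }
   }
   ```

   Stacking the two case labels keeps the equivalence explicit in the code, so a later change to one state cannot silently diverge from the other.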
   


