apurtell commented on code in PR #2288:
URL: https://github.com/apache/phoenix/pull/2288#discussion_r2411799413


##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java:
##########
@@ -280,6 +280,10 @@ public SQLException newException(SQLExceptionInfo info) {
     HA_READ_FROM_CLUSTER_FAILED_ON_NULL(1986, "F1Q86", "Unable to read from cluster for null."),
     HA_INVALID_PROPERTIES(1987, "F1Q87", "Invalid properties to get a Phoenix HA connection."),
     HA_CLUSTER_CAN_NOT_CONNECT(1988, "F1Q88", "Cluster can not serve any requests for this HA group"),
+    CLUSTER_ROLE_RECORD_NOT_FOUND(1989, "F1Q89", "Cluster role record not found for this HA group"),
+    STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", "Stale CRR found during operation refreshing it"),
+    STALE_CRR_RETHROW_AFTER_REFRESH_FAILED(1992, "F1Q92", "Stale CRR found during operation, HAGroup refresh failed"),

Review Comment:
   For the case where the refresh failed, this one makes sense to throw.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java:
##########
@@ -280,6 +280,10 @@ public SQLException newException(SQLExceptionInfo info) {
     HA_READ_FROM_CLUSTER_FAILED_ON_NULL(1986, "F1Q86", "Unable to read from cluster for null."),
     HA_INVALID_PROPERTIES(1987, "F1Q87", "Invalid properties to get a Phoenix HA connection."),
     HA_CLUSTER_CAN_NOT_CONNECT(1988, "F1Q88", "Cluster can not serve any requests for this HA group"),
+    CLUSTER_ROLE_RECORD_NOT_FOUND(1989, "F1Q89", "Cluster role record not found for this HA group"),
+    STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", "Stale CRR found during operation refreshing it"),

Review Comment:
   Wouldn't the client transparently refresh the CRR and retry? Why does the application need to handle it?
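   A rough sketch of what I mean, reusing the helper names already in this PR's FailoverPhoenixConnection change (so treat it as illustrative, not the actual implementation):

       <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLException {
           int remainingRefreshes = 1; // bound the retries so a persistent mismatch cannot loop forever
           while (true) {
               try {
                   return s.get();
               } catch (SQLException e) {
                   if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)
                           && remainingRefreshes-- > 0
                           && context.getHAGroup().refreshClusterRoleRecord()) {
                       continue; // refreshed successfully, retry the operation transparently
                   }
                   throw e; // refresh failed or retries exhausted, surface the error to the caller
               }
           }
       }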



##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordException.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when attempting to do an operation on the STANDBY cluster of an HA connection,
+ * indicating that the client has stale Cluster Role Record info for the HAGroup.
+ */
+public class StaleClusterRoleRecordException extends IOException {

Review Comment:
   Currently it is implied that doing an operation on a STANDBY means the client has a "stale" record, but that is not necessarily going to be true going forward.
   When there are active-active modes, or similar, a disallowed operation doesn't mean the client has a stale record; the client's record may be up to date. It means the client has tried to do an operation that is not supported in the current mode.
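   If the intent is to keep the door open, something along these lines (name and fields purely illustrative) would describe the condition without asserting staleness:

       import java.io.IOException;

       /** Thrown when the cluster's current role/policy does not allow the attempted operation. */
       public class InvalidClusterRoleForOperationException extends IOException {
           public InvalidClusterRoleForOperationException(String haGroupName, String role, String operation) {
               super("Operation " + operation + " is not allowed for HA group " + haGroupName
                       + " while the local cluster role is " + role);
           }
       }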



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HighAvailabilityGroup;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility class for getting Cluster Role Record for the client from RegionServer Endpoints.
+ */
+public class GetClusterRoleRecordUtil {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GetClusterRoleRecordUtil.class);
+
+    /**
+     * Scheduler to fetch ClusterRoleRecord until we get an Active ClusterRoleRecord
+     */
+    private static Map<String, ScheduledExecutorService> schedulerMap = new ConcurrentHashMap<>();
+
+    private static final Object pollerLock = new Object();
+
+    private static volatile ScheduledFuture<?> pollerFuture = null;
+
+    private GetClusterRoleRecordUtil() {}
+
+
+    /**
+     * Method to get ClusterRoleRecord from RegionServer Endpoints. it picks a random region server
+     * and gets the CRR from it.
+     * @param url URL to create Connection to be used to get RegionServer Endpoint Service
+     * @param haGroupName Name of the HA group
+     * @param doRetry Whether to retry if the operation fails
+     * @param properties Connection properties
+     * @return ClusterRoleRecord from the first available cluster
+     * @throws SQLException if there is an error getting the ClusterRoleRecord
+     */
+    private static ClusterRoleRecord getClusterRoleRecord(String url, String haGroupName, boolean doRetry, Properties properties) throws SQLException {
+        Connection conn = getConnection(url, properties);
+        PhoenixConnection connection = conn.unwrap(PhoenixConnection.class);
+        try (Admin admin = connection.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = connection.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            //RPC to regionServer to get the CRR
+            RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+                    service = RegionServerEndpointProtos.RegionServerEndpointService
+                    .newBlockingStub(admin.coprocessorService(regionServer));
+            RegionServerEndpointProtos.GetClusterRoleRecordRequest request
+                    = getClusterRoleRecordRequest(haGroupName);
+            RegionServerEndpointProtos.GetClusterRoleRecordResponse response = service.getClusterRoleRecord(null, request);
+
+            //Check if the ClusterRoleRecord is valid, if not, throw an exception
+            if (response.getHaGroupName() == null || response.getPolicy() == null || response.getUrl1() == null
+                    || response.getRole1() == null || response.getUrl2() == null || response.getRole2() == null) {
+                throw new SQLException("Invalid ClusterRoleRecord response from RegionServer");
+            }
+
+            //Generate the ClusterRoleRecord from the response
+            return new ClusterRoleRecord(response.getHaGroupName().toStringUtf8(), HighAvailabilityPolicy.valueOf(response.getPolicy().toStringUtf8()),
+                            response.getUrl1().toStringUtf8(), ClusterRole.valueOf(response.getRole1().toStringUtf8()),
+                            response.getUrl2().toStringUtf8(), ClusterRole.valueOf(response.getRole2().toStringUtf8()),
+                            response.getVersion());
+
+        } catch (Exception e) {
+            SQLException parsedException = ClientUtil.parseServerException(e);
+            //retry once for any exceptions other than StaleMetadataCacheException
+            LOGGER.error("Error in getting ClusterRoleRecord for {} from url {}", haGroupName, connection.getURL(), parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                connection.getQueryServices().refreshLiveRegionServers();
+                return getClusterRoleRecord(url, haGroupName, false, properties);
+            }
+            throw parsedException;
+        } finally {
+            conn.close();
+        }
+
+    }
+
+    /**
+     * Method to schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value)
+     * until we get an Active ClusterRoleRecord
+     * @param conn Connection to be used to get RegionServer Endpoint Service
+     * @param haGroupName Name of the HA group
+     * @param properties Connection properties
+     * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
+     * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an Active CRR is found
+     * @throws SQLException if there is an error getting the ClusterRoleRecord
+     */
+    public static ClusterRoleRecord fetchClusterRoleRecord(String url, String haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) throws SQLException {
+        ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, haGroupName, true, properties);
+
+        if (clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER &&
+                !clusterRoleRecord.getRole1().isActive() && !clusterRoleRecord.getRole2().isActive()) {
+            LOGGER.info("Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} seconds," +
+            "until we find an ACTIVE CRR", haGroupName, pollerInterval);
+            //Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value)
+            //until we get an Active ClusterRoleRecord and return the Non-Active CRR
+            schedulePoller(url, haGroupName, haGroup, pollerInterval, properties);

Review Comment:
   How expensive is the fetch of the CRR? 
   
   If every server and client in, or pointing to, a cluster must contact a server on that cluster every 5 seconds, that would be expensive, and that server would be pretty hot.

   Also, a 5 second polling interval means every client will see an issue if there is a blip >= 5 seconds in the availability of whatever is serving the CRR. That is not failure tolerant. If something is temporarily down or unavailable but you don't need it, that is not actually an availability problem; not depending on it increases tolerance.

   Should this be notification based and not polled? Fetch it during initialization. Then, we would be "notified" when some operation we expect to succeed instead fails due to some policy check (on the remote side). And at that time, the up to date CRR could be fetched.

   So the trick is to make sure that the remote side always checks whether what the client is asking for is valid by its current understanding of the CRR, so it can throw an exception telling the client to recheck its understanding of the latest CRR state. When you do that you do not need to poll.
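   A sketch of the notification-driven flow I have in mind (the helper names here are illustrative, not the actual API): fetch the CRR once at startup, and only re-fetch when the server rejects an operation because the client's view no longer matches its own.

       // Fetched once during initialization, no scheduled poller.
       private volatile ClusterRoleRecord crr = fetchClusterRoleRecordOnce(url, haGroupName, properties);

       <T> T runWithCrrRecheck(SupplierWithSQLException<T> op) throws SQLException {
           try {
               return op.get();
           } catch (SQLException e) {
               if (isPolicyRejection(e)) {
                   // Server told us our view may be out of date; recheck, then retry once.
                   crr = fetchClusterRoleRecordOnce(url, haGroupName, properties);
                   return op.get();
               }
               throw e;
           }
       }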



##########
phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java:
##########
@@ -386,6 +386,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = "phoenix.view.ttl.tenant_views_per_scan.limit";
     // Block mutations based on cluster role record
     public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = "phoenix.cluster.role.based.mutation.block.enabled";
+    // Check HAGroup is Stale for mutations
+    public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = "phoenix.ha.group.stale.for.mutation.check.enabled";

Review Comment:
   Why wouldn't we always check if the CRR is up to date if we have a question about it, i.e. if we tried an operation and got back an unexpected failure?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -634,12 +635,24 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
           final Set<String> haGroupNames = extractHAGroupNameAttribute(miniBatchOp);
           // Check if mutation is blocked for any of the HAGroupNames
           for (String haGroupName : haGroupNames) {
-              if (StringUtils.isNotBlank(haGroupName)
-                      && haGroupStoreManager.isMutationBlocked(haGroupName)) {
-                  throw new MutationBlockedIOException("Blocking Mutation as Some CRRs are in "
-                          + "ACTIVE_TO_STANDBY state and "
-                          + "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
-              }
+                //TODO: Below approach might be slow need to figure out faster way, slower part is
+                //getting haGroupStoreClient We can also cache roleRecord (I tried it and still its
+                //slow due to haGroupStoreClient initialization) and caching will give us old result
+                //in case one cluster is unreachable instead of UNKNOWN. Check if mutation's
+                //haGroup is stale
+                boolean isHAGroupOnClientStale = haGroupStoreManager

Review Comment:
   If there is local uncertainty about state, either because state is missing or not initialized yet, or because the remote gave back an error suggesting the client's understanding is not current, refresh the CRR and then decide what to do next. I think this is the way.
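   Roughly this shape (the hasCurrentRecord/refreshRecord helpers are illustrative, not existing methods):

       for (String haGroupName : haGroupNames) {
           if (StringUtils.isBlank(haGroupName)) {
               continue;
           }
           if (!haGroupStoreManager.hasCurrentRecord(haGroupName)) {
               // Local state is missing or not initialized yet; refresh before deciding.
               haGroupStoreManager.refreshRecord(haGroupName);
           }
           if (haGroupStoreManager.isMutationBlocked(haGroupName)) {
               throw new MutationBlockedIOException("Blocking mutation for HA group " + haGroupName);
           }
       }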



##########
phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java:
##########
@@ -708,7 +709,8 @@ enum JoinType {INNER, LEFT_OUTER}
             CLUSTER_ROLE_1 + " VARCHAR," + // Role for peer cluster might not 
be accurate, we will use only local role for recovery if needed
             CLUSTER_ROLE_2 + " VARCHAR," + // Role for peer cluster might not 
be accurate, we will use only local role for recovery if needed
             POLICY + " VARCHAR," +  // There should be only one policy for an 
HA group
-            VERSION + " BIGINT," +  // Version should be incremented for Admin 
Updates, only for CLUSTER_URLs and REGISTRY_TYPE
+            VERSION_1 + " BIGINT," +  // Version should be incremented for 
Admin Updates, only for CLUSTER_URLs and REGISTRY_TYPE

Review Comment:
   _1 and _2 designate the versions of two separate clusters, but that isn't clear. VERSION_1 and VERSION_2 could just as easily be read as two version constants for the same thing.

   VERSION_CLUSTER_1 and VERSION_CLUSTER_2 are possible names that make it clear.

   Getting this right matters because there will be compatibility concerns if these are changed later.
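   For example (constant names illustrative, following the existing pattern in this table definition):

       VERSION_CLUSTER_1 + " BIGINT," + // CRR version as known by cluster 1
       VERSION_CLUSTER_2 + " BIGINT," + // CRR version as known by cluster 2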



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -333,10 +339,31 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
         while (true) {
             try {
                 return s.get();
-            } catch (SQLException e) {
+            } catch (Exception e) {
+                if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) {
+                    LOG.debug("StaleHAGroupStoreException found refreshing HAGroup");
+                    //If the exception is due to stale ClusterRoleRecord version, try
+                    //refreshing the ClusterRoleRecord and state transitions if required
+
+                    if (!context.getHAGroup().refreshClusterRoleRecord()) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH_FAILED)
+                                .setMessage(String.format("Error while running operation Stale ClusterRoleRecord for HAGroup %s" +
+                                                " found with version %s, refreshing HAGroup failed :- %s",
+                                        context.getHAGroup(), context.getHAGroup().getRoleRecord().getVersion(), e)).build().buildException();
+                    }
+
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH)

Review Comment:
   This comes back to the earlier question I had about what the naming implies. What is an "operation Stale ClusterRoleRecord"? Neither the message nor the name of the ExceptionCode clearly expresses what is happening here.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -689,6 +727,40 @@ private boolean isUpdateNeeded(HAGroupStoreRecord.HAGroupState currentHAGroupSta
         return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) > waitTime);
     }
 
+    /**
+     * Combine two versions into a single long value to be used for comparison and sending back to client
+     * @param version1 HAGroupRecord version for cluster 1
+     * @param version2 HAGroupRecord version for cluster 2
+     * @return combined version
+     */
+
+    private long combineCanonicalVersions(long version1, long version2) {

Review Comment:
   It's a bit weird to represent two values in Java by packing them into bitfields of a single long. Why not int[2]?
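   Something like this keeps the two values explicit (sketch only; the versions here are longs, so long[] or a tiny value type rather than int[2]):

       private static long[] canonicalVersions(long version1, long version2) {
           return new long[] { version1, version2 };
       }

       // or, clearer still, a small value type:
       final class ClusterVersions {
           final long cluster1;
           final long cluster2;
           ClusterVersions(long cluster1, long cluster2) {
               this.cluster1 = cluster1;
               this.cluster2 = cluster2;
           }
       }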



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java:
##########
@@ -100,6 +137,17 @@ public boolean isMutationBlocked(String haGroupName) throws IOException {
         return false;
     }
 
+    public boolean isHAGroupOnClientStale(String haGroupName) throws IOException, SQLException {
+        if (checkStaleCRRForEveryMutation) {
+            HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+            //If local cluster is not ACTIVE/ACTIVE_TO_STANDBY, it means the Failover CRR is stale on client
+            //As they are trying to write/read from a STANDBY cluster.
+            return haGroupStoreClient.getPolicy() == HighAvailabilityPolicy.FAILOVER &&

Review Comment:
   This doesn't allow for future evolution where the CRR is valid but the local cluster is simply in the wrong state for a given operation. In other words, just because a cluster is in the wrong state for a given operation, it doesn't mean the client lacks the most up to date CRR.

   In the future some operations may be allowed on the STANDBY side. At the least, this should not foreclose that possibility.

   In that future case, wouldn't treating the latest state as "stale", doing a refresh, and getting the same state back (because it is not actually stale) cause looping?

   What I am suggesting is getting away from thinking "we are stale" (definitive, we will always assume we are not up to date) toward "we have done something unexpectedly invalid, let's double-check we are using the correct policy" (not definitive, maybe we are up to date or maybe not).



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -333,10 +339,31 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
         while (true) {
             try {
                 return s.get();
-            } catch (SQLException e) {
+            } catch (Exception e) {
+                if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) {
+                    LOG.debug("StaleHAGroupStoreException found refreshing HAGroup");
+                    //If the exception is due to stale ClusterRoleRecord version, try
+                    //refreshing the ClusterRoleRecord and state transitions if required
+
+                    if (!context.getHAGroup().refreshClusterRoleRecord()) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH_FAILED)
+                                .setMessage(String.format("Error while running operation Stale ClusterRoleRecord for HAGroup %s" +
+                                                " found with version %s, refreshing HAGroup failed :- %s",
+                                        context.getHAGroup(), context.getHAGroup().getRoleRecord().getVersion(), e)).build().buildException();
+                    }
+
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH)

Review Comment:
   What is the difference between these two cases? The error messages and error code names do not provide good context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
