lokiore commented on code in PR #2288:
URL: https://github.com/apache/phoenix/pull/2288#discussion_r2414905897


##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -689,6 +727,40 @@ private boolean 
isUpdateNeeded(HAGroupStoreRecord.HAGroupState currentHAGroupSta
         return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) 
> waitTime);
     }
 
+    /**
+     * Combine two versions into a single long value to be used for comparison 
and sending back to client
+     * @param version1 HAGroupRecord version for cluster 1
+     * @param version2 HAGroupRecord version for cluster 2
+     * @return combined version
+     */
+
+    private long combineCanonicalVersions(long version1, long version2) {

Review Comment:
   After our recent discussion, CRR version implementation will change a bit on 
server side, we will have version for admin changes only. It'll come with 
@ritegarg's next change, so either on top of it, or I'll have to pull and 
resolve conflicts



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HighAvailabilityGroup;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility class for getting Cluster Role Record for the client from 
RegionServer Endpoints.
+ */
+public class GetClusterRoleRecordUtil {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(GetClusterRoleRecordUtil.class);
+
+    /**
+     * Scheduler to fetch ClusterRoleRecord until we get an Active 
ClusterRoleRecord
+     */
+    private static Map<String, ScheduledExecutorService> schedulerMap = new 
ConcurrentHashMap<>();
+
+    private static final Object pollerLock = new Object();
+
+    private static volatile ScheduledFuture<?> pollerFuture = null;
+
+    private GetClusterRoleRecordUtil() {}
+
+
+    /**
+     * Method to get ClusterRoleRecord from RegionServer Endpoints. it picks a 
random region server
+     * and gets the CRR from it.
+     * @param url URL to create Connection to be used to get RegionServer 
Endpoint Service
+     * @param haGroupName Name of the HA group
+     * @param doRetry Whether to retry if the operation fails
+     * @param properties Connection properties
+     * @return ClusterRoleRecord from the first available cluster
+     * @throws SQLException if there is an error getting the ClusterRoleRecord
+     */
+    private static ClusterRoleRecord getClusterRoleRecord(String url, String 
haGroupName, boolean doRetry, Properties properties) throws SQLException {
+        Connection conn = getConnection(url, properties);
+        PhoenixConnection connection = conn.unwrap(PhoenixConnection.class);
+        try (Admin admin = connection.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = connection.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = 
regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            //RPC to regionServer to get the CRR
+            
RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+                    service = 
RegionServerEndpointProtos.RegionServerEndpointService
+                    .newBlockingStub(admin.coprocessorService(regionServer));
+            RegionServerEndpointProtos.GetClusterRoleRecordRequest request
+                    = getClusterRoleRecordRequest(haGroupName);
+            RegionServerEndpointProtos.GetClusterRoleRecordResponse response = 
service.getClusterRoleRecord(null, request);
+
+            //Check if the ClusterRoleRecord is valid, if not, throw an 
exception
+            if (response.getHaGroupName() == null || response.getPolicy() == 
null || response.getUrl1() == null
+                    || response.getRole1() == null || response.getUrl2() == 
null || response.getRole2() == null) {
+                throw new SQLException("Invalid ClusterRoleRecord response 
from RegionServer");
+            }
+
+            //Generate the ClusterRoleRecord from the response
+            return new 
ClusterRoleRecord(response.getHaGroupName().toStringUtf8(), 
HighAvailabilityPolicy.valueOf(response.getPolicy().toStringUtf8()),
+                            response.getUrl1().toStringUtf8(), 
ClusterRole.valueOf(response.getRole1().toStringUtf8()),
+                            response.getUrl2().toStringUtf8(), 
ClusterRole.valueOf(response.getRole2().toStringUtf8()),
+                            response.getVersion());
+
+        } catch (Exception e) {
+            SQLException parsedException = ClientUtil.parseServerException(e);
+            //retry once for any exceptions other than 
StaleMetadataCacheException
+            LOGGER.error("Error in getting ClusterRoleRecord for {} from url 
{}", haGroupName, connection.getURL(), parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                connection.getQueryServices().refreshLiveRegionServers();
+                return getClusterRoleRecord(url, haGroupName, false, 
properties);
+            }
+            throw parsedException;
+        } finally {
+            conn.close();
+        }
+
+    }
+
+    /**
+     * Method to schedule a poller to fetch ClusterRoleRecord every 5 seconds 
(or configured value)
+     * until we get an Active ClusterRoleRecord
+     * @param conn Connection to be used to get RegionServer Endpoint Service
+     * @param haGroupName Name of the HA group
+     * @param properties Connection properties
+     * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
+     * @param haGroup HighAvailabilityGroup object to refresh the 
ClusterRoleRecord when an Active CRR is found
+     * @throws SQLException if there is an error getting the ClusterRoleRecord
+     */
+    public static ClusterRoleRecord fetchClusterRoleRecord(String url, String 
haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, Properties 
properties) throws SQLException {
+        ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, 
haGroupName, true, properties);
+
+        if (clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER &&
+                !clusterRoleRecord.getRole1().isActive() && 
!clusterRoleRecord.getRole2().isActive()) {
+            LOGGER.info("Non-active ClusterRoleRecord found for HA group {}. 
Scheduling poller to check every {} seconds," +
+            "until we find an ACTIVE CRR", haGroupName, pollerInterval);
+            //Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or 
configured value)
+            //until we get an Active ClusterRoleRecord and return the 
Non-Active CRR
+            schedulePoller(url, haGroupName, haGroup, pollerInterval, 
properties);

Review Comment:
   > Should this be notification based and not polled? Fetch it during 
initialization. Then, we would be "notified" when some operation we expect to 
succeed is instead failed due to some policy check (on the remote side). And at 
that time, the up to date CRR could be fetched.
   
   That is the current behavior, this polling will happen only when client 
receives a CRR for a HAGroup, which has both roles as non-active, as client 
doesn't have any active cluster, connection creation can't happen then we will 
go into polling mode to fetch the CRR until we have an ACTIVE cluster and 
client can resume creating connections and poller will stop.
   
   Right now client assumes the CRR is up to date, but if CRR received doesn't 
have ACTIVE cluster, client will have to poll as it doesn't know which cluster 
to create connection to.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java:
##########
@@ -386,6 +386,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = 
"phoenix.view.ttl.tenant_views_per_scan.limit";
     // Block mutations based on cluster role record
     public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = 
"phoenix.cluster.role.based.mutation.block.enabled";
+    // Check HAGroup is Stale for mutations
+    public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = 
"phoenix.ha.group.stale.for.mutation.check.enabled";

Review Comment:
   Sorry for confusion on this one, will update the comment, this is only there 
for some to pass which don't need the check on server. like 
LogginHAConnectionLimiter, from debugging I could pinpoint it to getting 
HAGroupStoreClient in HAGroupStoreManager for checking anything that takes a 
bit of time and which leads to test failures, right now we still don't have 
autonomous changes required for testing like (updating system table or ZNode 
syncs, which we have to do explicitly for testing)
   
   WIll add comment to remove this when everything is available and we won't 
need to initialize haGroupStoreClient as many times as we are doing today for 
testing.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java:
##########
@@ -280,6 +280,10 @@ public SQLException newException(SQLExceptionInfo info) {
     HA_READ_FROM_CLUSTER_FAILED_ON_NULL(1986, "F1Q86", "Unable to read from 
cluster for null."),
     HA_INVALID_PROPERTIES(1987, "F1Q87", "Invalid properties to get a Phoenix 
HA connection."),
     HA_CLUSTER_CAN_NOT_CONNECT(1988, "F1Q88", "Cluster can not serve any 
requests for this HA group"),
+    CLUSTER_ROLE_RECORD_NOT_FOUND(1989, "F1Q89", "Cluster role record not 
found for this HA group"),
+    STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", "Stale CRR found during 
operation refreshing it"),

Review Comment:
   Client is transparently refreshing the CRR when it receives exception that 
the CRR is stale, but we are not retrying the mutations, as we are trying to 
detect Staleness when Failover event happens and with that the connections will 
be closed after refresh, we decided to not retry them and let application retry 
with new ACTIVE.
   There are 2 codes, one if refresh failed and one if it passes for more 
visibility and debugging.



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java:
##########
@@ -100,6 +137,17 @@ public boolean isMutationBlocked(String haGroupName) 
throws IOException {
         return false;
     }
 
+    public boolean isHAGroupOnClientStale(String haGroupName) throws 
IOException, SQLException {
+        if (checkStaleCRRForEveryMutation) {
+            HAGroupStoreClient haGroupStoreClient = 
getHAGroupStoreClient(haGroupName);
+            //If local cluster is not ACTIVE/ACTIVE_TO_STANDBY, it means the 
Failover CRR is stale on client
+            //As they are trying to write/read from a STANDBY cluster.
+            return haGroupStoreClient.getPolicy() == 
HighAvailabilityPolicy.FAILOVER &&

Review Comment:
   > In that future case, simply assuming the latest state is "stale" because 
of disallowed operation, doing a refresh, and getting the same state back, then 
fetching again because the state did not change (it is actually up to date), 
would cause looping?
   
   Client will have to have some notification that it is sending mutations(btw 
this method is only used for mutations but with recent discussion might be used 
for point lookups as well) to a wrong cluster, If cluster is not Active, then 
client should refresh the CRR cache and retry, refreshing will make this 
cluster STANDBY in client's CRR and it won't send the request to this cluster 
instead it will go to the dr/secondary cluster which will be in active state.
   
   > In the future some operations may be allowed on the STANDBY side. At 
least, this should not foreclose the possibility.
   
   We will have to define what operations and in what scenarios that will be 
allowed and then handle accordingly. As currently client only give back 
connection to Active cluster only.
   
   
   For client to determine that the operation is send to wrong cluster, we will 
have to check on server if it should receive that operation (We could think of 
2 ways either checking the local state/role or comparing versions) we 
(@kadirozde and me) chose checking state, as a way forward, as for versions we 
will have to send client version with every operation and there are 
possibilities that version change happens but role change won't happen at that 
time, and now there might be possibility that version change might not happen 
with some role change scenarios.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to