lokiore commented on code in PR #2288:
URL: https://github.com/apache/phoenix/pull/2288#discussion_r2414905897
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -689,6 +727,40 @@ private boolean
isUpdateNeeded(HAGroupStoreRecord.HAGroupState currentHAGroupSta
return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime)
> waitTime);
}
+ /**
+ * Combine two versions into a single long value to be used for comparison
and sending back to client
+ * @param version1 HAGroupRecord version for cluster 1
+ * @param version2 HAGroupRecord version for cluster 2
+ * @return combined version
+ */
+
+ private long combineCanonicalVersions(long version1, long version2) {
Review Comment:
After our recent discussion, the CRR version implementation will change a bit
on the server side: we will keep a version for admin changes only. That will
come with @ritegarg's next change, so I'll either build on top of it or pull it
in and resolve conflicts.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HighAvailabilityGroup;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility class for getting Cluster Role Record for the client from
RegionServer Endpoints.
+ */
+public class GetClusterRoleRecordUtil {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GetClusterRoleRecordUtil.class);
+
+ /**
+ * Scheduler to fetch ClusterRoleRecord until we get an Active
ClusterRoleRecord
+ */
+ private static Map<String, ScheduledExecutorService> schedulerMap = new
ConcurrentHashMap<>();
+
+ private static final Object pollerLock = new Object();
+
+ private static volatile ScheduledFuture<?> pollerFuture = null;
+
+ private GetClusterRoleRecordUtil() {}
+
+
+ /**
+ * Method to get ClusterRoleRecord from RegionServer Endpoints. it picks a
random region server
+ * and gets the CRR from it.
+ * @param url URL to create Connection to be used to get RegionServer
Endpoint Service
+ * @param haGroupName Name of the HA group
+ * @param doRetry Whether to retry if the operation fails
+ * @param properties Connection properties
+ * @return ClusterRoleRecord from the first available cluster
+ * @throws SQLException if there is an error getting the ClusterRoleRecord
+ */
+ private static ClusterRoleRecord getClusterRoleRecord(String url, String
haGroupName, boolean doRetry, Properties properties) throws SQLException {
+ Connection conn = getConnection(url, properties);
+ PhoenixConnection connection = conn.unwrap(PhoenixConnection.class);
+ try (Admin admin = connection.getQueryServices().getAdmin()) {
+ // get all live region servers
+ List<ServerName> regionServers
+ = connection.getQueryServices().getLiveRegionServers();
+ // pick one at random
+ ServerName regionServer
+ =
regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+ //RPC to regionServer to get the CRR
+
RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+ service =
RegionServerEndpointProtos.RegionServerEndpointService
+ .newBlockingStub(admin.coprocessorService(regionServer));
+ RegionServerEndpointProtos.GetClusterRoleRecordRequest request
+ = getClusterRoleRecordRequest(haGroupName);
+ RegionServerEndpointProtos.GetClusterRoleRecordResponse response =
service.getClusterRoleRecord(null, request);
+
+ //Check if the ClusterRoleRecord is valid, if not, throw an
exception
+ if (response.getHaGroupName() == null || response.getPolicy() ==
null || response.getUrl1() == null
+ || response.getRole1() == null || response.getUrl2() ==
null || response.getRole2() == null) {
+ throw new SQLException("Invalid ClusterRoleRecord response
from RegionServer");
+ }
+
+ //Generate the ClusterRoleRecord from the response
+ return new
ClusterRoleRecord(response.getHaGroupName().toStringUtf8(),
HighAvailabilityPolicy.valueOf(response.getPolicy().toStringUtf8()),
+ response.getUrl1().toStringUtf8(),
ClusterRole.valueOf(response.getRole1().toStringUtf8()),
+ response.getUrl2().toStringUtf8(),
ClusterRole.valueOf(response.getRole2().toStringUtf8()),
+ response.getVersion());
+
+ } catch (Exception e) {
+ SQLException parsedException = ClientUtil.parseServerException(e);
+ //retry once for any exceptions other than
StaleMetadataCacheException
+ LOGGER.error("Error in getting ClusterRoleRecord for {} from url
{}", haGroupName, connection.getURL(), parsedException);
+ if (doRetry) {
+ // update the list of live region servers
+ connection.getQueryServices().refreshLiveRegionServers();
+ return getClusterRoleRecord(url, haGroupName, false,
properties);
+ }
+ throw parsedException;
+ } finally {
+ conn.close();
+ }
+
+ }
+
+ /**
+ * Method to schedule a poller to fetch ClusterRoleRecord every 5 seconds
(or configured value)
+ * until we get an Active ClusterRoleRecord
+ * @param conn Connection to be used to get RegionServer Endpoint Service
+ * @param haGroupName Name of the HA group
+ * @param properties Connection properties
+ * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
+ * @param haGroup HighAvailabilityGroup object to refresh the
ClusterRoleRecord when an Active CRR is found
+ * @throws SQLException if there is an error getting the ClusterRoleRecord
+ */
+ public static ClusterRoleRecord fetchClusterRoleRecord(String url, String
haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, Properties
properties) throws SQLException {
+ ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url,
haGroupName, true, properties);
+
+ if (clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER &&
+ !clusterRoleRecord.getRole1().isActive() &&
!clusterRoleRecord.getRole2().isActive()) {
+ LOGGER.info("Non-active ClusterRoleRecord found for HA group {}.
Scheduling poller to check every {} seconds," +
+ "until we find an ACTIVE CRR", haGroupName, pollerInterval);
+ //Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or
configured value)
+ //until we get an Active ClusterRoleRecord and return the
Non-Active CRR
+ schedulePoller(url, haGroupName, haGroup, pollerInterval,
properties);
Review Comment:
> Should this be notification based and not polled? Fetch it during
initialization. Then, we would be "notified" when some operation we expect to
succeed is instead failed due to some policy check (on the remote side). And at
that time, the up to date CRR could be fetched.
That is the current behavior. Polling happens only when the client receives a
CRR for an HA group in which both roles are non-active. Since the client then
has no ACTIVE cluster, it cannot create connections, so it goes into polling
mode and keeps fetching the CRR until a cluster is ACTIVE. At that point the
client can resume creating connections and the poller stops.
Right now the client assumes the CRR is up to date, but if the CRR it received
has no ACTIVE cluster, it has to poll because it doesn't know which cluster to
connect to.
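To make the intended behavior concrete, here is a minimal sketch of "poll only while there is no ACTIVE cluster, then stop". It is not the PR's `schedulePoller` implementation: `ActivePollerSketch`, `Crr`, `Role`, and `pollUntilActive` are hypothetical stand-ins, and the real `ClusterRole.isActive()` may treat more states as active than this simplified check does.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch, not the PR's code: poll until one role is ACTIVE, then stop.
final class ActivePollerSketch {
    enum Role { ACTIVE, STANDBY, OFFLINE }

    /** Simplified stand-in for ClusterRoleRecord: only the two roles. */
    static final class Crr {
        final Role role1;
        final Role role2;

        Crr(Role role1, Role role2) {
            this.role1 = role1;
            this.role2 = role2;
        }

        boolean hasActive() {
            return role1 == Role.ACTIVE || role2 == Role.ACTIVE;
        }
    }

    /**
     * Schedules one fetch after intervalMillis. If the fetched record has an
     * ACTIVE role, hands it to onActive and schedules nothing further, so the
     * poller stops by itself. Otherwise schedules the next attempt.
     */
    static void pollUntilActive(ScheduledExecutorService scheduler, Supplier<Crr> fetcher,
            Consumer<Crr> onActive, long intervalMillis) {
        scheduler.schedule(() -> {
            Crr crr = null;
            try {
                crr = fetcher.get();
            } catch (RuntimeException e) {
                // Assumption: a failed fetch is treated like "no ACTIVE cluster yet".
            }
            if (crr != null && crr.hasActive()) {
                onActive.accept(crr);
            } else {
                pollUntilActive(scheduler, fetcher, onActive, intervalMillis);
            }
        }, intervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

Rescheduling one-shot tasks, rather than cancelling a fixed-rate task from inside itself, avoids having to share the `ScheduledFuture` with the task that needs to cancel it.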
##########
phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java:
##########
@@ -386,6 +386,9 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT =
"phoenix.view.ttl.tenant_views_per_scan.limit";
// Block mutations based on cluster role record
public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED =
"phoenix.cluster.role.based.mutation.block.enabled";
+ // Check HAGroup is Stale for mutations
+ public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED =
"phoenix.ha.group.stale.for.mutation.check.enabled";
Review Comment:
Sorry for the confusion on this one, I will update the comment. This flag is
only there so that some tests which don't need the check on the server can
pass, like LoggingHAConnectionLimiter. From debugging, I could pinpoint the
problem to getting the HAGroupStoreClient in HAGroupStoreManager: any check
that goes through it takes a bit of time, which leads to test failures. Right
now we still don't have the autonomous changes required for testing (such as
system table updates or ZNode syncs, which we have to do explicitly in tests).
I will add a comment to remove this once everything is available and we no
longer need to initialize haGroupStoreClient as many times as we do today for
testing.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java:
##########
@@ -280,6 +280,10 @@ public SQLException newException(SQLExceptionInfo info) {
HA_READ_FROM_CLUSTER_FAILED_ON_NULL(1986, "F1Q86", "Unable to read from
cluster for null."),
HA_INVALID_PROPERTIES(1987, "F1Q87", "Invalid properties to get a Phoenix
HA connection."),
HA_CLUSTER_CAN_NOT_CONNECT(1988, "F1Q88", "Cluster can not serve any
requests for this HA group"),
+ CLUSTER_ROLE_RECORD_NOT_FOUND(1989, "F1Q89", "Cluster role record not
found for this HA group"),
+ STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", "Stale CRR found during
operation refreshing it"),
Review Comment:
The client transparently refreshes the CRR when it receives an exception
saying the CRR is stale, but we do not retry the mutations. We are trying to
detect staleness when a failover event happens, and since the connections will
be closed after the refresh, we decided not to retry and to let the application
retry against the new ACTIVE cluster.
There are two codes, one for when the refresh fails and one for when it
succeeds, for better visibility and debugging.
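A rough sketch of that "refresh, then rethrow without retrying" flow, assuming nothing about the PR's actual classes: `StaleCrrHandlingSketch`, `CrrRefresher`, and `onStaleCrr` are hypothetical names. The 1991/"F1Q91" pair is taken from the diff above; the code and SQLState used for the refresh-failed case are placeholders, since that enum entry is not visible in this hunk.

```java
import java.sql.SQLException;

// Hypothetical sketch, not the PR's code: refresh the CRR once, never retry the mutation.
final class StaleCrrHandlingSketch {
    // From the diff: STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", ...).
    static final int STALE_AFTER_REFRESH_CODE = 1991;
    static final String STALE_AFTER_REFRESH_SQLSTATE = "F1Q91";
    // Placeholders: the real refresh-failed code is not shown in this hunk.
    static final int REFRESH_FAILED_CODE_PLACEHOLDER = -1;
    static final String REFRESH_FAILED_SQLSTATE_PLACEHOLDER = "XXXXX";

    /** Hypothetical hook that reloads the client's cached CRR. */
    interface CrrRefresher {
        void refresh() throws Exception;
    }

    /**
     * Called when the server reports a stale CRR. Refreshes the cache and returns
     * the exception the caller should throw; which code it carries tells the
     * application whether the refresh succeeded.
     */
    static SQLException onStaleCrr(SQLException stale, CrrRefresher refresher) {
        try {
            refresher.refresh();
        } catch (Exception refreshError) {
            SQLException failed = new SQLException("Stale CRR found, refresh failed",
                REFRESH_FAILED_SQLSTATE_PLACEHOLDER, REFRESH_FAILED_CODE_PLACEHOLDER, stale);
            failed.addSuppressed(refreshError);
            return failed;
        }
        return new SQLException("Stale CRR found during operation, refreshed it",
            STALE_AFTER_REFRESH_SQLSTATE, STALE_AFTER_REFRESH_CODE, stale);
    }
}
```

In both branches the original stale-CRR exception is kept as the cause, so an application that catches the rethrown exception can still see what the server reported before deciding to retry on a new connection.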
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java:
##########
@@ -100,6 +137,17 @@ public boolean isMutationBlocked(String haGroupName)
throws IOException {
return false;
}
+ public boolean isHAGroupOnClientStale(String haGroupName) throws
IOException, SQLException {
+ if (checkStaleCRRForEveryMutation) {
+ HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClient(haGroupName);
+ //If local cluster is not ACTIVE/ACTIVE_TO_STANDBY, it means the
Failover CRR is stale on client
+ //As they are trying to write/read from a STANDBY cluster.
+ return haGroupStoreClient.getPolicy() ==
HighAvailabilityPolicy.FAILOVER &&
Review Comment:
> In that future case, simply assuming the latest state is "stale" because
of disallowed operation, doing a refresh, and getting the same state back, then
fetching again because the state did not change (it is actually up to date),
would cause looping?
The client needs some notification that it is sending mutations to the wrong
cluster (by the way, this method is currently used only for mutations, but
after a recent discussion it might be used for point lookups as well). If the
cluster is not ACTIVE, the client should refresh its CRR cache and retry. The
refresh marks this cluster as STANDBY in the client's CRR, so the client won't
send the request to this cluster again; instead it will go to the DR/secondary
cluster, which will be in the ACTIVE state.
> In the future some operations may be allowed on the STANDBY side. At
least, this should not foreclose the possibility.
We will have to define which operations will be allowed and in which
scenarios, and then handle them accordingly. Currently the client only hands
back connections to the ACTIVE cluster.
For the client to determine that an operation was sent to the wrong cluster,
we have to check on the server whether it should receive that operation. We
could think of two ways to do this: checking the local state/role, or comparing
versions. We (@kadirozde and I) chose checking the state as the way forward.
With versions, we would have to send the client's version with every operation,
and a version change can happen without a role change; it is now also possible
that some role changes won't come with a version change.
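As an illustration of the state-based check, here is a minimal sketch that follows the code comment in the hunk above (a FAILOVER group whose local cluster is neither ACTIVE nor ACTIVE_TO_STANDBY means the client's CRR is stale). `StaleCheckSketch`, its enums, and `isClientCrrStale` are hypothetical and simplified; the real method reads the state through HAGroupStoreClient and may cover more states.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch, not the PR's code: decide staleness from the server's local state only.
final class StaleCheckSketch {
    enum Policy { FAILOVER, PARALLEL }

    enum LocalState { ACTIVE, ACTIVE_TO_STANDBY, STANDBY, OFFLINE }

    // Assumption taken from the diff's comment: these states may still receive operations.
    private static final Set<LocalState> ACCEPTING =
        EnumSet.of(LocalState.ACTIVE, LocalState.ACTIVE_TO_STANDBY);

    /**
     * Returns true when an operation reached a FAILOVER cluster that should not
     * receive it, which implies the client routed with an out-of-date CRR.
     * No client version is needed, so nothing extra travels with each operation.
     */
    static boolean isClientCrrStale(Policy policy, LocalState localState) {
        return policy == Policy.FAILOVER && !ACCEPTING.contains(localState);
    }
}
```

If some operations are later allowed on the STANDBY side, a check of this shape would need the operation type as an extra input rather than a different mechanism.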