apurtell commented on code in PR #2288:
URL: https://github.com/apache/phoenix/pull/2288#discussion_r2411799413
##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java:
##########
@@ -280,6 +280,10 @@ public SQLException newException(SQLExceptionInfo info) {
HA_READ_FROM_CLUSTER_FAILED_ON_NULL(1986, "F1Q86", "Unable to read from cluster for null."),
HA_INVALID_PROPERTIES(1987, "F1Q87", "Invalid properties to get a Phoenix HA connection."),
HA_CLUSTER_CAN_NOT_CONNECT(1988, "F1Q88", "Cluster can not serve any requests for this HA group"),
+ CLUSTER_ROLE_RECORD_NOT_FOUND(1989, "F1Q89", "Cluster role record not found for this HA group"),
+ STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", "Stale CRR found during operation refreshing it"),
+ STALE_CRR_RETHROW_AFTER_REFRESH_FAILED(1992, "F1Q92", "Stale CRR found during operation, HAGroup refresh failed"),
Review Comment:
For the case where the refresh failed, this one makes sense to throw.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java:
##########
@@ -280,6 +280,10 @@ public SQLException newException(SQLExceptionInfo info) {
HA_READ_FROM_CLUSTER_FAILED_ON_NULL(1986, "F1Q86", "Unable to read from cluster for null."),
HA_INVALID_PROPERTIES(1987, "F1Q87", "Invalid properties to get a Phoenix HA connection."),
HA_CLUSTER_CAN_NOT_CONNECT(1988, "F1Q88", "Cluster can not serve any requests for this HA group"),
+ CLUSTER_ROLE_RECORD_NOT_FOUND(1989, "F1Q89", "Cluster role record not found for this HA group"),
+ STALE_CRR_RETHROW_AFTER_REFRESH(1991, "F1Q91", "Stale CRR found during operation refreshing it"),
Review Comment:
Wouldn't the client transparently refresh the CRR and retry? Why does the
application need to handle it?
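A minimal sketch of what transparent handling could look like, assuming a hypothetical `Refresher` hook that re-fetches the CRR and reports whether that succeeded, and a predicate that recognizes a server-side policy rejection (for example by SQLState). `RefreshingRetry`, `SqlAction` and `Refresher` are illustrative names, not part of the PR.

```java
import java.sql.SQLException;
import java.util.function.Predicate;

/** Illustrative only: retry an action at most once after refreshing the cluster role record. */
final class RefreshingRetry {

    /** An action that can fail with SQLException (hypothetical, similar in spirit to SupplierWithSQLException). */
    @FunctionalInterface
    interface SqlAction<T> {
        T get() throws SQLException;
    }

    /** Hypothetical hook: re-fetch the CRR; return false if the refresh itself failed. */
    @FunctionalInterface
    interface Refresher {
        boolean refresh();
    }

    private RefreshingRetry() {}

    static <T> T run(SqlAction<T> action,
                     Predicate<SQLException> isPolicyRejection,
                     Refresher refresher) throws SQLException {
        try {
            return action.get();
        } catch (SQLException first) {
            // Unrelated failures, or a failed refresh, reach the caller unchanged.
            if (!isPolicyRejection.test(first) || !refresher.refresh()) {
                throw first;
            }
            try {
                // Exactly one retry with the refreshed record; never loops.
                return action.get();
            } catch (SQLException second) {
                if (second != first) {
                    second.addSuppressed(first);
                }
                throw second;
            }
        }
    }
}
```

With this shape the application only ever sees the original error, a refresh failure, or the result of the single retry.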
##########
phoenix-core-client/src/main/java/org/apache/phoenix/exception/StaleClusterRoleRecordException.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when attempting to do operation on STANDBY Cluster of HA Connection,
+ * indicating that the client has stale Cluster Role Record info for the HAGroup.
+ */
+public class StaleClusterRoleRecordException extends IOException {
Review Comment:
Currently, attempting an operation on a STANDBY cluster implies that the client has a "stale" record, but this will not necessarily hold going forward.
With active-active modes, or similar, a disallowed operation doesn't mean the client has a stale record. The client's record may be up to date; it means the client has tried an operation that is not supported in the current mode.
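To illustrate the distinction, a sketch of a server-side check that reports what it saw (its own role and record version) instead of declaring the client stale. `Role`, `Operation`, `Decision` and the allowed-roles table are hypothetical simplifications, not Phoenix's `ClusterRole` or its actual policy.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative only: a policy check that does not assume the caller is stale. */
final class PolicyCheck {

    enum Role { ACTIVE, ACTIVE_TO_STANDBY, STANDBY }

    enum Operation { READ, WRITE }

    /** What the server decided, plus the state it decided with, so the client can compare. */
    static final class Decision {
        final boolean allowed;
        final Role serverRole;
        final long serverRecordVersion;

        Decision(boolean allowed, Role serverRole, long serverRecordVersion) {
            this.allowed = allowed;
            this.serverRole = serverRole;
            this.serverRecordVersion = serverRecordVersion;
        }
    }

    private PolicyCheck() {}

    /** Hypothetical table; a future mode could widen it without any client record being stale. */
    static Set<Role> rolesAllowing(Operation op) {
        switch (op) {
            case WRITE:
                return EnumSet.of(Role.ACTIVE);
            case READ:
                return EnumSet.of(Role.ACTIVE, Role.ACTIVE_TO_STANDBY);
            default:
                throw new IllegalArgumentException("Unknown operation: " + op);
        }
    }

    static Decision check(Operation op, Role serverRole, long serverRecordVersion) {
        boolean allowed = rolesAllowing(op).contains(serverRole);
        return new Decision(allowed, serverRole, serverRecordVersion);
    }
}
```

A rejected `Decision` only says the operation is not allowed in the server's current role; whether the client's record is out of date is something the client can work out by comparing `serverRecordVersion` with its own.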
##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HighAvailabilityGroup;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility class for getting Cluster Role Record for the client from RegionServer Endpoints.
+ */
+public class GetClusterRoleRecordUtil {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GetClusterRoleRecordUtil.class);
+
+ /**
+ * Scheduler to fetch ClusterRoleRecord until we get an Active ClusterRoleRecord
+ */
+ private static Map<String, ScheduledExecutorService> schedulerMap = new ConcurrentHashMap<>();
+
+ private static final Object pollerLock = new Object();
+
+ private static volatile ScheduledFuture<?> pollerFuture = null;
+
+ private GetClusterRoleRecordUtil() {}
+
+
+ /**
+ * Method to get ClusterRoleRecord from RegionServer Endpoints. It picks a random region server
+ * and gets the CRR from it.
+ * @param url URL to create Connection to be used to get RegionServer Endpoint Service
+ * @param haGroupName Name of the HA group
+ * @param doRetry Whether to retry if the operation fails
+ * @param properties Connection properties
+ * @return ClusterRoleRecord from the first available cluster
+ * @throws SQLException if there is an error getting the ClusterRoleRecord
+ */
+ private static ClusterRoleRecord getClusterRoleRecord(String url, String haGroupName, boolean doRetry, Properties properties) throws SQLException {
+ Connection conn = getConnection(url, properties);
+ PhoenixConnection connection = conn.unwrap(PhoenixConnection.class);
+ try (Admin admin = connection.getQueryServices().getAdmin()) {
+ // get all live region servers
+ List<ServerName> regionServers
+ = connection.getQueryServices().getLiveRegionServers();
+ // pick one at random
+ ServerName regionServer
+ = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+ //RPC to regionServer to get the CRR
+ RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface
+ service = RegionServerEndpointProtos.RegionServerEndpointService
+ .newBlockingStub(admin.coprocessorService(regionServer));
+ RegionServerEndpointProtos.GetClusterRoleRecordRequest request
+ = getClusterRoleRecordRequest(haGroupName);
+ RegionServerEndpointProtos.GetClusterRoleRecordResponse response = service.getClusterRoleRecord(null, request);
+
+ //Check if the ClusterRoleRecord is valid, if not, throw an exception
+ if (response.getHaGroupName() == null || response.getPolicy() == null || response.getUrl1() == null
+ || response.getRole1() == null || response.getUrl2() == null || response.getRole2() == null) {
+ throw new SQLException("Invalid ClusterRoleRecord response from RegionServer");
+ }
+
+ //Generate the ClusterRoleRecord from the response
+ return new ClusterRoleRecord(response.getHaGroupName().toStringUtf8(), HighAvailabilityPolicy.valueOf(response.getPolicy().toStringUtf8()),
+ response.getUrl1().toStringUtf8(), ClusterRole.valueOf(response.getRole1().toStringUtf8()),
+ response.getUrl2().toStringUtf8(), ClusterRole.valueOf(response.getRole2().toStringUtf8()),
+ response.getVersion());
+
+ } catch (Exception e) {
+ SQLException parsedException = ClientUtil.parseServerException(e);
+ //retry once for any exceptions other than StaleMetadataCacheException
+ LOGGER.error("Error in getting ClusterRoleRecord for {} from url {}", haGroupName, connection.getURL(), parsedException);
+ if (doRetry) {
+ // update the list of live region servers
+ connection.getQueryServices().refreshLiveRegionServers();
+ return getClusterRoleRecord(url, haGroupName, false, properties);
+ }
+ throw parsedException;
+ } finally {
+ conn.close();
+ }
+
+ }
+
+ /**
+ * Method to schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value)
+ * until we get an Active ClusterRoleRecord
+ * @param url URL to create Connection to be used to get RegionServer Endpoint Service
+ * @param haGroupName Name of the HA group
+ * @param properties Connection properties
+ * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
+ * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an Active CRR is found
+ * @throws SQLException if there is an error getting the ClusterRoleRecord
+ */
+ public static ClusterRoleRecord fetchClusterRoleRecord(String url, String haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) throws SQLException {
+ ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, haGroupName, true, properties);
+
+ if (clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER &&
+ !clusterRoleRecord.getRole1().isActive() && !clusterRoleRecord.getRole2().isActive()) {
+ LOGGER.info("Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} seconds," +
+ "until we find an ACTIVE CRR", haGroupName, pollerInterval);
+ //Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value)
+ //until we get an Active ClusterRoleRecord and return the Non-Active CRR
+ schedulePoller(url, haGroupName, haGroup, pollerInterval, properties);
Review Comment:
How expensive is the fetch of the CRR?
If every server and client in a cluster, or pointing to the cluster, must contact a server on the cluster every 5 seconds, that would be expensive; that server would be pretty hot.
Also, a 5 second polling interval means every client will see an issue if there is a blip of 5 seconds or more in the availability of whatever is serving the CRR. That is not failure tolerant. If something is temporarily down or unavailable but you don't need it, it is not actually an availability problem, so not depending on it increases tolerance.
Should this be notification based rather than polled? Fetch it during initialization. Then we would be "notified" when some operation we expect to succeed instead fails due to a policy check on the remote side, and at that time the up-to-date CRR could be fetched.
So the trick is to make sure that the remote side always checks whether what the client is asking for is valid by its current understanding of the CRR, so it can throw an exception telling the client to recheck its understanding of the latest CRR state. When you do that, you do not need to poll.
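A minimal sketch of the fetch-at-init, refresh-on-rejection alternative, assuming a hypothetical `fetcher` that performs one CRR fetch and a `versionOf` function that extracts a comparable version. Concurrent callers that hit the same rejection share one refresh: a caller that arrives after the cache has moved past the version it used simply reuses the newer record. `OnDemandRecordCache` is an illustrative name, not an existing Phoenix class.

```java
import java.util.Objects;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

/** Illustrative only: fetch once at startup, then refresh only when the remote side rejects an operation. */
final class OnDemandRecordCache<R> {

    private final Supplier<R> fetcher;
    private final ToLongFunction<R> versionOf;
    private volatile R cached;

    OnDemandRecordCache(Supplier<R> fetcher, ToLongFunction<R> versionOf) {
        this.fetcher = Objects.requireNonNull(fetcher, "fetcher");
        this.versionOf = Objects.requireNonNull(versionOf, "versionOf");
        this.cached = fetcher.get(); // single fetch during initialization, no poller
    }

    /** The record the client currently believes in; no remote call. */
    R current() {
        return cached;
    }

    /**
     * Called after a remote policy rejection, with the version the failed operation used.
     * If another thread already refreshed past that version, its result is reused instead of fetching again.
     */
    synchronized R refreshAfterRejection(long versionUsedByCaller) {
        R now = cached;
        if (versionOf.applyAsLong(now) > versionUsedByCaller) {
            return now;
        }
        R fresh = fetcher.get();
        // Never move backwards if the fetched record is older than the one held.
        if (versionOf.applyAsLong(fresh) >= versionOf.applyAsLong(now)) {
            cached = fresh;
        }
        return cached;
    }
}
```

The load on the serving region server then scales with the number of rejections rather than with the number of clients times the polling rate.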
##########
phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java:
##########
@@ -386,6 +386,9 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = "phoenix.view.ttl.tenant_views_per_scan.limit";
// Block mutations based on cluster role record
public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = "phoenix.cluster.role.based.mutation.block.enabled";
+ // Check HAGroup is Stale for mutations
+ public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = "phoenix.ha.group.stale.for.mutation.check.enabled";
Review Comment:
Why wouldn't we always check if the CRR is up to date if we have a question
about it, i.e. if we tried an operation and got back an unexpected failure?
##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -634,12 +635,24 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
final Set<String> haGroupNames = extractHAGroupNameAttribute(miniBatchOp);
// Check if mutation is blocked for any of the HAGroupNames
for (String haGroupName : haGroupNames) {
- if (StringUtils.isNotBlank(haGroupName)
- && haGroupStoreManager.isMutationBlocked(haGroupName)) {
- throw new MutationBlockedIOException("Blocking Mutation as Some CRRs are in "
- + "ACTIVE_TO_STANDBY state and "
- + "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
- }
+ //TODO: Below approach might be slow need to figure out faster way, slower part is
+ //getting haGroupStoreClient We can also cache roleRecord (I tried it and still its
+ //slow due to haGroupStoreClient initialization) and caching will give us old result
+ //in case one cluster is unreachable instead of UNKNOWN. Check if mutation's
+ //haGroup is stale
+ boolean isHAGroupOnClientStale = haGroupStoreManager
Review Comment:
If there is local uncertainty about state, either because state is missing or not yet initialized, or because the remote side returned an error suggesting the client's understanding is not current, refresh the CRR and then decide what to do next. I think this is the way.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java:
##########
@@ -708,7 +709,8 @@ enum JoinType {INNER, LEFT_OUTER}
CLUSTER_ROLE_1 + " VARCHAR," + // Role for peer cluster might not be accurate, we will use only local role for recovery if needed
CLUSTER_ROLE_2 + " VARCHAR," + // Role for peer cluster might not be accurate, we will use only local role for recovery if needed
POLICY + " VARCHAR," + // There should be only one policy for an HA group
- VERSION + " BIGINT," + // Version should be incremented for Admin Updates, only for CLUSTER_URLs and REGISTRY_TYPE
+ VERSION_1 + " BIGINT," + // Version should be incremented for Admin Updates, only for CLUSTER_URLs and REGISTRY_TYPE
Review Comment:
_1 and _2 designate the versions of two separate clusters, but that isn't clear. VERSION_1 and VERSION_2 could just as well be read as two unrelated version constants.
VERSION_CLUSTER_1 and VERSION_CLUSTER_2 are possible names that make this clear.
Getting this right matters because there will be compatibility concerns if these are changed later.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -333,10 +339,31 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
while (true) {
try {
return s.get();
- } catch (SQLException e) {
+ } catch (Exception e) {
+ if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) {
+ LOG.debug("StaleHAGroupStoreException found refreshing HAGroup");
+ //If the exception is due to stale ClusterRoleRecord version, try
+ //refreshing the ClusterRoleRecord and state transitions if required
+
+ if (!context.getHAGroup().refreshClusterRoleRecord()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH_FAILED)
+ .setMessage(String.format("Error while running operation Stale ClusterRoleRecord for HAGroup %s" +
+ " found with version %s, refreshing HAGroup failed :- %s",
+ context.getHAGroup(), context.getHAGroup().getRoleRecord().getVersion(), e)).build().buildException();
+ }
+
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH)
Review Comment:
This comes back to the earlier question I had, which its naming implies. What is an "operation Stale ClusterRoleRecord"? Neither this message nor the name of the ExceptionCode clearly expresses what is happening here.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -689,6 +727,40 @@ private boolean isUpdateNeeded(HAGroupStoreRecord.HAGroupState currentHAGroupSta
return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) > waitTime);
}
+ /**
+ * Combine two versions into a single long value to be used for comparison and sending back to client
+ * @param version1 HAGroupRecord version for cluster 1
+ * @param version2 HAGroupRecord version for cluster 2
+ * @return combined version
+ */
+
+ private long combineCanonicalVersions(long version1, long version2) {
Review Comment:
It's a bit weird to represent two values in Java as bitfields. Why not int[2]?
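To make the trade-off concrete, a sketch contrasting a packed `long` with an explicit pair. The body of `combineCanonicalVersions` is not shown in this hunk, so the 32/32 bit split below is an assumption; it illustrates how packing can silently lose information once a version outgrows its half, which a type with one field per cluster does not. `PackedVersions` and `ClusterVersions` are hypothetical names.

```java
/** Illustrative packing of two versions into one long, assuming a 32/32 bit split. */
final class PackedVersions {
    private static final long LOW_MASK = 0xFFFFFFFFL;

    private PackedVersions() {}

    static long pack(long version1, long version2) {
        // Only lossless when both versions are in [0, 2^32).
        return (version1 << 32) | (version2 & LOW_MASK);
    }

    static long version1(long packed) {
        return packed >>> 32;
    }

    static long version2(long packed) {
        return packed & LOW_MASK;
    }
}

/** The explicit alternative: one named field per cluster, compared field by field. */
final class ClusterVersions {
    final long cluster1;
    final long cluster2;

    ClusterVersions(long cluster1, long cluster2) {
        this.cluster1 = cluster1;
        this.cluster2 = cluster2;
    }

    /** True if neither version went backwards and at least one moved forward. */
    boolean isNewerThan(ClusterVersions other) {
        boolean noneOlder = cluster1 >= other.cluster1 && cluster2 >= other.cluster2;
        boolean someNewer = cluster1 > other.cluster1 || cluster2 > other.cluster2;
        return noneOlder && someNewer;
    }
}
```

The pair also makes the comparison rule explicit, whereas comparing packed values implicitly orders by cluster 1 first.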
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java:
##########
@@ -100,6 +137,17 @@ public boolean isMutationBlocked(String haGroupName) throws IOException {
return false;
}
+ public boolean isHAGroupOnClientStale(String haGroupName) throws IOException, SQLException {
+ if (checkStaleCRRForEveryMutation) {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ //If local cluster is not ACTIVE/ACTIVE_TO_STANDBY, it means the Failover CRR is stale on client
+ //As they are trying to write/read from a STANDBY cluster.
+ return haGroupStoreClient.getPolicy() == HighAvailabilityPolicy.FAILOVER &&
Review Comment:
This doesn't allow for future evolution where the CRR is valid but the local cluster is simply in the wrong state for a given operation. In other words, a cluster being in the wrong state for a given operation doesn't mean the client lacks the most up-to-date CRR.
In the future, some operations may be allowed on the STANDBY side. At the least, this should not foreclose that possibility.
In that future case, wouldn't treating the latest state as "stale", doing a refresh, and getting the same state back (because it is not stale) cause looping?
What I am suggesting is moving away from thinking "we are stale" (definitive: we always assume we are not up to date) toward "we have done something unexpectedly invalid, so let's double check we are using the correct policy" (not definitive: maybe we are up to date, maybe not).
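A sketch of the non-definitive approach, assuming the client knows the record version the failed operation was issued with and that a refresh reports the version it obtained (or nothing, if the refresh failed). Retrying only when the version actually advanced avoids the looping concern; an unchanged version means the rejection is legitimate for the current mode and should be surfaced. `RejectionHandling` and `Next` are hypothetical names.

```java
import java.util.OptionalLong;

/** Illustrative only: decide what to do after an unexpected rejection and a CRR refresh. */
final class RejectionHandling {

    enum Next {
        /** The record changed, so the operation may be valid now; retry once. */
        RETRY_WITH_NEW_RECORD,
        /** Same record: the client was not stale, the operation is simply not allowed right now. */
        SURFACE_REJECTION,
        /** Could not confirm either way; report the refresh failure alongside the original error. */
        SURFACE_REFRESH_FAILURE
    }

    private RejectionHandling() {}

    static Next decide(long versionUsed, OptionalLong versionAfterRefresh) {
        if (!versionAfterRefresh.isPresent()) {
            return Next.SURFACE_REFRESH_FAILURE;
        }
        if (versionAfterRefresh.getAsLong() > versionUsed) {
            return Next.RETRY_WITH_NEW_RECORD;
        }
        // Unchanged (or older) version: retrying would only repeat the same rejection.
        return Next.SURFACE_REJECTION;
    }
}
```

Each outcome also maps naturally to a distinct, self-describing error, so a failed refresh and a refresh that returned the same record are not conflated.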
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -333,10 +339,31 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
while (true) {
try {
return s.get();
- } catch (SQLException e) {
+ } catch (Exception e) {
+ if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) {
+ LOG.debug("StaleHAGroupStoreException found refreshing HAGroup");
+ //If the exception is due to stale ClusterRoleRecord version, try
+ //refreshing the ClusterRoleRecord and state transitions if required
+
+ if (!context.getHAGroup().refreshClusterRoleRecord()) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH_FAILED)
+ .setMessage(String.format("Error while running operation Stale ClusterRoleRecord for HAGroup %s" +
+ " found with version %s, refreshing HAGroup failed :- %s",
+ context.getHAGroup(), context.getHAGroup().getRoleRecord().getVersion(), e)).build().buildException();
+ }
+
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.STALE_CRR_RETHROW_AFTER_REFRESH)
Review Comment:
What is the difference between these two cases? The error messages and error
code names do not provide good context.