apurtell commented on code in PR #2288: URL: https://github.com/apache/phoenix/pull/2288#discussion_r2411879326
########## phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.ServerName; +import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; +import org.apache.phoenix.jdbc.ClusterRoleRecord; +import org.apache.phoenix.jdbc.HighAvailabilityGroup; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.HighAvailabilityPolicy; +import 
org.apache.phoenix.jdbc.PhoenixConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +/** + * Utility class for getting Cluster Role Record for the client from RegionServer Endpoints. + */ +public class GetClusterRoleRecordUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(GetClusterRoleRecordUtil.class); + + /** + * Scheduler to fetch ClusterRoleRecord until we get an Active ClusterRoleRecord + */ + private static Map<String, ScheduledExecutorService> schedulerMap = new ConcurrentHashMap<>(); + + private static final Object pollerLock = new Object(); + + private static volatile ScheduledFuture<?> pollerFuture = null; + + private GetClusterRoleRecordUtil() {} + + + /** + * Method to get ClusterRoleRecord from RegionServer Endpoints. it picks a random region server + * and gets the CRR from it. + * @param url URL to create Connection to be used to get RegionServer Endpoint Service + * @param haGroupName Name of the HA group + * @param doRetry Whether to retry if the operation fails + * @param properties Connection properties + * @return ClusterRoleRecord from the first available cluster + * @throws SQLException if there is an error getting the ClusterRoleRecord + */ + private static ClusterRoleRecord getClusterRoleRecord(String url, String haGroupName, boolean doRetry, Properties properties) throws SQLException { + Connection conn = getConnection(url, properties); + PhoenixConnection connection = conn.unwrap(PhoenixConnection.class); + try (Admin admin = connection.getQueryServices().getAdmin()) { + // get all live region servers + List<ServerName> regionServers + = connection.getQueryServices().getLiveRegionServers(); + // pick one at random + ServerName regionServer + = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size())); + + //RPC to regionServer to get the CRR + RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface + service = 
RegionServerEndpointProtos.RegionServerEndpointService + .newBlockingStub(admin.coprocessorService(regionServer)); + RegionServerEndpointProtos.GetClusterRoleRecordRequest request + = getClusterRoleRecordRequest(haGroupName); + RegionServerEndpointProtos.GetClusterRoleRecordResponse response = service.getClusterRoleRecord(null, request); + + //Check if the ClusterRoleRecord is valid, if not, throw an exception + if (response.getHaGroupName() == null || response.getPolicy() == null || response.getUrl1() == null + || response.getRole1() == null || response.getUrl2() == null || response.getRole2() == null) { + throw new SQLException("Invalid ClusterRoleRecord response from RegionServer"); + } + + //Generate the ClusterRoleRecord from the response + return new ClusterRoleRecord(response.getHaGroupName().toStringUtf8(), HighAvailabilityPolicy.valueOf(response.getPolicy().toStringUtf8()), + response.getUrl1().toStringUtf8(), ClusterRole.valueOf(response.getRole1().toStringUtf8()), + response.getUrl2().toStringUtf8(), ClusterRole.valueOf(response.getRole2().toStringUtf8()), + response.getVersion()); + + } catch (Exception e) { + SQLException parsedException = ClientUtil.parseServerException(e); + //retry once for any exceptions other than StaleMetadataCacheException + LOGGER.error("Error in getting ClusterRoleRecord for {} from url {}", haGroupName, connection.getURL(), parsedException); + if (doRetry) { + // update the list of live region servers + connection.getQueryServices().refreshLiveRegionServers(); + return getClusterRoleRecord(url, haGroupName, false, properties); + } + throw parsedException; + } finally { + conn.close(); + } + + } + + /** + * Method to schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value) + * until we get an Active ClusterRoleRecord + * @param conn Connection to be used to get RegionServer Endpoint Service + * @param haGroupName Name of the HA group + * @param properties Connection properties + * @param 
pollerInterval Interval in seconds to poll for ClusterRoleRecord + * @param haGroup HighAvailabilityGroup object to refresh the ClusterRoleRecord when an Active CRR is found + * @throws SQLException if there is an error getting the ClusterRoleRecord + */ + public static ClusterRoleRecord fetchClusterRoleRecord(String url, String haGroupName, HighAvailabilityGroup haGroup, long pollerInterval, Properties properties) throws SQLException { + ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url, haGroupName, true, properties); + + if (clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER && + !clusterRoleRecord.getRole1().isActive() && !clusterRoleRecord.getRole2().isActive()) { + LOGGER.info("Non-active ClusterRoleRecord found for HA group {}. Scheduling poller to check every {} seconds," + + "until we find an ACTIVE CRR", haGroupName, pollerInterval); + //Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or configured value) + //until we get an Active ClusterRoleRecord and return the Non-Active CRR + schedulePoller(url, haGroupName, haGroup, pollerInterval, properties); Review Comment: How expensive is the fetch of the CRR? If every server and client in a cluster or pointing to the cluster must contact a server on the cluster every 5 seconds, that would be expensive; that server would be pretty hot. Also, a 5-second polling interval means every client will see an issue if there is a blip >= 5 seconds in the availability of whatever is serving the CRR. That is not failure tolerant. If something is temporarily down or unavailable but you don't need it, it is not actually an availability problem. Tolerance is increased. Should this be notification-based and not polled? Fetch it during initialization. Then, we would be "notified" when some operation we expect to succeed instead fails due to some policy check (on the remote side). And at that time, the up-to-date CRR could be fetched. Let the client assume the CRR it has is up to date. 
Let it try something. If the thing it tries fails because the local CRR is possibly out of sync, then fetch the latest CRR and potentially retry the operation if the cached CRR and newly fetched CRR are different. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
