jpisaac commented on code in PR #2074:
URL: https://github.com/apache/phoenix/pull/2074#discussion_r1994126520
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java:
##########
@@ -123,15 +158,88 @@ public Connection provide(HighAvailabilityGroup haGroup,
Properties info,
return haGroup.connectActive(info, haURLInfo);
}
}
+
@Override
void transitClusterRole(HighAvailabilityGroup haGroup,
ClusterRoleRecord oldRecord,
ClusterRoleRecord newRecord) {
- LOG.info("Cluster role changed for parallel HA policy.");
+ //No Action for Parallel Policy
+ }
+
+
+ @Override
+ void transitRoleRecordRegistry(HighAvailabilityGroup haGroup,
ClusterRoleRecord oldRecord,
+ ClusterRoleRecord newRecord) throws
SQLException {
+ //Close connections of both clusters as there is a change in
registryType
+ LOG.info("Cluster {} and {} has a change in registryType in HA
group {}, now closing all its connections",
+ oldRecord.getUrl1(), oldRecord.getUrl2() ,
oldRecord.getRegistryType());
+ //close connections for cluster 1
+ closeConnections(haGroup, oldRecord.getUrl1(),
oldRecord.getRegistryType());
+ //close connections for cluster 2
+ closeConnections(haGroup, oldRecord.getUrl2(),
oldRecord.getRegistryType());
+ }
+
+ /**
+ * For PARALLEL policy if there is a change in any of the url then
ParallelPhoenixConnection
+ * objects are invalid
+ * @param haGroup The high availability (HA) group
+ * @param oldRecord The older cluster role record cached in this
client for the given HA group
+ * @param newRecord New cluster role record read from one ZooKeeper
cluster znode
+ * @throws SQLException when not able to close connections
+ */
+ @Override
+ void transitClusterUrl(HighAvailabilityGroup haGroup,
ClusterRoleRecord oldRecord,
+ ClusterRoleRecord newRecord) throws
SQLException {
+ if (!Objects.equals(oldRecord.getUrl1(), newRecord.getUrl1()) &&
+ !Objects.equals(oldRecord.getUrl1(), newRecord.getUrl2()))
{
+ LOG.info("Cluster {} is changed to {} in HA group {}, now
closing all its connections",
+ oldRecord.getUrl1(), newRecord.getUrl1(), haGroup);
+ closeConnections(haGroup, oldRecord.getUrl1(),
oldRecord.getRegistryType());
+ }
+ if (!Objects.equals(oldRecord.getUrl2(), newRecord.getUrl2()) &&
+ !Objects.equals(oldRecord.getUrl2(), newRecord.getUrl1()))
{
+ LOG.info("Cluster {} is changed to {} in HA group {}, now
closing all its connections",
+ oldRecord.getUrl2(), newRecord.getUrl2(), haGroup);
+ closeConnections(haGroup, oldRecord.getUrl2(),
oldRecord.getRegistryType());
+ }
+
}
};
private static final Logger LOG =
LoggerFactory.getLogger(HighAvailabilityGroup.class);
+ /**
+ * Utility to close cqs and all it's connection for specific url of a
HAGroup
+ * @param haGroup The High Availability (HA) Group
+ * @param url The url on which cqs and connections are present
+ * @param registryType The registry Type of connections
+ * @throws SQLException if fails to close the connections
+ */
+ private static void closeConnections(HighAvailabilityGroup haGroup, String
url,
+ ClusterRoleRecord.RegistryType
registryType) throws SQLException {
+ ConnectionQueryServices cqs = null;
+ //Close connections for every HAURLInfo's (different principal) conn
for a give HAGroup
+ for (HAURLInfo haurlInfo :
HighAvailabilityGroup.URLS.get(haGroup.getGroupInfo())) {
+ try {
+ cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
+ HighAvailabilityGroup.getJDBCUrl(url, haurlInfo,
registryType), haGroup.getProperties());
+ cqs.closeAllConnections(new SQLExceptionInfo
+ .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
+ .setMessage("Phoenix connection got closed due to
failover")
+ .setHaGroupInfo(haGroup.getGroupInfo().toString()));
+ LOG.info("Closed all connections to cluster {} for HA group
{}", url,
+ haGroup.getGroupInfo());
+ } finally {
+ if (cqs != null) {
+ // CQS is closed, but it is not invalidated from global
cache in PhoenixDriver
+ // so that any new connection will get error instead of
creating a new CQS
+ LOG.info("Closing CQS after clusterRoleRecord change for
'{}'", url);
+ cqs.close();
Review Comment:
Will also need to remove it from the CQSI map, otherwise the map will
contain a closed CQSI and will never initialize again.
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java:
##########
@@ -145,7 +253,30 @@ abstract Connection provide(HighAvailabilityGroup haGroup,
Properties info, HAUR
throws SQLException;
/**
- * Call-back function when a cluster role transition is detected in the
high availability group.
+ * Call-back function when a cluster role record transition is detected in
the high availability group.
+ *
+ * @param haGroup The high availability (HA) group
+ * @param oldRecord The older cluster role record cached in this client
for the given HA group
+ * @param newRecord New cluster role record read from one ZooKeeper
cluster znode
+ * @throws SQLException if fails to handle the cluster role record
transition
+ */
+ public void transitClusterRoleRecord(HighAvailabilityGroup haGroup,
ClusterRoleRecord oldRecord,
Review Comment:
What about cases when there are multiple changes? Registry types, urls,
roles?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]