This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new 5ffd6490ed PHOENIX-7566 HAGroupState subscription feature and state
management for ReplicationLogReader (#2274)
5ffd6490ed is described below
commit 5ffd6490ede889ab65f8a2de5a8b27cdb968150e
Author: ritegarg <[email protected]>
AuthorDate: Tue Sep 16 22:44:00 2025 -0700
PHOENIX-7566 HAGroupState subscription feature and state management for
ReplicationLogReader (#2274)
Co-authored-by: Ritesh Garg
<[email protected]>
---
.../java/org/apache/phoenix/jdbc/ClusterType.java | 35 ++
.../apache/phoenix/jdbc/HAGroupStateListener.java | 57 ++
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 250 ++++++--
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 211 +++++--
.../apache/phoenix/jdbc/HAGroupStoreRecord.java | 33 +-
...gionServerEndpointITWithConsistentFailover.java | 4 +-
.../IndexRegionObserverMutationBlockingIT.java | 4 +-
.../phoenix/jdbc/HAGroupStateSubscriptionIT.java | 644 +++++++++++++++++++++
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 9 +-
.../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 277 ++++++++-
.../org/apache/phoenix/jdbc/PhoenixHAAdminIT.java | 6 +-
.../apache/phoenix/util/HAGroupStoreTestUtil.java | 17 +-
12 files changed, 1423 insertions(+), 124 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
new file mode 100644
index 0000000000..e194c23e35
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+/**
+ * Enumeration representing the type of cluster in an HA group configuration.
+ * Used to distinguish between local and peer clusters when subscribing to
+ * HA group state change notifications.
+ */
+public enum ClusterType {
+    /**
+     * The cluster reached through the client's own ZooKeeper URL (zkUrl).
+     */
+    LOCAL,
+
+    /**
+     * The other cluster of the HA group, reached through the peer ZooKeeper URL.
+     */
+    PEER
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
new file mode 100644
index 0000000000..634f50e0d0
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+
+/**
+ * Interface for external clients who want to be notified of HA group state transitions.
+ *
+ * <p>Listeners subscribe per target state and per cluster. A listener is notified whenever
+ * the observed HA group state of the chosen cluster ({@link ClusterType#LOCAL} or
+ * {@link ClusterType#PEER}) changes to the state it subscribed to. The first state observed
+ * after the client starts watching also counts as a change.</p>
+ *
+ * <p>Only the new state is passed to the listener; the previous state is not.</p>
+ *
+ * @see HAGroupStoreManager#subscribeToTargetState
+ */
+public interface HAGroupStateListener {
+
+    /**
+     * Called when the HA group moves into the state this listener subscribed to.
+     *
+     * <p>Invoked synchronously from the HAGroupStoreClient's ZooKeeper cache event callback,
+     * so implementations should be fast and non-blocking. If heavy processing is required,
+     * delegate it to a separate thread.</p>
+     *
+     * <p>Implementations should not throw. An exception that does escape is caught and
+     * logged by the notifier and does not prevent other listeners from being notified.</p>
+     *
+     * @param haGroupName the name of the HA group that transitioned
+     * @param toState the new state after the transition
+     * @param modifiedTime ZooKeeper modification time (mtime, epoch milliseconds) of the
+     *                     record that carried the new state
+     * @param clusterType whether this transition occurred on the local or peer cluster
+     */
+    void onStateChange(String haGroupName,
+                       HAGroupState toState,
+                       long modifiedTime,
+                       ClusterType clusterType);
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 123dc28d5c..e4743bc7fd 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -25,10 +25,13 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -76,14 +79,14 @@ import static
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
*/
public class HAGroupStoreClient implements Closeable {
-    public static final String ZK_CONSISTENT_HA_NAMESPACE =
-            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
+    // ZooKeeper namespace (base path) handed to PhoenixHAAdmin for reading and writing the
+    // HA group state ZNodes: "phoenix/consistentHA/groupState".
+    // NOTE(review): this replaces ZK_CONSISTENT_HA_NAMESPACE ("phoenix/consistentHA");
+    // records stored under the old base path are no longer read through this client --
+    // confirm no migration is required for existing deployments.
+    public static final String ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE = "phoenix"
+            + ZKPaths.PATH_SEPARATOR + "consistentHA"
+            + ZKPaths.PATH_SEPARATOR + "groupState";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L;
     // Multiplier for ZK session timeout to account for time it will take for HMaster to abort
     // the region server in case ZK connection is lost from the region server.
-    private static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
-    private static final String CACHE_TYPE_LOCAL = "LOCAL";
-    private static final String CACHE_TYPE_PEER = "PEER";
+    // Package-private rather than private so that tests can reference it.
+    @VisibleForTesting
+    static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
     private PhoenixHAAdmin phoenixHaAdmin;
     private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class);
@@ -108,6 +111,14 @@ public class HAGroupStoreClient implements Closeable {
private final PathChildrenCacheListener
peerCustomPathChildrenCacheListener;
// Wait time for sync mode
private final long waitTimeForSyncModeInMs;
+    // Last HA group state seen in CHILD_ADDED/CHILD_UPDATED events of the local and peer
+    // caches. Null until the first such event, so the first observed state is reported to
+    // subscribers as a transition (see handleStateChange).
+    private volatile HAGroupState lastKnownLocalState;
+    private volatile HAGroupState lastKnownPeerState;
+
+    // Target-state subscribers of this client instance, keyed by
+    // buildTargetStateKey(clusterType, targetState), i.e. clusterType + ":" + targetState.
+    private final ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>>
+            targetStateSubscribers = new ConcurrentHashMap<>();
// Policy for the HA group
private HighAvailabilityPolicy policy;
private ClusterRole clusterRole;
@@ -131,7 +142,7 @@ public class HAGroupStoreClient implements Closeable {
* @return HAGroupStoreClient instance
*/
public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf,
String haGroupName,
- String zkUrl) throws SQLException {
+ String zkUrl) {
Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null");
String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf));
Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null");
@@ -159,7 +170,7 @@ public class HAGroupStoreClient implements Closeable {
/**
* Get the list of HAGroupNames from system table.
* We can also get the list of HAGroupNames from the system table by
providing the zkUrl in
- * where clause but we need to match the formatted zkUrl with the zkUrl in
the system table so
+ * where clause, but we need to match the formatted zkUrl with the zkUrl
in the system table so
* that matching is done correctly.
*
* @param zkUrl for connecting to Table
@@ -206,10 +217,11 @@ public class HAGroupStoreClient implements Closeable {
// Initialize HAGroupStoreClient attributes
initializeHAGroupStoreClientAttributes(haGroupName);
// Initialize Phoenix HA Admin
- this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf,
ZK_CONSISTENT_HA_NAMESPACE);
+ this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl,
+ conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
// Initialize local cache
this.pathChildrenCache =
initializePathChildrenCache(phoenixHaAdmin,
- pathChildrenCacheListener, CACHE_TYPE_LOCAL);
+ pathChildrenCacheListener, ClusterType.LOCAL);
// Initialize ZNode if not present in ZK
initializeZNodeIfNeeded();
if (this.pathChildrenCache != null) {
@@ -263,7 +275,7 @@ public class HAGroupStoreClient implements Closeable {
if (!isHealthy) {
throw new IOException("HAGroupStoreClient is not healthy");
}
- return fetchCacheRecord(this.pathChildrenCache,
CACHE_TYPE_LOCAL).getLeft();
+ return fetchCacheRecord(this.pathChildrenCache,
ClusterType.LOCAL).getLeft();
}
/**
@@ -283,7 +295,7 @@ public class HAGroupStoreClient implements Closeable {
throw new IOException("HAGroupStoreClient is not healthy");
}
Pair<HAGroupStoreRecord, Stat> cacheRecord = fetchCacheRecord(
- this.pathChildrenCache, CACHE_TYPE_LOCAL);
+ this.pathChildrenCache, ClusterType.LOCAL);
HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft();
Stat currentHAGroupStoreRecordStat = cacheRecord.getRight();
if (currentHAGroupStoreRecord == null) {
@@ -293,10 +305,29 @@ public class HAGroupStoreClient implements Closeable {
}
if (isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
currentHAGroupStoreRecordStat.getMtime(), haGroupState)) {
+ // We maintain last sync time as the last time cluster was in
sync state.
+ // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC,
record that time
+ // Once state changes back to ACTIVE_IN_SYNC or the role is
+ // NOT ACTIVE or ACTIVE_TO_STANDBY
+ // set the time to null to mark that we are current(or we
don't have any reader).
+ // TODO: Verify that for reader this is the correct approach.
+ Long lastSyncTimeInMs = currentHAGroupStoreRecord
+ .getLastSyncStateTimeInMs();
+ ClusterRole clusterRole = haGroupState.getClusterRole();
+ if (currentHAGroupStoreRecord.getHAGroupState()
+ == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
+ && haGroupState ==
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) {
+ lastSyncTimeInMs = System.currentTimeMillis();
+ } else if (haGroupState == HAGroupState.ACTIVE_IN_SYNC
+ || !(ClusterRole.ACTIVE.equals(clusterRole)
+ ||
ClusterRole.ACTIVE_TO_STANDBY.equals(clusterRole))) {
+ lastSyncTimeInMs = null;
+ }
HAGroupStoreRecord newHAGroupStoreRecord = new
HAGroupStoreRecord(
currentHAGroupStoreRecord.getProtocolVersion(),
currentHAGroupStoreRecord.getHaGroupName(),
- haGroupState
+ haGroupState,
+ lastSyncTimeInMs
);
// TODO: Check if cluster role is changing, if so, we need to
update
// the system table first
@@ -339,11 +370,11 @@ public class HAGroupStoreClient implements Closeable {
* @return HAGroupStoreRecord for the specified HA group name, or null if
not found
* @throws IOException if the client is not healthy
*/
-    private HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
+    public HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
         if (!isHealthy) {
             throw new IOException("HAGroupStoreClient is not healthy");
         }
-        return fetchCacheRecord(this.peerPathChildrenCache, CACHE_TYPE_PEER).getLeft();
+        // isHealthy is only updated for the LOCAL cache connection, so a lost peer connection
+        // is not reported here. For the PEER cache, fetchCacheRecord returns (null, null), and
+        // this method returns null, when the cache is null or (presumably) holds no record.
+        return fetchCacheRecord(this.peerPathChildrenCache, ClusterType.PEER).getLeft();
     }
private void initializeZNodeIfNeeded() throws IOException,
@@ -353,13 +384,21 @@ public class HAGroupStoreClient implements Closeable {
Pair<HAGroupStoreRecord, Stat> cacheRecordFromZK =
phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName);
HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft();
+ HAGroupState defaultHAGroupState =
this.clusterRole.getDefaultHAGroupState();
+ // Initialize lastSyncTimeInMs only if we start in ACTIVE_NOT_IN_SYNC
state
+ // and ZNode is not already present
+ Long lastSyncTimeInMs =
defaultHAGroupState.equals(HAGroupState.ACTIVE_NOT_IN_SYNC)
+ ? System.currentTimeMillis()
+ : null;
HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
- this.clusterRole.getDefaultHAGroupState()
+ this.clusterRole.getDefaultHAGroupState(),
+ lastSyncTimeInMs
);
// Only update current ZNode if it doesn't have the same role as
present in System Table.
// If not exists, then create ZNode
+ // TODO: Discuss if this approach is what reader needs.
if (haGroupStoreRecord != null &&
!haGroupStoreRecord.getClusterRole().equals(this.clusterRole))
{
phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
@@ -431,16 +470,17 @@ public class HAGroupStoreClient implements Closeable {
try {
// Setup peer connection if needed (first time or ZK Url
changed)
if (peerPathChildrenCache == null
- || (peerPhoenixHaAdmin != null &&
- !StringUtils.equals(this.peerZKUrl,
peerPhoenixHaAdmin.getZkUrl()))) {
+ || peerPhoenixHaAdmin != null
+ && !StringUtils.equals(this.peerZKUrl,
peerPhoenixHaAdmin.getZkUrl())) {
// Clean up existing peer connection if it exists
closePeerConnection();
// Setup new peer connection
this.peerPhoenixHaAdmin
- = new PhoenixHAAdmin(this.peerZKUrl, conf,
ZK_CONSISTENT_HA_NAMESPACE);
+ = new PhoenixHAAdmin(this.peerZKUrl, conf,
+ ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
// Create new PeerPathChildrenCache
this.peerPathChildrenCache =
initializePathChildrenCache(peerPhoenixHaAdmin,
- this.peerCustomPathChildrenCacheListener,
CACHE_TYPE_PEER);
+ this.peerCustomPathChildrenCacheListener,
ClusterType.PEER);
}
} catch (Exception e) {
closePeerConnection();
@@ -459,7 +499,8 @@ public class HAGroupStoreClient implements Closeable {
}
private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin,
- PathChildrenCacheListener customListener, String cacheType) {
+
PathChildrenCacheListener customListener,
+ ClusterType
cacheType) {
LOGGER.info("Initializing {} PathChildrenCache with URL {}",
cacheType, admin.getZkUrl());
PathChildrenCache newPathChildrenCache = null;
try {
@@ -487,30 +528,38 @@ public class HAGroupStoreClient implements Closeable {
}
}
- private PathChildrenCacheListener createCacheListener(CountDownLatch
latch, String cacheType) {
+ private PathChildrenCacheListener createCacheListener(CountDownLatch latch,
+ ClusterType
cacheType) {
return (client, event) -> {
final ChildData childData = event.getData();
- HAGroupStoreRecord eventRecord =
extractHAGroupStoreRecordOrNull(childData);
+ Pair<HAGroupStoreRecord, Stat> eventRecordAndStat
+ = extractHAGroupStoreRecordOrNull(childData);
+ HAGroupStoreRecord eventRecord = eventRecordAndStat.getLeft();
+ Stat eventStat = eventRecordAndStat.getRight();
LOGGER.info("HAGroupStoreClient Cache {} received event {} type {}
at {}",
cacheType, eventRecord, event.getType(),
System.currentTimeMillis());
switch (event.getType()) {
- // TODO: Add support for event watcher for HAGroupStoreRecord
- // case CHILD_ADDED:
- // case CHILD_UPDATED:
- // case CHILD_REMOVED:
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ if (eventRecord != null) {
+ handleStateChange(eventRecord, eventStat, cacheType);
+ }
+ break;
+ case CHILD_REMOVED:
+ break;
case INITIALIZED:
latch.countDown();
break;
case CONNECTION_LOST:
case CONNECTION_SUSPENDED:
- if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+ if (ClusterType.LOCAL.equals(cacheType)) {
isHealthy = false;
}
LOGGER.warn("{} HAGroupStoreClient cache connection
lost/suspended",
cacheType);
break;
case CONNECTION_RECONNECTED:
- if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+ if (ClusterType.LOCAL.equals(cacheType)) {
isHealthy = true;
}
LOGGER.info("{} HAGroupStoreClient cache connection
reconnected", cacheType);
@@ -524,7 +573,7 @@ public class HAGroupStoreClient implements Closeable {
private Pair<HAGroupStoreRecord, Stat> fetchCacheRecord(PathChildrenCache
cache,
- String cacheType) {
+ ClusterType
cacheType) {
if (cache == null) {
LOGGER.warn("{} HAGroupStoreClient cache is null, returning null",
cacheType);
return Pair.of(null, null);
@@ -537,7 +586,7 @@ public class HAGroupStoreClient implements Closeable {
return result;
}
- if (cacheType.equals(CACHE_TYPE_PEER)) {
+ if (cacheType.equals(ClusterType.PEER)) {
return Pair.of(null, null);
}
// If no record found, try to rebuild and fetch again
@@ -555,23 +604,25 @@ public class HAGroupStoreClient implements Closeable {
}
     private Pair<HAGroupStoreRecord, Stat> extractRecordAndStat(PathChildrenCache cache,
-            String targetPath, String cacheType) {
+            String targetPath,
+            ClusterType cacheType) {
         ChildData childData = cache.getCurrentData(targetPath);
         if (childData != null) {
-            HAGroupStoreRecord record = extractHAGroupStoreRecordOrNull(childData);
-            Stat currentStat = childData.getStat();
-            LOGGER.info("Built {} cluster record: {}", cacheType, record);
-            return Pair.of(record, currentStat);
+            // The record half is null when HAGroupStoreRecord.fromJson yields nothing for the
+            // ZNode data; the Stat is still returned in that case.
+            Pair<HAGroupStoreRecord, Stat> recordAndStat
+                    = extractHAGroupStoreRecordOrNull(childData);
+            LOGGER.info("Built {} cluster record: {}", cacheType, recordAndStat.getLeft());
+            return recordAndStat;
         }
+        // targetPath is not present in the cache.
         return Pair.of(null, null);
     }
-    private HAGroupStoreRecord extractHAGroupStoreRecordOrNull(final ChildData childData) {
+    // Despite the "OrNull" suffix this never returns null. It returns (record, stat), where
+    // record is null when childData is null or HAGroupStoreRecord.fromJson yields nothing,
+    // and stat is childData.getStat() (or null when childData is null).
+    private Pair<HAGroupStoreRecord, Stat> extractHAGroupStoreRecordOrNull(
+            final ChildData childData) {
         if (childData != null) {
             byte[] data = childData.getData();
-            return HAGroupStoreRecord.fromJson(data).orElse(null);
+            return Pair.of(HAGroupStoreRecord.fromJson(data).orElse(null), childData.getStat());
         }
-        return null;
+        return Pair.of(null, null);
     }
@@ -638,4 +689,127 @@ public class HAGroupStoreClient implements Closeable {
return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime)
> waitTime);
}
+ // ========== HA Group State Change Subscription Methods ==========
+
+ /**
+ * Subscribe to be notified when any transition to a target state occurs.
+ *
+ * @param targetState the target state to watch for
+ * @param clusterType whether to monitor local or peer cluster
+ * @param listener the listener to notify when any transition to the
target state occurs
+ */
+ public void subscribeToTargetState(HAGroupState targetState,
+ ClusterType clusterType,
HAGroupStateListener listener) {
+ String key = buildTargetStateKey(clusterType, targetState);
+ targetStateSubscribers.computeIfAbsent(key, k -> new
CopyOnWriteArraySet<>()).add(listener);
+ LOGGER.info("Subscribed listener to target state {} for HA group {} on
{} cluster",
+ targetState, haGroupName, clusterType);
+ }
+
+ /**
+ * Unsubscribe from target state notifications.
+ *
+ * @param targetState the target state
+ * @param clusterType whether monitoring local or peer cluster
+ * @param listener the listener to remove
+ */
+ public void unsubscribeFromTargetState(HAGroupState targetState,
+ ClusterType clusterType,
HAGroupStateListener listener) {
+ String key = buildTargetStateKey(clusterType, targetState);
+ CopyOnWriteArraySet<HAGroupStateListener> listeners =
targetStateSubscribers.get(key);
+ if (listeners != null && listeners.remove(listener)) {
+ if (listeners.isEmpty()) {
+ targetStateSubscribers.remove(key);
+ }
+ LOGGER.info("Unsubscribed listener from target state {} for HA
group {} on {} cluster",
+ targetState, haGroupName, clusterType);
+ }
+ }
+
+ /**
+ * Handle state change detection and notify subscribers if a transition
occurred.
+ *
+ * @param newRecord the new HA group store record
+ * @param cacheType the type of cache (LOCAL or PEER)
+ */
+ private void handleStateChange(HAGroupStoreRecord newRecord,
+ Stat newStat, ClusterType cacheType) {
+ HAGroupState newState = newRecord.getHAGroupState();
+ HAGroupState oldState;
+ ClusterType clusterType;
+
+ if (ClusterType.LOCAL.equals(cacheType)) {
+ oldState = lastKnownLocalState;
+ lastKnownLocalState = newState;
+ clusterType = ClusterType.LOCAL;
+ } else {
+ oldState = lastKnownPeerState;
+ lastKnownPeerState = newState;
+ clusterType = ClusterType.PEER;
+ }
+
+ // Only notify if there's an actual state transition or initial state
+ if (oldState == null || !oldState.equals(newState)) {
+ LOGGER.info("Detected state transition for HA group {} from {} to
{} on {} cluster",
+ haGroupName, oldState, newState, clusterType);
+ notifySubscribers(oldState, newState, newStat.getMtime(),
clusterType);
+ }
+ }
+
+    /**
+     * Notify all relevant subscribers of a state transition.
+     *
+     * <p>Listeners are invoked on the calling thread. An exception thrown by one listener is
+     * logged and does not stop the remaining listeners from being notified.</p>
+     *
+     * @param fromState the state transitioned from, or null if no previous state was seen
+     * @param toState the state transitioned to
+     * @param modifiedTime ZooKeeper modification time of the record that carried toState
+     * @param clusterType the cluster type where the transition occurred
+     */
+    private void notifySubscribers(HAGroupState fromState,
+            HAGroupState toState,
+            long modifiedTime,
+            ClusterType clusterType) {
+        LOGGER.debug("Notifying subscribers of state transition "
+                + "for HA group {} from {} to {} on {} cluster",
+                haGroupName, fromState, toState, clusterType);
+        String targetStateKey = buildTargetStateKey(clusterType, toState);
+
+        // Snapshot the listeners so (un)subscriptions made while notifying do not affect this
+        // round.
+        Set<HAGroupStateListener> listenersToNotify = new HashSet<>();
+        CopyOnWriteArraySet<HAGroupStateListener> targetListeners
+                = targetStateSubscribers.get(targetStateKey);
+        if (targetListeners != null) {
+            listenersToNotify.addAll(targetListeners);
+        }
+        if (listenersToNotify.isEmpty()) {
+            return;
+        }
+
+        // The trailing space after "transition" matters: the two literals are concatenated.
+        LOGGER.info("Notifying {} listeners of state transition "
+                + "for HA group {} from {} to {} on {} cluster",
+                listenersToNotify.size(), haGroupName, fromState, toState, clusterType);
+        for (HAGroupStateListener listener : listenersToNotify) {
+            try {
+                listener.onStateChange(haGroupName, toState, modifiedTime, clusterType);
+            } catch (Exception e) {
+                // Isolate listener failures so one bad listener cannot starve the others.
+                LOGGER.error("Error notifying listener of state transition "
+                        + "for HA group {} from {} to {} on {} cluster",
+                        haGroupName, fromState, toState, clusterType, e);
+            }
+        }
+    }
+
+ // ========== Helper Methods ==========
+
+    /**
+     * Builds the key used in targetStateSubscribers: the cluster type, a colon, then the
+     * target state.
+     */
+    private String buildTargetStateKey(ClusterType clusterType,
+            HAGroupState targetState) {
+        return String.valueOf(clusterType) + ':' + targetState;
+    }
+
}
\ No newline at end of file
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index e04d1a2dac..7045a301f3 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -36,7 +36,8 @@ import static org.apache.phoenix.query.QueryServicesOptions
/**
* Implementation of HAGroupStoreManager that uses HAGroupStoreClient.
- * Manages all HAGroupStoreClient instances.
+ * Manages all HAGroupStoreClient instances and provides passthrough
+ * functionality for HA group state change notifications.
*/
public class HAGroupStoreManager {
private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
@@ -78,7 +79,6 @@ public class HAGroupStoreManager {
return HAGroupStoreClient.getHAGroupNames(this.zkUrl);
}
-
/**
* Checks whether mutation is blocked or not for a specific HA group.
*
@@ -87,17 +87,15 @@ public class HAGroupStoreManager {
* @return true if mutation is blocked, false otherwise.
* @throws IOException when HAGroupStoreClient is not healthy.
*/
-    public boolean isMutationBlocked(String haGroupName) throws IOException, SQLException {
+    public boolean isMutationBlocked(String haGroupName) throws IOException {
         if (mutationBlockEnabled) {
-            HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                    haGroupName, zkUrl);
-            if (haGroupStoreClient != null) {
-                return haGroupStoreClient.getHAGroupStoreRecord() != null
-                        && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole() != null
-                        && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole()
-                        .isMutationBlocked();
-            }
-            throw new IOException("HAGroupStoreClient is not initialized");
+            // NOTE(review): getHAGroupStoreClient is defined outside this view. Per the
+            // subscribeToTargetState javadoc it throws IOException when no client can be
+            // obtained, which replaces the old explicit null check -- confirm.
+            HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+            // Read the record once so every check below sees the same snapshot.
+            HAGroupStoreRecord recordWithMetadata
+                    = haGroupStoreClient.getHAGroupStoreRecord();
+            return recordWithMetadata != null
+                    && recordWithMetadata.getClusterRole() != null
+                    && recordWithMetadata.getClusterRole()
+                    .isMutationBlocked();
         }
         return false;
     }
@@ -138,13 +136,8 @@ public class HAGroupStoreManager {
*/
     public void invalidateHAGroupStoreClient(final String haGroupName,
             boolean broadcastUpdate) throws Exception {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            haGroupStoreClient.rebuild();
-        } else {
-            throw new IOException("HAGroupStoreClient is not initialized");
-        }
+        // Delegates to HAGroupStoreClient#rebuild(). NOTE(review): broadcastUpdate is not read
+        // in this method body -- confirm whether it is still needed.
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        haGroupStoreClient.rebuild();
     }
/**
@@ -156,13 +149,23 @@ public class HAGroupStoreManager {
* @throws IOException when HAGroupStoreClient is not healthy.
*/
     public Optional<HAGroupStoreRecord> getHAGroupStoreRecord(final String haGroupName)
-            throws IOException, SQLException {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
-        }
-        throw new IOException("HAGroupStoreClient is not initialized");
+            throws IOException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        // Empty when the client returns no local record for this HA group.
+        return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
+    }
+
+    /**
+     * Returns the HAGroupStoreRecord of an HA group as seen on the peer cluster.
+     *
+     * @param haGroupName name of the HA group
+     * @return the peer cluster's record, or an empty Optional when the HA group is not
+     *         found or the peer cluster is not available
+     * @throws IOException when HAGroupStoreClient is not healthy.
+     */
+    public Optional<HAGroupStoreRecord> getPeerHAGroupStoreRecord(final String haGroupName)
+            throws IOException {
+        HAGroupStoreRecord peerRecord =
+                getHAGroupStoreClient(haGroupName).getHAGroupStoreRecordFromPeer();
+        return Optional.ofNullable(peerRecord);
     }
/**
@@ -173,15 +176,10 @@ public class HAGroupStoreManager {
*/
     public void setHAGroupStatusToStoreAndForward(final String haGroupName)
             throws IOException, StaleHAGroupStoreRecordVersionException,
-            InvalidClusterRoleTransitionException, SQLException {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            haGroupStoreClient.setHAGroupStatusIfNeeded(
-                    HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-        } else {
-            throw new IOException("HAGroupStoreClient is not initialized");
-        }
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        // Store-and-forward is expressed as ACTIVE_NOT_IN_SYNC. When the update is applied
+        // from ACTIVE_IN_SYNC, the client also stamps lastSyncStateTimeInMs.
+        haGroupStoreClient.setHAGroupStatusIfNeeded(
+                HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
     }
/**
@@ -190,19 +188,78 @@ public class HAGroupStoreManager {
* @param haGroupName name of the HA group
* @throws IOException when HAGroupStoreClient is not healthy.
*/
-    public void setHAGroupStatusRecordToSync(final String haGroupName)
+    // NOTE(review): renamed from the public setHAGroupStatusRecordToSync; external callers of
+    // the old name will no longer compile -- consider keeping a @Deprecated alias.
+    public void setHAGroupStatusToSync(final String haGroupName)
             throws IOException, StaleHAGroupStoreRecordVersionException,
-            InvalidClusterRoleTransitionException, SQLException {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            haGroupStoreClient.setHAGroupStatusIfNeeded(
-                    HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
-        } else {
-            throw new IOException("HAGroupStoreClient is not initialized");
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        // When the update is applied, moving to ACTIVE_IN_SYNC makes the client clear
+        // lastSyncStateTimeInMs.
+        haGroupStoreClient.setHAGroupStatusIfNeeded(
+                HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+    }
+
+    /**
+     * Sets the HAGroupStoreRecord to degrade reader functionality in local cluster.
+     * Transitions from STANDBY to DEGRADED_STANDBY_FOR_READER, and from
+     * DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY. From DEGRADED_STANDBY (reader and
+     * writer both degraded), DEGRADED_STANDBY is requested again so the writer degradation is
+     * preserved.
+     *
+     * @param haGroupName name of the HA group
+     * @throws IOException when HAGroupStoreClient is not healthy or has no record for the
+     *         HA group.
+     * @throws StaleHAGroupStoreRecordVersionException propagated from the state update
+     * @throws InvalidClusterRoleTransitionException when the current state
+     *         cannot transition to a degraded reader state
+     */
+    public void setReaderToDegraded(final String haGroupName)
+            throws IOException, StaleHAGroupStoreRecordVersionException,
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        HAGroupStoreRecord currentRecord
+                = haGroupStoreClient.getHAGroupStoreRecord();
+
+        if (currentRecord == null) {
+            throw new IOException("Current HAGroupStoreRecord is null for HA group: "
+                    + haGroupName);
+        }
+
+        HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+        HAGroupStoreRecord.HAGroupState targetState
+                = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER;
+
+        // Whenever the writer is already degraded, DEGRADED_STANDBY is the only state that
+        // keeps that information. Mapping DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_READER
+        // would silently mark the writer healthy.
+        if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER
+                || currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY) {
+            targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY;
         }
+
+        haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
     }
+    /**
+     * Sets the HAGroupStoreRecord to restore reader functionality in local cluster.
+     * Transitions from DEGRADED_STANDBY_FOR_READER to STANDBY, and from DEGRADED_STANDBY to
+     * DEGRADED_STANDBY_FOR_WRITER. From DEGRADED_STANDBY_FOR_WRITER (reader already healthy),
+     * that same state is requested so the writer degradation is preserved.
+     *
+     * @param haGroupName name of the HA group
+     * @throws IOException when HAGroupStoreClient is not healthy or has no record for the
+     *         HA group.
+     * @throws StaleHAGroupStoreRecordVersionException propagated from the state update
+     * @throws InvalidClusterRoleTransitionException when the current state
+     *         cannot transition to a healthy reader state
+     */
+    public void setReaderToHealthy(final String haGroupName)
+            throws IOException, StaleHAGroupStoreRecordVersionException,
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        HAGroupStoreRecord currentRecord
+                = haGroupStoreClient.getHAGroupStoreRecord();
+
+        if (currentRecord == null) {
+            throw new IOException("Current HAGroupStoreRecord is null "
+                    + "for HA group: " + haGroupName);
+        }
+
+        HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+        HAGroupStoreRecord.HAGroupState targetState = HAGroupStoreRecord.HAGroupState.STANDBY;
+
+        // Restoring the reader must not also clear a writer degradation. Both
+        // DEGRADED_STANDBY and DEGRADED_STANDBY_FOR_WRITER map to DEGRADED_STANDBY_FOR_WRITER.
+        // Sending DEGRADED_STANDBY_FOR_WRITER to STANDBY would silently mark the writer
+        // healthy as well.
+        if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY
+                || currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER) {
+            targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER;
+        }
+
+        haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
+    }
/**
* Returns the ClusterRoleRecord for the cluster pair.
@@ -214,12 +271,70 @@ public class HAGroupStoreManager {
* @throws IOException when HAGroupStoreClient is not healthy.
*/
public ClusterRoleRecord getClusterRoleRecord(String haGroupName)
- throws IOException, SQLException {
+ throws IOException {
+ // getHAGroupStoreClient throws IOException when no client is initialized
+ // for the group, so the returned client needs no null check here.
+ HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClient(haGroupName);
+ return haGroupStoreClient.getClusterRoleRecord();
+ }
+
+    /**
+     * Registers {@code listener} to be notified whenever the given HA group transitions
+     * into {@code targetState} on the chosen cluster.
+     * <p>
+     * The subscription is held by the group's {@link HAGroupStoreClient}; this manager only
+     * resolves that client and forwards the request to it.
+     *
+     * @param haGroupName the HA group to watch
+     * @param targetState the state whose entry should trigger a notification
+     * @param clusterType which cluster's state to watch (local or peer)
+     * @param listener callback notified on each transition into {@code targetState}
+     * @throws IOException if the HAGroupStoreClient for {@code haGroupName} cannot be obtained
+     */
+    public void subscribeToTargetState(String haGroupName,
+                                       HAGroupStoreRecord.HAGroupState targetState,
+                                       ClusterType clusterType,
+                                       HAGroupStateListener listener) throws IOException {
+        getHAGroupStoreClient(haGroupName)
+                .subscribeToTargetState(targetState, clusterType, listener);
+        LOGGER.debug("Delegated subscription to target state {} "
+                        + "for HA group {} on {} cluster to client",
+                targetState, haGroupName, clusterType);
+    }
+
+ /**
+ * Unsubscribe from target state notifications.
+ *
+ * @param haGroupName the name of the HA group
+ * @param targetState the target state
+ * @param clusterType whether monitoring local or peer cluster
+ * @param listener the listener to remove
+ */
+ public void unsubscribeFromTargetState(String haGroupName,
+ HAGroupStoreRecord.HAGroupState
targetState,
+ ClusterType clusterType,
+ HAGroupStateListener listener) {
+ try {
+ HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+ client.unsubscribeFromTargetState(targetState, clusterType,
listener);
+ LOGGER.debug("Delegated unsubscription from target state {} "
+ + "for HA group {} on {} cluster to client",
+ targetState, haGroupName, clusterType);
+ } catch (IOException e) {
+ LOGGER.warn("HAGroupStoreClient not found for HA group: {} -
cannot unsubscribe: {}",
+ haGroupName, e.getMessage());
+ }
+ }
+
+ /**
+ * Resolves the HAGroupStoreClient for an HA group, failing with a consistent error.
+ * <p>
+ * The client is looked up with {@code HAGroupStoreClient.getInstanceForZkUrl(conf,
+ * haGroupName, zkUrl)} using this manager's configuration and ZooKeeper URL.
+ *
+ * @param haGroupName name of the HA group
+ * @return the non-null HAGroupStoreClient instance for the specified HA group
+ * @throws IOException when the lookup returns null (HAGroupStoreClient not initialized
+ * for the group); the exception message includes the group name
+ */
+ private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName)
+ throws IOException {
HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstanceForZkUrl(conf,
haGroupName, zkUrl);
+ // A null lookup result is reported to callers as "not initialized".
- if (haGroupStoreClient != null) {
- return haGroupStoreClient.getClusterRoleRecord();
+ if (haGroupStoreClient == null) {
+ throw new IOException("HAGroupStoreClient is not initialized "
+ + "for HA group: " + haGroupName);
}
- throw new IOException("HAGroupStoreClient is not initialized");
+ return haGroupStoreClient;
}
}
\ No newline at end of file
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
index aa49a22eb2..6891d93c46 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
@@ -118,9 +118,11 @@ public class HAGroupStoreRecord {
OFFLINE.allowedTransitions = ImmutableSet.of();
// This needs to be manually recovered by operator
UNKNOWN.allowedTransitions = ImmutableSet.of();
- ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions =
ImmutableSet.of(ABORT_TO_ACTIVE_NOT_IN_SYNC,
+ ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions
+ = ImmutableSet.of(ABORT_TO_ACTIVE_NOT_IN_SYNC,
ACTIVE_IN_SYNC_TO_STANDBY);
- ACTIVE_IN_SYNC_TO_STANDBY.allowedTransitions =
ImmutableSet.of(ABORT_TO_ACTIVE_IN_SYNC, STANDBY);
+ ACTIVE_IN_SYNC_TO_STANDBY.allowedTransitions
+ = ImmutableSet.of(ABORT_TO_ACTIVE_IN_SYNC, STANDBY);
STANDBY_TO_ACTIVE.allowedTransitions =
ImmutableSet.of(ABORT_TO_STANDBY,
ACTIVE_IN_SYNC);
DEGRADED_STANDBY.allowedTransitions
@@ -161,17 +163,28 @@ public class HAGroupStoreRecord {
private final String protocolVersion;
private final String haGroupName;
private final HAGroupState haGroupState;
+ private final Long lastSyncStateTimeInMs;
+ /**
+ * Creates a record; annotated {@code @JsonCreator}, so this is the constructor used
+ * for JSON deserialization.
+ * A null {@code protocolVersion} is replaced with DEFAULT_PROTOCOL_VERSION.
+ * {@code haGroupName} and {@code haGroupState} are required (rejected by
+ * Preconditions.checkNotNull when null). {@code lastSyncStateTimeInMs} is stored as
+ * given and may be null, presumably for JSON written before this property existed --
+ * TODO confirm the ObjectMapper in use maps a missing creator property to null.
+ */
@JsonCreator
public HAGroupStoreRecord(@JsonProperty("protocolVersion") String
protocolVersion,
@JsonProperty("haGroupName") String haGroupName,
- @JsonProperty("haGroupState") HAGroupState
haGroupState) {
+ @JsonProperty("haGroupState") HAGroupState
haGroupState,
+ @JsonProperty("lastSyncStateTimeInMs") Long
lastSyncStateTimeInMs) {
Preconditions.checkNotNull(haGroupName, "HA group name cannot be
null!");
Preconditions.checkNotNull(haGroupState, "HA group state cannot be
null!");
this.protocolVersion = Objects.toString(protocolVersion,
DEFAULT_PROTOCOL_VERSION);
this.haGroupName = haGroupName;
this.haGroupState = haGroupState;
+ this.lastSyncStateTimeInMs = lastSyncStateTimeInMs;
+ }
+
+ /**
+ * Convenience constructor for backward compatibility without lastSyncStateTimeInMs;
+ * delegates to the JSON creator with {@code lastSyncStateTimeInMs} set to null.
+ */
+ public HAGroupStoreRecord(String protocolVersion,
+ String haGroupName, HAGroupState haGroupState) {
+ this(protocolVersion, haGroupName, haGroupState, null);
}
public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) {
@@ -193,9 +206,10 @@ public class HAGroupStoreRecord {
}
+ /**
+ * Returns true when {@code other} has the same HA group name, HA group state,
+ * protocol version and lastSyncStateTimeInMs as this record.
+ * lastSyncStateTimeInMs is compared with Objects.equals, so it may be null on either
+ * side; the other three fields are non-null by construction. {@code other} itself is
+ * dereferenced without a check and must not be null.
+ * Records that differ only in lastSyncStateTimeInMs are NOT considered the same info.
+ */
public boolean hasSameInfo(HAGroupStoreRecord other) {
- return haGroupName.equals(other.haGroupName) &&
- haGroupState.equals(other.haGroupState) &&
- protocolVersion.equals(other.protocolVersion);
+ return haGroupName.equals(other.haGroupName)
+ && haGroupState.equals(other.haGroupState)
+ && protocolVersion.equals(other.protocolVersion)
+ && Objects.equals(lastSyncStateTimeInMs,
other.lastSyncStateTimeInMs);
}
public String getProtocolVersion() {
@@ -211,6 +225,10 @@ public class HAGroupStoreRecord {
return haGroupState;
}
+    /**
+     * @return the lastSyncStateTimeInMs value supplied at construction, or {@code null}
+     *         when none was provided
+     */
+    public Long getLastSyncStateTimeInMs() {
+        return this.lastSyncStateTimeInMs;
+    }
+
@JsonIgnore
public ClusterRoleRecord.ClusterRole getClusterRole() {
return haGroupState.getClusterRole();
@@ -222,6 +240,7 @@ public class HAGroupStoreRecord {
.append(protocolVersion)
.append(haGroupName)
.append(haGroupState)
+ .append(lastSyncStateTimeInMs)
.hashCode();
}
@@ -239,6 +258,7 @@ public class HAGroupStoreRecord {
.append(protocolVersion, otherRecord.protocolVersion)
.append(haGroupName, otherRecord.haGroupName)
.append(haGroupState, otherRecord.haGroupState)
+ .append(lastSyncStateTimeInMs,
otherRecord.lastSyncStateTimeInMs)
.isEquals();
}
}
@@ -249,6 +269,7 @@ public class HAGroupStoreRecord {
+ "protocolVersion='" + protocolVersion + '\''
+ ", haGroupName='" + haGroupName + '\''
+ ", haGroupState=" + haGroupState
+ + ", lastSyncStateTimeInMs=" + lastSyncStateTimeInMs
+ '}';
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
index 9aa6173fd1..e66da0829b 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
@@ -43,7 +43,7 @@ import org.junit.rules.TestName;
import java.util.Map;
-import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -83,7 +83,7 @@ public class
PhoenixRegionServerEndpointITWithConsistentFailover extends BaseTes
assertNotNull(coprocessor);
ServerRpcController controller = new ServerRpcController();
- try (PhoenixHAAdmin peerHAAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_NAMESPACE)) {
+ try (PhoenixHAAdmin peerHAAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE)) {
HAGroupStoreRecord peerHAGroupStoreRecord = new
HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
HAGroupState.STANDBY);
peerHAAdmin.createHAGroupStoreRecordInZooKeeper(peerHAGroupStoreRecord);
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
index fdf64dd2df..377bdf69d7 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.end2end.index;
-import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
@@ -75,7 +75,7 @@ public class IndexRegionObserverMutationBlockingIT extends
BaseTest {
@Before
public void setUp() throws Exception {
- haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+ haAdmin = new PhoenixHAAdmin(config,
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
String zkUrl = getLocalZkUrl(config);
String peerZkUrl = CLUSTERS.getZkUrl2();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
new file mode 100644
index 0000000000..32239e3887
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for HA Group State Change subscription functionality.
+ * Tests the new subscription system where HAGroupStoreClient directly manages
+ * subscriptions and HAGroupStoreManager acts as a passthrough.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStateSubscriptionIT extends BaseTest {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HAGroupStateSubscriptionIT.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ // ZooKeeper admins for the local and peer clusters; recreated in before() for each test.
+ private PhoenixHAAdmin haAdmin;
+ private PhoenixHAAdmin peerHaAdmin;
+ // Fixed wait used after each ZooKeeper write so Curator events reach listeners before
+ // the test asserts. NOTE(review): boxed Long is unboxed on every Thread.sleep call;
+ // a primitive long would do.
+ private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 2000L;
+ private String zkUrl;
+ private String peerZKUrl;
+ // Cluster pair started once in doSetup(); its second cluster (getZkUrl2) is the peer.
+ private static final
HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new
HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        // Enable cluster-role-based mutation blocking for the test driver, then start the
+        // cluster pair shared by every test in this class.
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+        ReadOnlyProps driverProps = new ReadOnlyProps(serverProps.entrySet().iterator());
+        setUpTestDriver(driverProps);
+        CLUSTERS.start();
+    }
+
+    /**
+     * Per-test setup: creates ZooKeeper admins for both clusters, removes HAGroupStoreRecords
+     * left in ZooKeeper and in the system table by earlier tests, and inserts a system-table
+     * HA group record named after the current test method.
+     */
+    @Before
+    public void before() throws Exception {
+        haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+        zkUrl = getLocalZkUrl(config);
+        peerZKUrl = CLUSTERS.getZkUrl2();
+        peerHaAdmin = new PhoenixHAAdmin(peerZKUrl, config,
+                ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+        // Best-effort cleanup of HAGroupStoreRecords on both clusters. A failure here must
+        // not abort setup, but it is logged so leftover state that later breaks a test can
+        // be traced back to this step instead of disappearing silently.
+        try {
+            List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
+            for (String haGroupName : haGroupNames) {
+                haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+                peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Ignoring failure while cleaning up HAGroupStoreRecords in ZooKeeper", e);
+        }
+        // Remove any existing entries in the system table
+        HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
+
+        // Insert a HAGroupStoreRecord for this test into the system table
+        // (cluster roles passed as ACTIVE, then STANDBY).
+        HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(),
+                zkUrl, peerZKUrl,
+                ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null);
+    }
+
+ // ========== Multi-Cluster & Basic Subscription Tests ==========
+
+    /**
+     * Subscribes the LOCAL cluster to STANDBY_TO_ACTIVE and the PEER cluster to
+     * ACTIVE_NOT_IN_SYNC, triggers each transition once, and checks that every listener
+     * fires exactly once with its own cluster type and target state, i.e. there is no
+     * cross-cluster or cross-state triggering.
+     */
+    @Test
+    public void testDifferentTargetStatesPerCluster() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track every notification. The listeners deliberately do NOT filter on cluster type
+        // or state: a filter would hide a notification delivered to the wrong subscription
+        // and make the cluster-type assertions below trivially true.
+        AtomicInteger localNotifications = new AtomicInteger(0);
+        AtomicInteger peerNotifications = new AtomicInteger(0);
+        AtomicReference<ClusterType> lastLocalClusterType = new AtomicReference<>();
+        AtomicReference<ClusterType> lastPeerClusterType = new AtomicReference<>();
+        AtomicReference<HAGroupState> lastLocalState = new AtomicReference<>();
+        AtomicReference<HAGroupState> lastPeerState = new AtomicReference<>();
+
+        HAGroupStateListener localListener = (groupName, toState, modifiedTime, clusterType) -> {
+            localNotifications.incrementAndGet();
+            lastLocalClusterType.set(clusterType);
+            lastLocalState.set(toState);
+            LOGGER.info("Local target state listener called: {} on {}", toState, clusterType);
+        };
+
+        HAGroupStateListener peerListener = (groupName, toState, modifiedTime, clusterType) -> {
+            peerNotifications.incrementAndGet();
+            lastPeerClusterType.set(clusterType);
+            lastPeerState.set(toState);
+            LOGGER.info("Peer target state listener called: {} on {}", toState, clusterType);
+        };
+
+        // Subscribe to different target states on different clusters
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE,
+                ClusterType.LOCAL, localListener);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC,
+                ClusterType.PEER, peerListener);
+
+        // Trigger transition to STANDBY_TO_ACTIVE on LOCAL cluster
+        HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName,
+                HAGroupState.STANDBY_TO_ACTIVE);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Trigger transition to ACTIVE_NOT_IN_SYNC on PEER cluster (creates the peer record)
+        HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName,
+                HAGroupState.ACTIVE_NOT_IN_SYNC);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Each listener fired exactly once, for its own cluster and its own target state
+        assertEquals("Local cluster should receive its target state notification", 1,
+                localNotifications.get());
+        assertEquals("Peer cluster should receive its target state notification", 1,
+                peerNotifications.get());
+        assertEquals("Local notification should have LOCAL cluster type",
+                ClusterType.LOCAL, lastLocalClusterType.get());
+        assertEquals("Peer notification should have PEER cluster type",
+                ClusterType.PEER, lastPeerClusterType.get());
+        assertEquals("Local notification should be for STANDBY_TO_ACTIVE",
+                HAGroupState.STANDBY_TO_ACTIVE, lastLocalState.get());
+        assertEquals("Peer notification should be for ACTIVE_NOT_IN_SYNC",
+                HAGroupState.ACTIVE_NOT_IN_SYNC, lastPeerState.get());
+    }
+
+    /**
+     * Subscribes one listener to STANDBY on both clusters, then unsubscribes it from LOCAL
+     * only: a LOCAL transition must be ignored while a PEER transition still notifies.
+     */
+    @Test
+    public void testUnsubscribeSpecificCluster() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        final AtomicInteger standbyCalls = new AtomicInteger(0);
+        final AtomicReference<ClusterType> observedCluster = new AtomicReference<>();
+
+        HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState != HAGroupState.STANDBY) {
+                return;
+            }
+            standbyCalls.incrementAndGet();
+            observedCluster.set(clusterType);
+            LOGGER.info("Listener called: {} on {}", toState, clusterType);
+        };
+
+        // Same listener and target state on both clusters (LOCAL first, then PEER)
+        for (ClusterType cluster : new ClusterType[] {ClusterType.LOCAL, ClusterType.PEER}) {
+            manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, cluster, listener);
+        }
+
+        // Drop only the LOCAL subscription
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY,
+                ClusterType.LOCAL, listener);
+
+        // LOCAL -> STANDBY must no longer reach the listener
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
+                new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY), 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assertEquals("Should receive no notifications from LOCAL cluster", 0,
+                standbyCalls.get());
+
+        // PEER -> STANDBY is still subscribed
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(
+                new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY));
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assertEquals("Should receive notification only from PEER cluster", 1,
+                standbyCalls.get());
+        assertEquals("Notification should be from PEER cluster",
+                ClusterType.PEER, observedCluster.get());
+    }
+
+ // ========== Multiple Listeners Tests ==========
+
+    /**
+     * Two listeners subscribed to DEGRADED_STANDBY on both clusters must each be notified
+     * once per cluster.
+     */
+    @Test
+    public void testMultipleListenersMultipleClusters() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        final AtomicInteger firstLocal = new AtomicInteger(0);
+        final AtomicInteger secondLocal = new AtomicInteger(0);
+        final AtomicInteger firstPeer = new AtomicInteger(0);
+        final AtomicInteger secondPeer = new AtomicInteger(0);
+
+        HAGroupStateListener listener1 =
+                degradedStandbyCountingListener("Listener1", firstLocal, firstPeer);
+        HAGroupStateListener listener2 =
+                degradedStandbyCountingListener("Listener2", secondLocal, secondPeer);
+
+        // Register both listeners on LOCAL, then both on PEER
+        for (ClusterType cluster : new ClusterType[] {ClusterType.LOCAL, ClusterType.PEER}) {
+            manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY,
+                    cluster, listener1);
+            manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY,
+                    cluster, listener2);
+        }
+
+        // LOCAL -> DEGRADED_STANDBY
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
+                new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY), 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // PEER -> DEGRADED_STANDBY
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(
+                new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY));
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        assertEquals("Listener1 should receive LOCAL notification", 1, firstLocal.get());
+        assertEquals("Listener2 should receive LOCAL notification", 1, secondLocal.get());
+        assertEquals("Listener1 should receive PEER notification", 1, firstPeer.get());
+        assertEquals("Listener2 should receive PEER notification", 1, secondPeer.get());
+    }
+
+    /**
+     * Builds a listener that, for DEGRADED_STANDBY notifications only, increments
+     * {@code localCount} when the reporting cluster is LOCAL and {@code peerCount}
+     * otherwise, then logs "{label} called: ...".
+     */
+    private static HAGroupStateListener degradedStandbyCountingListener(String label,
+            AtomicInteger localCount, AtomicInteger peerCount) {
+        return (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState != HAGroupState.DEGRADED_STANDBY) {
+                return;
+            }
+            AtomicInteger counter = clusterType == ClusterType.LOCAL ? localCount : peerCount;
+            counter.incrementAndGet();
+            LOGGER.info(label + " called: {} on {}", toState, clusterType);
+        };
+    }
+
+ @Test
+ public void testSameListenerDifferentTargetStates() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track which target states were reached
+ AtomicInteger stateANotifications = new AtomicInteger(0);
+ AtomicInteger stateBNotifications = new AtomicInteger(0);
+ AtomicReference<ClusterType> lastStateAClusterType = new
AtomicReference<>();
+ AtomicReference<ClusterType> lastStateBClusterType = new
AtomicReference<>();
+
+ HAGroupStateListener sharedListener = (groupName, toState,
modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY &&
clusterType == ClusterType.LOCAL) {
+ stateANotifications.incrementAndGet();
+ lastStateAClusterType.set(clusterType);
+ LOGGER.info("Shared listener - Target State A: {} on {}",
toState, clusterType);
+ } else if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType
== ClusterType.PEER) {
+ stateBNotifications.incrementAndGet();
+ lastStateBClusterType.set(clusterType);
+ LOGGER.info("Shared listener - Target State B: {} on {}",
toState, clusterType);
+ }
+ };
+
+ // Register same listener for different target states on different
clusters
+ manager.subscribeToTargetState(haGroupName,
HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, ClusterType.LOCAL, sharedListener);
+ manager.subscribeToTargetState(haGroupName,
HAGroupState.ACTIVE_IN_SYNC, ClusterType.PEER, sharedListener);
+
+ // Trigger target state A on LOCAL
+ HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0",
haGroupName, HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord,
0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Trigger target state B on PEER
+ HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0",
haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify listener called for each appropriate target state/cluster
combination
+ assertEquals("Should receive target state A notification", 1,
stateANotifications.get());
+ assertEquals("Should receive target state B notification", 1,
stateBNotifications.get());
+ assertEquals("Target state A should be from LOCAL cluster",
ClusterType.LOCAL, lastStateAClusterType.get());
+ assertEquals("Target state B should be from PEER cluster",
ClusterType.PEER, lastStateBClusterType.get());
+
+ }
+
+ // ========== Edge Cases & Error Handling ==========
+
+    /**
+     * Subscribing to an HA group that has no HAGroupStoreClient must fail with an
+     * IOException whose message names the group.
+     */
+    @Test
+    public void testSubscriptionToNonExistentHAGroup() throws Exception {
+        final String nonExistentHAGroup = "nonExistentGroup_" + testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Intentionally a no-op: the subscription is expected to fail before it is registered
+        HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> { };
+
+        IOException thrown = null;
+        try {
+            manager.subscribeToTargetState(nonExistentHAGroup, HAGroupState.STANDBY,
+                    ClusterType.LOCAL, listener);
+        } catch (IOException e) {
+            thrown = e;
+        }
+        if (thrown == null) {
+            fail("Expected IOException for non-existent HA group");
+        }
+        assertTrue("Exception should mention the HA group name",
+                thrown.getMessage().contains(nonExistentHAGroup));
+        LOGGER.info("Correctly caught exception for non-existent HA group: {}",
+                thrown.getMessage());
+    }
+
+    /**
+     * A listener that throws must not prevent the other listeners registered for the same
+     * target state from being notified.
+     */
+    @Test
+    public void testListenerExceptionIsolation() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        final AtomicInteger firstGoodCalls = new AtomicInteger(0);
+        final AtomicInteger secondGoodCalls = new AtomicInteger(0);
+        final AtomicInteger faultyCalls = new AtomicInteger(0);
+
+        HAGroupStateListener firstGood = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState != HAGroupState.ACTIVE_IN_SYNC) {
+                return;
+            }
+            firstGoodCalls.incrementAndGet();
+            LOGGER.info("Good listener 1 called: {} on {}", toState, clusterType);
+        };
+
+        HAGroupStateListener faulty = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState != HAGroupState.ACTIVE_IN_SYNC) {
+                return;
+            }
+            faultyCalls.incrementAndGet();
+            LOGGER.info("Bad listener called, about to throw exception");
+            throw new RuntimeException("Test exception from bad listener");
+        };
+
+        HAGroupStateListener secondGood = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState != HAGroupState.ACTIVE_IN_SYNC) {
+                return;
+            }
+            secondGoodCalls.incrementAndGet();
+            LOGGER.info("Good listener 2 called: {} on {}", toState, clusterType);
+        };
+
+        // Registration order places the throwing listener between the two good ones
+        for (HAGroupStateListener l : new HAGroupStateListener[] {firstGood, faulty, secondGood}) {
+            manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC,
+                    ClusterType.LOCAL, l);
+        }
+
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
+                new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC), 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        assertEquals("Good listener 1 should be called", 1, firstGoodCalls.get());
+        assertEquals("Good listener 2 should be called", 1, secondGoodCalls.get());
+        assertEquals("Bad listener should be called", 1, faultyCalls.get());
+    }
+
+ // ========== Performance & Concurrency Tests ==========
+
+ @Test
+ public void testConcurrentMultiClusterSubscriptions() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ final int threadCount = 10;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch completionLatch = new CountDownLatch(threadCount);
+ final AtomicInteger successfulSubscriptions = new AtomicInteger(0);
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ // Create concurrent subscription tasks
+ for (int i = 0; i < threadCount; i++) {
+ final int threadIndex = i;
+ executor.submit(() -> {
+ try {
+ startLatch.await(); // Wait for all threads to be ready
+
+ HAGroupStateListener listener = (groupName, toState,
modifiedTime, clusterType) -> {
+ LOGGER.debug("Thread {} listener called: {} on {}",
threadIndex, toState, clusterType);
+ };
+
+ // Half subscribe to LOCAL, half to PEER
+ ClusterType clusterType = (threadIndex % 2 == 0) ?
ClusterType.LOCAL : ClusterType.PEER;
+
+ manager.subscribeToTargetState(haGroupName,
HAGroupState.ACTIVE_IN_SYNC, clusterType, listener);
+ successfulSubscriptions.incrementAndGet();
+
+ // Also test unsubscribe
+ manager.unsubscribeFromTargetState(haGroupName,
HAGroupState.ACTIVE_IN_SYNC, clusterType, listener);
+
+ } catch (Exception e) {
+ LOGGER.error("Thread {} failed", threadIndex, e);
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads simultaneously
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue("All threads should complete within timeout",
completionLatch.await(30, TimeUnit.SECONDS));
+ assertEquals("All threads should successfully subscribe", threadCount,
successfulSubscriptions.get());
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testHighFrequencyMultiClusterChanges() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications
+ AtomicInteger localNotifications = new AtomicInteger(0);
+ AtomicInteger peerNotifications = new AtomicInteger(0);
+
+ HAGroupStateListener listener = (groupName, toState, modifiedTime,
clusterType) -> {
+ if (clusterType == ClusterType.LOCAL) {
+ localNotifications.incrementAndGet();
+ } else {
+ peerNotifications.incrementAndGet();
+ }
+ LOGGER.debug("High frequency listener: {} on {}", toState,
clusterType);
+ };
+
+ // Subscribe to target state on both clusters
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY,
ClusterType.LOCAL, listener);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY,
ClusterType.PEER, listener);
+
+ // Rapidly alternate state changes on both clusters
+ final int changeCount = 5;
+ HAGroupStoreRecord initialPeerRecord = new HAGroupStoreRecord("1.0",
haGroupName,HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(initialPeerRecord);
+
+ for (int i = 0; i < changeCount; i++) {
+ // Change local cluster
+ HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0",
haGroupName,
+ (i % 2 == 0) ? HAGroupState.STANDBY :
HAGroupState.DEGRADED_STANDBY_FOR_READER);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
localRecord, -1);
+
+ // Change peer cluster
+ HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0",
haGroupName,
+ (i % 2 == 0) ? HAGroupState.STANDBY :
HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+ peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
peerRecord, -1);
+
+ // Small delay between changes
+ Thread.sleep(500);
+ }
+
+ // Final wait for all events to propagate
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify transitions detected on both clusters
+ // Expected: 3 transitions to STANDBY state (i=0,2,4 → STANDBY)
+ assertEquals("Should detect exactly 3 local cluster transitions to
STANDBY", 3, localNotifications.get());
+ assertEquals("Should detect exactly 3 peer cluster transitions to
STANDBY", 3, peerNotifications.get());
+
+ LOGGER.info("Detected {} local and {} peer notifications",
localNotifications.get(), peerNotifications.get());
+
+ }
+
+ // ========== Cleanup & Resource Management Tests ==========
+
+    @Test
+    public void testSubscriptionCleanupPerCluster() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Notification counters. listener1/2/3 feed the shared "active" counters; listener4 has
+        // its own pair so that its per-cluster subscriptions can be verified independently.
+        AtomicInteger localActiveNotifications = new AtomicInteger(0);
+        AtomicInteger peerActiveNotifications = new AtomicInteger(0);
+        AtomicInteger listener4LocalNotifications = new AtomicInteger(0);
+        AtomicInteger listener4PeerNotifications = new AtomicInteger(0);
+
+        // Create listeners that track which ones are called
+        HAGroupStateListener listener1 = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) {
+                localActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener1 LOCAL ACTIVE_IN_SYNC: {}", toState);
+            }
+        };
+
+        HAGroupStateListener listener2 = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) {
+                localActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener2 LOCAL ACTIVE_IN_SYNC: {}", toState);
+            } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) {
+                peerActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener2 PEER STANDBY_TO_ACTIVE: {}", toState);
+            }
+        };
+
+        HAGroupStateListener listener3 = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) {
+                peerActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener3 PEER STANDBY_TO_ACTIVE: {}", toState);
+            }
+        };
+
+        HAGroupStateListener listener4 = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) {
+                listener4LocalNotifications.incrementAndGet();
+                LOGGER.info("Listener4 LOCAL ACTIVE_IN_SYNC: {}", toState);
+            } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) {
+                listener4PeerNotifications.incrementAndGet();
+                LOGGER.info("Listener4 PEER STANDBY_TO_ACTIVE: {}", toState);
+            }
+        };
+
+        // Subscribe listeners to both clusters for target states
+        manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener1);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener2);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener2);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener3);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener4);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener4);
+
+        // ---- Phase 1: all subscriptions are live ----
+        // Trigger ACTIVE_IN_SYNC on LOCAL (local ZNode version 0 -> 1)
+        HAGroupStoreRecord localActiveRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have 2 notifications for LOCAL ACTIVE_IN_SYNC (listener1 + listener2); listener4 counts separately
+        assertEquals("Should have 2 LOCAL ACTIVE_IN_SYNC notifications initially", 2, localActiveNotifications.get());
+        assertEquals("listener4 should see LOCAL ACTIVE_IN_SYNC once initially", 1, listener4LocalNotifications.get());
+
+        // Trigger STANDBY_TO_ACTIVE on PEER (peer ZNode created at version 0)
+        HAGroupStoreRecord peerActiveRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerActiveRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have 2 notifications for PEER STANDBY_TO_ACTIVE (listener2 + listener3); listener4 counts separately
+        assertEquals("Should have 2 PEER STANDBY_TO_ACTIVE notifications initially", 2, peerActiveNotifications.get());
+        assertEquals("listener4 should see PEER STANDBY_TO_ACTIVE once initially", 1, listener4PeerNotifications.get());
+
+        // ---- Phase 2: partial unsubscribe ----
+        localActiveNotifications.set(0);
+        peerActiveNotifications.set(0);
+        listener4LocalNotifications.set(0);
+        listener4PeerNotifications.set(0);
+
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener1);
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener2);
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener4);
+
+        // Re-enter ACTIVE_IN_SYNC on LOCAL by first moving to another state (local version 1 -> 2 -> 3)
+        HAGroupStoreRecord localActiveRecord2 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord2, 1);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord localActiveRecord3 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord3, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Only listener2 remains subscribed to LOCAL ACTIVE_IN_SYNC; listener4 was removed from it
+        assertEquals("Should have 1 LOCAL ACTIVE_IN_SYNC notification after partial unsubscribe", 1, localActiveNotifications.get());
+        assertEquals("listener4 should not be notified on LOCAL after unsubscribing", 0, listener4LocalNotifications.get());
+
+        // Re-enter STANDBY_TO_ACTIVE on PEER via STANDBY (peer version 0 -> 1 -> 2)
+        HAGroupStoreRecord peerActiveRecord2 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord2, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord peerActiveRecord3 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord3, 1);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Only listener3 remains among the shared-counter listeners; listener4 is still subscribed on PEER
+        assertEquals("Should have 1 PEER STANDBY_TO_ACTIVE notification after partial unsubscribe", 1, peerActiveNotifications.get());
+        assertEquals("listener4 should still be notified on PEER", 1, listener4PeerNotifications.get());
+
+        // ---- Phase 3: unsubscribe every remaining listener with the exact (state, cluster) it subscribed to ----
+        localActiveNotifications.set(0);
+        peerActiveNotifications.set(0);
+        listener4LocalNotifications.set(0);
+        listener4PeerNotifications.set(0);
+
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener2);
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener3);
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener4);
+
+        // Re-trigger the previously subscribed target states so that any leftover subscription would be caught.
+        // LOCAL: ACTIVE_IN_SYNC -> ACTIVE_NOT_IN_SYNC -> ACTIVE_IN_SYNC (version 3 -> 4 -> 5)
+        HAGroupStoreRecord localActiveRecord4 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord4, 3);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord localActiveRecord5 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord5, 4);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // PEER: STANDBY_TO_ACTIVE -> STANDBY -> STANDBY_TO_ACTIVE (version 2 -> 3 -> 4)
+        HAGroupStoreRecord peerActiveRecord4 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord4, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord peerActiveRecord5 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord5, 3);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have no notifications after complete unsubscribe
+        assertEquals("Should have 0 LOCAL ACTIVE_IN_SYNC notifications after complete unsubscribe", 0, localActiveNotifications.get());
+        assertEquals("Should have 0 PEER STANDBY_TO_ACTIVE notifications after complete unsubscribe", 0, peerActiveNotifications.get());
+        assertEquals("listener4 should receive no notifications after complete unsubscribe", 0,
+                listener4LocalNotifications.get() + listener4PeerNotifications.get());
+
+        // ---- Phase 4: new subscriptions still work after cleanup ----
+        AtomicInteger newSubscriptionNotifications = new AtomicInteger(0);
+        HAGroupStateListener newTestListener = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.STANDBY && clusterType == ClusterType.LOCAL) {
+                newSubscriptionNotifications.incrementAndGet();
+                LOGGER.info("New subscription triggered: {} on {} at {}", toState, clusterType, modifiedTime);
+            }
+        };
+
+        // Subscribe with new test listener
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, newTestListener);
+
+        // Trigger STANDBY state and verify new subscription works (local version 5 -> 6)
+        HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 5);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Expected: exactly 1 notification for the new subscription
+        assertEquals("New subscription should receive exactly 1 notification", 1, newSubscriptionNotifications.get());
+
+        LOGGER.info("Subscription cleanup test completed successfully with {} notifications from new subscription",
+                newSubscriptionNotifications.get());
+
+    }
+
+    /**
+     * Reads one of {@link HAGroupStoreClient}'s private subscription maps via reflection.
+     *
+     * @param client    the client instance whose field is read
+     * @param fieldName name of the private map field declared on {@link HAGroupStoreClient}
+     * @return the field's value, cast to a map of key to listener set
+     * @throws Exception if the field does not exist or cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    private ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>>
+            getSubscriptionMap(HAGroupStoreClient client, String fieldName) throws Exception {
+        Field subscriptionField = HAGroupStoreClient.class.getDeclaredField(fieldName);
+        subscriptionField.setAccessible(true);
+        Object rawValue = subscriptionField.get(client);
+        return (ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>>) rawValue;
+    }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index 5a062b3305..9e8087a57f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -40,6 +40,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -52,7 +53,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
-import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
@@ -91,8 +92,8 @@ public class HAGroupStoreClientIT extends BaseTest {
@Before
public void before() throws Exception {
- haAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
ZK_CONSISTENT_HA_NAMESPACE);
- peerHaAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_NAMESPACE);
+ haAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+ peerHaAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration());
@@ -429,6 +430,8 @@ public class HAGroupStoreClientIT extends BaseTest {
// The record should be automatically rebuilt from System Table as it
is not in ZK
assertNotNull(currentRecord);
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
currentRecord.getHAGroupState());
+ // The record should have a timestamp
+ assertNotNull(currentRecord.getLastSyncStateTimeInMs());
record1 = new HAGroupStoreRecord("v1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index 34d143e5e6..c9e17294c5 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -17,13 +17,16 @@
*/
package org.apache.phoenix.jdbc;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -31,18 +34,25 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Integration tests for {@link HAGroupStoreManager}.
@@ -63,13 +73,14 @@ public class HAGroupStoreManagerIT extends BaseTest {
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+ props.put(ZK_SESSION_TIMEOUT, String.valueOf(30*1000)); // 30*1000 ms; testSetHAGroupStatusToSync sleeps ZK_SESSION_TIMEOUT * ZK_SESSION_TIMEOUT_MULTIPLIER, so presumably kept short on purpose (confirm)
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
CLUSTERS.start();
}
@Before
public void before() throws Exception {
- haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+ haAdmin = new PhoenixHAAdmin(config,
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
zkUrl = getLocalZkUrl(config);
this.peerZKUrl = CLUSTERS.getZkUrl2();
@@ -147,7 +158,6 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Should be present
assertTrue(recordOpt.isPresent());
-
// Delete record from System Table
HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName,
zkUrl);
// Delete record from ZK
@@ -166,12 +176,90 @@ public class HAGroupStoreManagerIT extends BaseTest {
Optional<HAGroupStoreRecord> retrievedOpt =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
assertTrue(retrievedOpt.isPresent());
- // Record for comparison
- HAGroupStoreRecord record = new HAGroupStoreRecord(
- HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+ // Get MTime from HAAdmin for equality verification below.
+ Pair<HAGroupStoreRecord, Stat> currentRecordAndStat =
haAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName);
+
+ // Complete object comparison field-by-field
+ assertEquals(haGroupName, retrievedOpt.get().getHaGroupName());
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
retrievedOpt.get().getHAGroupState());
+ Long lastSyncStateTimeInMs =
retrievedOpt.get().getLastSyncStateTimeInMs();
+ Long mtime = currentRecordAndStat.getRight().getMtime();
+ // Allow a small margin of error
+ assertTrue(Math.abs(lastSyncStateTimeInMs - mtime) <= 1);
+ assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
retrievedOpt.get().getProtocolVersion());
+ }
+
+    @Test
+    public void testGetPeerHAGroupStoreRecord() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // No peer record exists before anything is written to the peer cluster.
+        assertFalse(manager.getPeerHAGroupStoreRecord(haGroupName).isPresent());
+
+        // Admin bound to the peer cluster so records can be written there directly.
+        PhoenixHAAdmin peerAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
+                ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+        try {
+            // Step 1: create a STANDBY record on the peer and wait for it to propagate.
+            peerAdmin.createHAGroupStoreRecordInZooKeeper(new HAGroupStoreRecord(
+                    "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY));
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            Optional<HAGroupStoreRecord> observed = manager.getPeerHAGroupStoreRecord(haGroupName);
+            assertTrue(observed.isPresent());
+            HAGroupStoreRecord standbyPeer = observed.get();
+            assertEquals(haGroupName, standbyPeer.getHaGroupName());
+            assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, standbyPeer.getHAGroupState());
+            assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, standbyPeer.getProtocolVersion());
+
+            // Step 2: delete it; the manager should report the peer record as absent again.
+            peerAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+            assertFalse(manager.getPeerHAGroupStoreRecord(haGroupName).isPresent());
+
+            // Step 3: recreate it in a different state and verify the new state is observed.
+            peerAdmin.createHAGroupStoreRecordInZooKeeper(new HAGroupStoreRecord(
+                    "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER));
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            observed = manager.getPeerHAGroupStoreRecord(haGroupName);
+            assertTrue(observed.isPresent());
+            assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+                    observed.get().getHAGroupState());
+        } finally {
+            // Best-effort cleanup: the record may already be gone, so failures here are ignored.
+            try {
+                peerAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+            } catch (Exception ignored) {
+                // intentionally ignored
+            }
+            peerAdmin.close();
+        }
+    }
- // Complete object comparison instead of field-by-field
- assertEquals(record, retrievedOpt.get());
+    @Test
+    public void testGetPeerHAGroupStoreRecordWhenHAGroupNotInSystemTable() throws Exception {
+        // The test name doubles as an HA group name that was never added to the system table.
+        final String unknownGroup = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        Optional<HAGroupStoreRecord> peerRecord = manager.getPeerHAGroupStoreRecord(unknownGroup);
+        assertFalse("Peer record should not be present for non-existent HA group", peerRecord.isPresent());
}
@Test
@@ -214,9 +302,13 @@ public class HAGroupStoreManagerIT extends BaseTest {
conf.set(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "false");
conf.set(HConstants.ZOOKEEPER_QUORUM, getLocalZkUrl(config));
- HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(config);
+ // Set the HAGroupStoreManager instance to null via reflection to
force recreation
+ Field field =
HAGroupStoreManager.class.getDeclaredField("haGroupStoreManagerInstance");
+ field.setAccessible(true);
+ field.set(null, null);
- // Create HAGroupStoreRecord with ACTIVE_TO_STANDBY role
+ HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(config);
+ // Create HAGroupStoreRecord with ACTIVE_IN_SYNC_TO_STANDBY role
HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord(
"1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
@@ -225,6 +317,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Mutations should not be blocked even with ACTIVE_TO_STANDBY role
assertFalse(haGroupStoreManager.isMutationBlocked(haGroupName));
+
+ // Set the HAGroupStoreManager instance back to null via reflection to
force recreation for other tests
+ field.set(null, null);
}
@Test
@@ -248,29 +343,43 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue(updatedRecordOpt.isPresent());
HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
updatedRecord.getHAGroupState());
+        assertNotNull(updatedRecord.getLastSyncStateTimeInMs());
+
+        // Set the HA group status to store and forward again and verify that
+        // getLastSyncStateTimeInMs is unchanged while the state stays ACTIVE_NOT_IN_SYNC.
+        // The time should only update when we move from AIS to ANIS.
+        haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        Optional<HAGroupStoreRecord> updatedRecordOpt2 = haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(updatedRecordOpt2.isPresent());
+        HAGroupStoreRecord updatedRecord2 = updatedRecordOpt2.get(); // must be the re-read record, not updatedRecordOpt
+        assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, updatedRecord2.getHAGroupState());
+        assertEquals(updatedRecord.getLastSyncStateTimeInMs(), updatedRecord2.getLastSyncStateTimeInMs());
}
@Test
- public void testSetHAGroupStatusRecordToSync() throws Exception {
+ public void testSetHAGroupStatusToSync() throws Exception { // verifies ACTIVE_NOT_IN_SYNC -> ACTIVE_IN_SYNC via setHAGroupStatusToSync
String haGroupName = testName.getMethodName();
HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(config);
- // Create an initial HAGroupStoreRecord with ACTIVE_NOT_IN_SYNC status
- HAGroupStoreRecord initialRecord = new HAGroupStoreRecord(
- "1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-
- haAdmin.createHAGroupStoreRecordInZooKeeper(initialRecord);
- Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ // The record is expected to exist already in ACTIVE_NOT_IN_SYNC with a last-sync timestamp (presumably auto-created on first access; confirm)
+ HAGroupStoreRecord initialRecord =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName).orElse(null);
+ assertNotNull(initialRecord);
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
initialRecord.getHAGroupState());
+ assertNotNull(initialRecord.getLastSyncStateTimeInMs());
- // Set the HA group status to sync (ACTIVE)
- haGroupStoreManager.setHAGroupStatusRecordToSync(haGroupName);
+ // Wait ZK_SESSION_TIMEOUT * ZK_SESSION_TIMEOUT_MULTIPLIER before requesting sync; presumably the manager refuses ANIS -> AIS until then (TODO confirm)
+ Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT,
DEFAULT_ZK_SESSION_TIMEOUT)
+ * ZK_SESSION_TIMEOUT_MULTIPLIER));
+ haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
- // Verify the status was updated to ACTIVE
+ // Verify the state moved to ACTIVE_IN_SYNC and the last-sync timestamp was cleared (null)
Optional<HAGroupStoreRecord> updatedRecordOpt =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
assertTrue(updatedRecordOpt.isPresent());
HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
- assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE,
updatedRecord.getClusterRole());
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
updatedRecord.getHAGroupState());
+ assertNull(updatedRecord.getLastSyncStateTimeInMs());
}
@Test
@@ -350,4 +459,130 @@ public class HAGroupStoreManagerIT extends BaseTest {
}
+    @Test
+    public void testSetReaderToDegraded() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Record used to overwrite the auto-created ZNode so the group starts in STANDBY.
+        HAGroupStoreRecord standby = new HAGroupStoreRecord(
+                "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY);
+
+        // Reading through the manager first ensures the ZNode exists so HAAdmin can overwrite it at version 0.
+        assertTrue(manager.getHAGroupStoreRecord(haGroupName).isPresent());
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standby, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // STANDBY --setReaderToDegraded--> DEGRADED_STANDBY_FOR_READER
+        manager.setReaderToDegraded(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        Optional<HAGroupStoreRecord> afterFirst = manager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(afterFirst.isPresent());
+        assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+                afterFirst.get().getHAGroupState());
+
+        // Overwrite with DEGRADED_STANDBY_FOR_WRITER; two writes have happened, so the expected ZNode version is 2.
+        HAGroupStoreRecord degradedForWriter = new HAGroupStoreRecord(
+                "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedForWriter, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // DEGRADED_STANDBY_FOR_WRITER --setReaderToDegraded--> DEGRADED_STANDBY
+        manager.setReaderToDegraded(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        Optional<HAGroupStoreRecord> afterSecond = manager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(afterSecond.isPresent());
+        assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, afterSecond.get().getHAGroupState());
+    }
+
+    @Test
+    public void testSetReaderToHealthy() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Reading through the manager first ensures the ZNode exists so HAAdmin can overwrite it at version 0.
+        assertTrue(manager.getHAGroupStoreRecord(haGroupName).isPresent());
+
+        // Put the group into DEGRADED_STANDBY_FOR_READER directly via HAAdmin.
+        HAGroupStoreRecord degradedForReader = new HAGroupStoreRecord(
+                "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedForReader, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // DEGRADED_STANDBY_FOR_READER --setReaderToHealthy--> STANDBY
+        manager.setReaderToHealthy(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        Optional<HAGroupStoreRecord> afterFirst = manager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(afterFirst.isPresent());
+        assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, afterFirst.get().getHAGroupState());
+
+        // Overwrite with DEGRADED_STANDBY; two writes have happened, so the expected ZNode version is 2.
+        HAGroupStoreRecord degraded = new HAGroupStoreRecord(
+                "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degraded, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // DEGRADED_STANDBY --setReaderToHealthy--> DEGRADED_STANDBY_FOR_WRITER
+        manager.setReaderToHealthy(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        Optional<HAGroupStoreRecord> afterSecond = manager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(afterSecond.isPresent());
+        assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER,
+                afterSecond.get().getHAGroupState());
+    }
+
+    @Test
+    public void testReaderStateTransitionInvalidStates() throws Exception {
+        final String haGroupName = testName.getMethodName();
+        final HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Reading through the manager first ensures the ZNode exists so HAAdmin can overwrite it at version 0.
+        assertTrue(manager.getHAGroupStoreRecord(haGroupName).isPresent());
+
+        // ACTIVE_IN_SYNC is not a valid starting state for either reader transition.
+        HAGroupStoreRecord activeInSync = new HAGroupStoreRecord(
+                "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, activeInSync, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // setReaderToDegraded must be rejected.
+        try {
+            manager.setReaderToDegraded(haGroupName);
+            fail("Expected InvalidClusterRoleTransitionException for setReaderToDegraded with ACTIVE_IN_SYNC state");
+        } catch (InvalidClusterRoleTransitionException expected) {
+            String message = expected.getMessage();
+            assertTrue("Exception should mention the invalid transition",
+                    message.contains("ACTIVE_IN_SYNC") && message.contains("DEGRADED_STANDBY_FOR_READER"));
+        }
+
+        // setReaderToHealthy must be rejected as well.
+        // NOTE(review): the "STANDBY" substring check is weak; it also matches any *_STANDBY_* state name.
+        try {
+            manager.setReaderToHealthy(haGroupName);
+            fail("Expected InvalidClusterRoleTransitionException for setReaderToHealthy with ACTIVE_IN_SYNC state");
+        } catch (InvalidClusterRoleTransitionException expected) {
+            String message = expected.getMessage();
+            assertTrue("Exception should mention the invalid transition",
+                    message.contains("ACTIVE_IN_SYNC") && message.contains("STANDBY"));
+        }
+    }
+
}
\ No newline at end of file
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
index 5e2685d03a..044a8a4925 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
@@ -41,7 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -71,8 +71,8 @@ public class PhoenixHAAdminIT extends BaseTest {
@Before
public void before() throws Exception {
- haAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
ZK_CONSISTENT_HA_NAMESPACE);
- peerHaAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_NAMESPACE);
+ haAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); // local admin (cluster 1) on HAGroupStoreClient's HA group state namespace
+ peerHaAdmin = new
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); // peer admin (cluster 2), same namespace
cleanupTestZnodes();
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
index d15e73aca2..abc4d02a50 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
@@ -110,4 +110,19 @@ public class HAGroupStoreTestUtil {
conn.commit();
}
}
-}
\ No newline at end of file
+
+    /**
+     * Removes every row from the HA group system table; intended for test cleanup.
+     *
+     * @param zkUrl ZooKeeper quorum URL used to build the Phoenix JDBC connection string
+     * @throws SQLException if connecting, deleting, or committing fails
+     */
+    public static void deleteAllHAGroupRecordsInSystemTable(String zkUrl) throws SQLException {
+        final String jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl;
+        final String deleteAllSql = "DELETE FROM " + SYSTEM_HA_GROUP_NAME;
+        try (PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(jdbcUrl);
+                Statement statement = connection.createStatement()) {
+            statement.execute(deleteAllSql);
+            connection.commit();
+        }
+    }
+}
\ No newline at end of file