apurtell commented on code in PR #2479:
URL: https://github.com/apache/phoenix/pull/2479#discussion_r3270168719
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -983,32 +1017,65 @@ private void closePeerConnection() {
/**
* Shuts down the periodic sync executor gracefully.
*/
+  /**
+   * Remove this instance from the static {@link #instances} map. Idempotent. Uses value-based
+   * remove so that, if a concurrent {@link #getInstanceForZkUrl} has already swapped in a fresh
+   * replacement, the replacement is preserved.
+   */
+  private void deregisterFromInstances() {
+    final String key = (this.zkUrl != null) ? this.zkUrl : getLocalZkUrl(this.conf);
+    if (key == null) {
+      return;
+    }
+    final ConcurrentHashMap<String, HAGroupStoreClient> bucket = instances.get(key);
+    if (bucket == null) {
+      return;
+    }
+    bucket.remove(this.haGroupName, this);
Review Comment:
Removing the inner map introduces a race that can cause an NPE. `close()` is
not synchronized, so it can run concurrently with the synchronized block in
`getInstanceForZkUrl`, which includes this code:
```java
instances.putIfAbsent(localZkUrl, new ConcurrentHashMap<>());
instances.get(localZkUrl).put(haGroupName, result);
```
Consider:
1. Thread A (`getInstanceForZkUrl`, inside the synchronized block):
`instances.putIfAbsent(localZkUrl, …)` finds the bucket already present, so it
is a no-op.
2. Thread B (`close()` on a sibling instance for the same zkUrl):
`bucket.remove(siblingHaGroupName, sibling)` empties the bucket, and
`computeIfPresent` removes the bucket from the outer map.
3. Thread A: `instances.get(localZkUrl)` returns null.
4. Thread A: calling `put(haGroupName, result)` on the null reference throws an
NPE inside the static synchronized block.
The new client is created but never registered, the lookup fails for the
caller, and the next call recreates the client without knowing about the
orphan, leaking the orphan's ZK watchers and executors.
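One possible way to close this window is sketched below. It is not the PR's code: `InstanceRegistry`, `register`, `deregister`, and `hasBucket` are hypothetical names, and the sketch assumes that both registration and cleanup can be routed through `ConcurrentHashMap.compute` / `computeIfPresent` on the outer map, which run atomically with respect to each other for the same key.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the static registry; names are illustrative only. */
final class InstanceRegistry<V> {
  private final ConcurrentHashMap<String, ConcurrentHashMap<String, V>> instances =
    new ConcurrentHashMap<>();

  /** Creates the bucket if needed and inserts the value in one atomic step per zkUrl. */
  void register(String zkUrl, String group, V value) {
    instances.compute(zkUrl, (k, bucket) -> {
      ConcurrentHashMap<String, V> b = (bucket != null) ? bucket : new ConcurrentHashMap<>();
      b.put(group, value);
      return b;
    });
  }

  /** Value-based remove; the empty bucket is dropped inside the same atomic step. */
  void deregister(String zkUrl, String group, V value) {
    instances.computeIfPresent(zkUrl, (k, bucket) -> {
      bucket.remove(group, value);
      // Returning null removes the outer mapping; a concurrent register() either
      // ran before this (bucket not empty) or runs after it (fresh bucket).
      return bucket.isEmpty() ? null : bucket;
    });
  }

  V get(String zkUrl, String group) {
    ConcurrentHashMap<String, V> bucket = instances.get(zkUrl);
    return (bucket == null) ? null : bucket.get(group);
  }

  boolean hasBucket(String zkUrl) {
    return instances.containsKey(zkUrl);
  }
}
```

With this shape there is no separate `get` between bucket creation and `put`, so the interleaving in steps 1 to 4 cannot observe a null bucket.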
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -1047,6 +1114,177 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu
     return Math.max(0, remainingTime);
   }
+  // ========== Legacy /phoenix/ha CRR Sync ==========
+
+  /**
+   * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}.
+   * CAS losses are logged and skipped; the next consistentHA cache event or periodic cycle
+   * reconverges.
+   */
+  private void syncLegacyCRRIfRoleChanged() {
+    if (!legacyCrrSyncEnabled || !isHealthy) {
+      return;
+    }
+    // Snapshot mutable resources up front so a concurrent close() can't null them mid-method
+    // and trigger NPEs / writes through a torn-down Curator client.
+    final PhoenixHAAdmin admin = this.legacyHaAdmin;
+    final NodeCache cache = this.legacyCrrNodeCache;
+    if (admin == null || cache == null) {
+      return;
+    }
+    try {
+      HAGroupStoreRecord local = getHAGroupStoreRecord();
+      if (local == null) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local consistentHA record",
+          haGroupName);
+        return;
+      }
+      // Wait for peer URL before building the desired CRR (ctor NPEs on null url2).
+      if (StringUtils.isBlank(local.getPeerZKUrl())) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is blank", haGroupName);
+        return;
+      }
+      HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer();
+      // NodeCache is eventually consistent; on apparent absence, fall back to an authoritative
+      // ZK read so the equality check and CAS both see consistent state.
+      Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrrSnapshot(cache);
+      if (snapshot.getRight() == null) {
+        snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+      }
+      ClusterRoleRecord existing = snapshot.getLeft();
+      Stat existingStat = snapshot.getRight();
+      if (!shouldWriteLegacyCrr(existing)) {
+        return;
+      }
+      ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing);
+      if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) {
+        LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName,
+          existing.getVersion());
+        return;
+      }
+      try {
+        if (existingStat == null) {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0);
+        } else {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, existingStat.getVersion());
+        }
+        LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName,
+          existing != null ? existing.getVersion() : -1L, desired.getVersion());
+      } catch (StaleClusterRoleRecordVersionException stale) {
+        // CAS lost; next event/periodic cycle reconverges.
+        LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", haGroupName,
Review Comment:
Drop this to DEBUG level.
Each RS runs the sync independently, so during a state transition every RS
computes the same desired CRR and races on the CAS. Only one can win, and the
other N−1 RSes will each log this line at `INFO` level.
On a 500-node cluster, that is 499 INFO-level log lines, unnecessary IMHO,
every time this happens.
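The arithmetic can be checked with a small simulation. This is only a sketch: an `AtomicInteger` stands in for the znode's stat version, and `CasRaceDemo` and `countCasLosers` are hypothetical names, not part of the PR. It assumes every writer read the same version before attempting its write.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation of N writers racing on one versioned record. */
final class CasRaceDemo {
  /** Returns how many writers lose the CAS when all start from the same version. */
  static int countCasLosers(int writers) {
    AtomicInteger znodeVersion = new AtomicInteger(0); // stands in for Stat.getVersion()
    AtomicInteger losers = new AtomicInteger(0);
    final int expected = znodeVersion.get(); // every writer observed this version
    Thread[] threads = new Thread[writers];
    for (int i = 0; i < writers; i++) {
      threads[i] = new Thread(() -> {
        // Only the first compareAndSet from `expected` can succeed.
        if (!znodeVersion.compareAndSet(expected, expected + 1)) {
          losers.incrementAndGet(); // this is where the "CAS lost" line would be logged
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return losers.get();
  }

  public static void main(String[] args) {
    System.out.println(countCasLosers(500)); // prints 499
  }
}
```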
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -1047,6 +1114,177 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu
     return Math.max(0, remainingTime);
   }
+  // ========== Legacy /phoenix/ha CRR Sync ==========
+
+  /**
+   * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}.
+   * CAS losses are logged and skipped; the next consistentHA cache event or periodic cycle
+   * reconverges.
+   */
+  private void syncLegacyCRRIfRoleChanged() {
+    if (!legacyCrrSyncEnabled || !isHealthy) {
+      return;
+    }
+    // Snapshot mutable resources up front so a concurrent close() can't null them mid-method
+    // and trigger NPEs / writes through a torn-down Curator client.
+    final PhoenixHAAdmin admin = this.legacyHaAdmin;
+    final NodeCache cache = this.legacyCrrNodeCache;
+    if (admin == null || cache == null) {
+      return;
+    }
+    try {
+      HAGroupStoreRecord local = getHAGroupStoreRecord();
+      if (local == null) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local consistentHA record",
+          haGroupName);
+        return;
+      }
+      // Wait for peer URL before building the desired CRR (ctor NPEs on null url2).
+      if (StringUtils.isBlank(local.getPeerZKUrl())) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is blank", haGroupName);
+        return;
+      }
+      HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer();
+      // NodeCache is eventually consistent; on apparent absence, fall back to an authoritative
+      // ZK read so the equality check and CAS both see consistent state.
+      Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrrSnapshot(cache);
+      if (snapshot.getRight() == null) {
+        snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+      }
+      ClusterRoleRecord existing = snapshot.getLeft();
+      Stat existingStat = snapshot.getRight();
+      if (!shouldWriteLegacyCrr(existing)) {
+        return;
+      }
+      ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing);
+      if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) {
+        LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName,
+          existing.getVersion());
+        return;
+      }
+      try {
+        if (existingStat == null) {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0);
+        } else {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, existingStat.getVersion());
+        }
+        LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName,
+          existing != null ? existing.getVersion() : -1L, desired.getVersion());
+      } catch (StaleClusterRoleRecordVersionException stale) {
+        // CAS lost; next event/periodic cycle reconverges.
+        LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", haGroupName,
+          existingStat != null ? existingStat.getVersion() : -1);
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+        "Legacy CRR sync failed for HA group {}; will be retried by next event/periodic cycle",
+        haGroupName, e);
+    }
+  }
+
+  /**
+   * Policy gate before issuing a CAS write to the legacy CRR. Returns {@code false} when the
+   * existing record must not be overwritten by this client.
+   */
+  private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) {
+    // Refuse to overwrite a non-ZK (admin-managed RPC) legacy CRR; live readers use its
+    // registryType to build connection strings, so swapping form would break them.
+    if (existing != null && existing.getRegistryType() != RegistryType.ZK) {
Review Comment:
Do pre-existing legacy CRRs without a `registryType` field get permanently
locked out?
`ClusterRoleRecord.fromJson` deserializes pre-PR JSON that has no
`registryType` field as `RegistryType.RPC`. `shouldWriteLegacyCrr` then bails
out here without updating anything.
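If the intent is for such records to stay writable, one option is to distinguish "field absent" from "explicitly RPC" before applying the gate. The sketch below only illustrates that idea: `LegacyCrrGate`, `shouldWrite`, and the `Optional`-based view of the parsed record are hypothetical, and whether an absent field may safely be treated as migratable is an assumption the PR author would need to confirm.

```java
import java.util.Optional;

/** Hypothetical policy gate; not the PR's implementation. */
final class LegacyCrrGate {
  enum RegistryType { ZK, RPC }

  /**
   * @param recordExists whether a legacy CRR znode currently exists
   * @param declaredType the registryType found in the JSON, or empty if the field was absent
   */
  static boolean shouldWrite(boolean recordExists, Optional<RegistryType> declaredType) {
    if (!recordExists) {
      return true; // nothing to protect
    }
    if (!declaredType.isPresent()) {
      return true; // assumption: a record written before the field existed may be migrated
    }
    // Refuse only when the record explicitly declares a non-ZK registry.
    return declaredType.get() == RegistryType.ZK;
  }
}
```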
##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -1047,6 +1114,177 @@ private long validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu
     return Math.max(0, remainingTime);
   }
+  // ========== Legacy /phoenix/ha CRR Sync ==========
+
+  /**
+   * Derives the combined CRR from local + peer records and CAS-writes it to {@code /phoenix/ha}.
+   * CAS losses are logged and skipped; the next consistentHA cache event or periodic cycle
+   * reconverges.
+   */
+  private void syncLegacyCRRIfRoleChanged() {
+    if (!legacyCrrSyncEnabled || !isHealthy) {
+      return;
+    }
+    // Snapshot mutable resources up front so a concurrent close() can't null them mid-method
+    // and trigger NPEs / writes through a torn-down Curator client.
+    final PhoenixHAAdmin admin = this.legacyHaAdmin;
+    final NodeCache cache = this.legacyCrrNodeCache;
+    if (admin == null || cache == null) {
+      return;
+    }
+    try {
+      HAGroupStoreRecord local = getHAGroupStoreRecord();
+      if (local == null) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local consistentHA record",
+          haGroupName);
+        return;
+      }
+      // Wait for peer URL before building the desired CRR (ctor NPEs on null url2).
+      if (StringUtils.isBlank(local.getPeerZKUrl())) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is blank", haGroupName);
+        return;
+      }
+      HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer();
+      // NodeCache is eventually consistent; on apparent absence, fall back to an authoritative
+      // ZK read so the equality check and CAS both see consistent state.
+      Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrrSnapshot(cache);
+      if (snapshot.getRight() == null) {
+        snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+      }
+      ClusterRoleRecord existing = snapshot.getLeft();
+      Stat existingStat = snapshot.getRight();
+      if (!shouldWriteLegacyCrr(existing)) {
+        return;
+      }
+      ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing);
+      if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) {
+        LOGGER.debug("Legacy CRR for HA group {} already up to date at version {}", haGroupName,
+          existing.getVersion());
+        return;
+      }
+      try {
+        if (existingStat == null) {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0);
+        } else {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, existingStat.getVersion());
+        }
+        LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", haGroupName,
+          existing != null ? existing.getVersion() : -1L, desired.getVersion());
+      } catch (StaleClusterRoleRecordVersionException stale) {
+        // CAS lost; next event/periodic cycle reconverges.
+        LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat version {}", haGroupName,
+          existingStat != null ? existingStat.getVersion() : -1);
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+        "Legacy CRR sync failed for HA group {}; will be retried by next event/periodic cycle",
+        haGroupName, e);
+    }
+  }
+
+  /**
+   * Policy gate before issuing a CAS write to the legacy CRR. Returns {@code false} when the
+   * existing record must not be overwritten by this client.
+   */
+  private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) {
+    // Refuse to overwrite a non-ZK (admin-managed RPC) legacy CRR; live readers use its
+    // registryType to build connection strings, so swapping form would break them.
+    if (existing != null && existing.getRegistryType() != RegistryType.ZK) {
+      LOGGER.warn("Skipping legacy CRR sync for HA group {}: existing registryType={} "
+        + "(requires admin migration to ZK form)", haGroupName, existing.getRegistryType());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. When the local client's
+   * peer view is unavailable, preserves the {@code existing} record's peer role rather than
+   * downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer visibility would otherwise
+   * keep flipping it back, and the equality check naturally short-circuits when no other field
+   * changed.
+   * <p>
+   * Look up the preserved role by the peer URL (not by {@code getRole2()}) since
+   * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by alphabetical order; the
+   * peer URL may end up in either slot depending on lexical comparison.
+   */
+  private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, HAGroupStoreRecord peer,
+    ClusterRoleRecord existing) {
+    final ClusterRole role2;
+    if (peer != null) {
+      role2 = peer.getClusterRole();
+    } else if (existing != null) {
+      role2 = existing.getRole(JDBCUtil.formatUrl(local.getPeerZKUrl(), RegistryType.ZK));
+    } else {
+      role2 = ClusterRole.UNKNOWN;
+    }
+    long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L;
+    long baseVersion = Math.max(existing != null ? existing.getVersion() : 0L,
+      Math.max(local.getAdminCRRVersion(), peerAdminVersion));
+    return new ClusterRoleRecord(haGroupName, HighAvailabilityPolicy.valueOf(local.getPolicy()),
+      RegistryType.ZK, this.zkUrl, local.getClusterRole(), local.getPeerZKUrl(), role2,
+      baseVersion + 1);
+  }
+
+  /**
+   * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; callers must confirm
+   * absence with an authoritative ZK read since the cache is eventually consistent. Caller passes
+   * the cache from a local snapshot, never the mutable field (which {@link #close()} may null at
+   * any time).
+   */
+  private Pair<ClusterRoleRecord, Stat> readLegacyCrrSnapshot(NodeCache cache) {
+    ChildData current = cache.getCurrentData();
+    if (current == null) {
+      return Pair.of(null, null);
+    }
+    ClusterRoleRecord record = ClusterRoleRecord.fromJson(current.getData()).orElse(null);
+    return Pair.of(record, current.getStat());
+  }
+
+  /**
+   * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, single-thread executor, an
+   * off-lock initial sync, and the periodic reconciler. Called only when the feature is enabled and
+   * the client is healthy.
+   */
+  private void setupLegacyCrrSync() throws Exception {
+    this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
+    this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(), toPath(haGroupName));
+    this.legacyCrrNodeCache.start(true);
Review Comment:
`NodeCache.start(true)` makes a synchronous `getData()` call to ZK on the
calling thread, and this code runs from the constructor. All of this happens
while synchronized on `HAGroupStoreClient.class` in `getInstanceForZkUrl`.
As a result, new instance creation is serialized across HA groups. If ZK is
unhealthy or slow, there will be head-of-line blocking here.
Is this expected?
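If this is not expected, one common alternative is to lock per key rather than on the class, so a slow initialization for one HA group does not delay the others. The sketch below illustrates only the locking pattern: `PerKeyLazyRegistry`, `getOrCreate`, and the demo method are hypothetical names, and the blocking factory merely stands in for a slow ZK read.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

/** Hypothetical registry with one lock per key; not taken from the PR. */
final class PerKeyLazyRegistry<V> {
  private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, V> values = new ConcurrentHashMap<>();

  V getOrCreate(String key, Function<String, V> factory) {
    V existing = values.get(key);
    if (existing != null) {
      return existing;
    }
    Object lock = locks.computeIfAbsent(key, k -> new Object());
    synchronized (lock) { // only callers asking for the same key wait here
      V again = values.get(key);
      if (again != null) {
        return again;
      }
      V created = Objects.requireNonNull(factory.apply(key), "factory returned null");
      values.put(key, created);
      return created;
    }
  }

  /** Demo: a factory stuck on "groupA" does not delay creation for "groupB". */
  static boolean fastKeyNotBlockedBySlowKey() {
    PerKeyLazyRegistry<String> registry = new PerKeyLazyRegistry<>();
    CountDownLatch slowStarted = new CountDownLatch(1);
    CountDownLatch releaseSlow = new CountDownLatch(1);
    Thread slow = new Thread(() -> registry.getOrCreate("groupA", k -> {
      slowStarted.countDown();
      awaitQuietly(releaseSlow); // stands in for a slow synchronous ZK read
      return "A";
    }));
    slow.start();
    awaitQuietly(slowStarted);
    String fast = registry.getOrCreate("groupB", k -> "B"); // completes while groupA is stuck
    boolean slowStillRunning = slow.isAlive();
    releaseSlow.countDown();
    try {
      slow.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "B".equals(fast) && slowStillRunning
      && "A".equals(registry.getOrCreate("groupA", k -> "unused"));
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The trade-off is one extra lock object per key; callers for the same key still wait on each other, which preserves the single-instance guarantee.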