ritegarg commented on code in PR #2479:
URL: https://github.com/apache/phoenix/pull/2479#discussion_r3291065636


##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -1047,6 +1114,177 @@ private long 
validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState cu
     return Math.max(0, remainingTime);
   }
 
+  // ========== Legacy /phoenix/ha CRR Sync ==========
+
+  /**
+   * Derives the combined CRR from local + peer records and CAS-writes it to 
{@code /phoenix/ha}.
+   * CAS losses are logged and skipped; the next consistentHA cache event or 
periodic cycle
+   * reconverges.
+   */
+  private void syncLegacyCRRIfRoleChanged() {
+    if (!legacyCrrSyncEnabled || !isHealthy) {
+      return;
+    }
+    // Snapshot mutable resources up front so a concurrent close() can't null 
them mid-method
+    // and trigger NPEs / writes through a torn-down Curator client.
+    final PhoenixHAAdmin admin = this.legacyHaAdmin;
+    final NodeCache cache = this.legacyCrrNodeCache;
+    if (admin == null || cache == null) {
+      return;
+    }
+    try {
+      HAGroupStoreRecord local = getHAGroupStoreRecord();
+      if (local == null) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: no local 
consistentHA record",
+          haGroupName);
+        return;
+      }
+      // Wait for peer URL before building the desired CRR (ctor NPEs on null 
url2).
+      if (StringUtils.isBlank(local.getPeerZKUrl())) {
+        LOGGER.debug("Skipping legacy CRR sync for HA group {}: peer ZK URL is 
blank", haGroupName);
+        return;
+      }
+      HAGroupStoreRecord peer = getHAGroupStoreRecordFromPeer();
+      // NodeCache is eventually consistent; on apparent absence, fall back to 
an authoritative
+      // ZK read so the equality check and CAS both see consistent state.
+      Pair<ClusterRoleRecord, Stat> snapshot = readLegacyCrrSnapshot(cache);
+      if (snapshot.getRight() == null) {
+        snapshot = admin.getClusterRoleRecordAndStatInZooKeeper(haGroupName);
+      }
+      ClusterRoleRecord existing = snapshot.getLeft();
+      Stat existingStat = snapshot.getRight();
+      if (!shouldWriteLegacyCrr(existing)) {
+        return;
+      }
+      ClusterRoleRecord desired = buildDesiredLegacyCrr(local, peer, existing);
+      if (desired.isLogicallyEqualIgnoringVersionAndRegistry(existing)) {
+        LOGGER.debug("Legacy CRR for HA group {} already up to date at version 
{}", haGroupName,
+          existing.getVersion());
+        return;
+      }
+      try {
+        if (existingStat == null) {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CREATE_NEW, /* ignored */ 0);
+        } else {
+          admin.createOrUpdateClusterRoleRecordWithCAS(haGroupName, desired,
+            PhoenixHAAdmin.LegacyCrrWriteMode.CAS_WITH_VERSION, 
existingStat.getVersion());
+        }
+        LOGGER.info("Synced legacy CRR for HA group {} (version {} -> {})", 
haGroupName,
+          existing != null ? existing.getVersion() : -1L, 
desired.getVersion());
+      } catch (StaleClusterRoleRecordVersionException stale) {
+        // CAS lost; next event/periodic cycle reconverges.
+        LOGGER.info("Legacy CRR CAS lost for HA group {} at expected stat 
version {}", haGroupName,
+          existingStat != null ? existingStat.getVersion() : -1);
+      }
+    } catch (Exception e) {
+      LOGGER.warn(
+        "Legacy CRR sync failed for HA group {}; will be retried by next 
event/periodic cycle",
+        haGroupName, e);
+    }
+  }
+
+  /**
+   * Policy gate before issuing a CAS write to the legacy CRR. Returns {@code 
false} when the
+   * existing record must not be overwritten by this client.
+   */
+  private boolean shouldWriteLegacyCrr(ClusterRoleRecord existing) {
+    // Refuse to overwrite a non-ZK (admin-managed RPC) legacy CRR; live 
readers use its
+    // registryType to build connection strings, so swapping form would break 
them.
+    if (existing != null && existing.getRegistryType() != RegistryType.ZK) {
+      LOGGER.warn("Skipping legacy CRR sync for HA group {}: existing 
registryType={} "
+        + "(requires admin migration to ZK form)", haGroupName, 
existing.getRegistryType());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Builds the desired legacy CRR, always stamped {@link RegistryType#ZK}. 
When the local client's
+   * peer view is unavailable, preserves the {@code existing} record's peer 
role rather than
+   * downgrading it to {@link ClusterRole#UNKNOWN} — another RS with peer 
visibility would otherwise
+   * keep flipping it back, and the equality check naturally short-circuits 
when no other field
+   * changed.
+   * <p>
+   * Look up the preserved role by the peer URL (not by {@code getRole2()}) 
since
+   * {@link ClusterRoleRecord} canonicalizes {@code url1}/{@code url2} by 
alphabetical order; the
+   * peer URL may end up in either slot depending on lexical comparison.
+   */
+  private ClusterRoleRecord buildDesiredLegacyCrr(HAGroupStoreRecord local, 
HAGroupStoreRecord peer,
+    ClusterRoleRecord existing) {
+    final ClusterRole role2;
+    if (peer != null) {
+      role2 = peer.getClusterRole();
+    } else if (existing != null) {
+      role2 = existing.getRole(JDBCUtil.formatUrl(local.getPeerZKUrl(), 
RegistryType.ZK));
+    } else {
+      role2 = ClusterRole.UNKNOWN;
+    }
+    long peerAdminVersion = (peer != null) ? peer.getAdminCRRVersion() : 0L;
+    long baseVersion = Math.max(existing != null ? existing.getVersion() : 0L,
+      Math.max(local.getAdminCRRVersion(), peerAdminVersion));
+    return new ClusterRoleRecord(haGroupName, 
HighAvailabilityPolicy.valueOf(local.getPolicy()),
+      RegistryType.ZK, this.zkUrl, local.getClusterRole(), 
local.getPeerZKUrl(), role2,
+      baseVersion + 1);
+  }
+
+  /**
+   * NodeCache snapshot of the legacy CRR. {@code (null, null)} on cache miss; 
callers must confirm
+   * absence with an authoritative ZK read since the cache is eventually 
consistent. Caller passes
+   * the cache from a local snapshot, never the mutable field (which {@link 
#close()} may null at
+   * any time).
+   */
+  private Pair<ClusterRoleRecord, Stat> readLegacyCrrSnapshot(NodeCache cache) 
{
+    ChildData current = cache.getCurrentData();
+    if (current == null) {
+      return Pair.of(null, null);
+    }
+    ClusterRoleRecord record = 
ClusterRoleRecord.fromJson(current.getData()).orElse(null);
+    return Pair.of(record, current.getStat());
+  }
+
+  /**
+   * Initialize legacy {@code /phoenix/ha} sync: admin handle, NodeCache, 
single-thread executor, an
+   * off-lock initial sync, and the periodic reconciler. Called only when the 
feature is enabled and
+   * the client is healthy.
+   */
+  private void setupLegacyCrrSync() throws Exception {
+    this.legacyHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, 
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
+    this.legacyCrrNodeCache = new NodeCache(this.legacyHaAdmin.getCurator(), 
toPath(haGroupName));
+    this.legacyCrrNodeCache.start(true);

Review Comment:
   Switched to `start(false)` and posted an async `rebuild()` to 
`legacyCrrSyncExecutor`. This keeps the cache warm for steady-state reads 
without holding the class monitor while blocking on ZooKeeper. A first-sync 
cache miss is covered by the existing fallback at L1163-L1166 of 
`HAGroupStoreClient.java`; a rebuild failure is logged at DEBUG and is 
retried on the next sync cycle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to