kadirozde commented on code in PR #2302:
URL: https://github.com/apache/phoenix/pull/2302#discussion_r2434260840


##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -374,110 +446,369 @@ public HAGroupStoreRecord 
getHAGroupStoreRecordFromPeer() throws IOException {
         if (!isHealthy) {
             throw new IOException("HAGroupStoreClient is not healthy");
         }
-        return fetchCacheRecord(this.peerPathChildrenCache, 
ClusterType.PEER).getLeft();
+        return 
fetchCacheRecordAndPopulateZKIfNeeded(this.peerPathChildrenCache, 
ClusterType.PEER).getLeft();
     }
 
-    private void initializeZNodeIfNeeded() throws IOException,
-            StaleHAGroupStoreRecordVersionException {
+    private void initializeZNodeIfNeeded() throws IOException, SQLException {
         // Sometimes the ZNode might not be available in local cache yet, so 
better to check
         // in ZK directly if we need to initialize
         Pair<HAGroupStoreRecord, Stat> cacheRecordFromZK =
                 
phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName);
         HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft();
-        HAGroupState defaultHAGroupState = 
this.clusterRole.getDefaultHAGroupState();
-        // Initialize lastSyncTimeInMs only if we start in ACTIVE_NOT_IN_SYNC 
state
-        //  and ZNode is not already present
-        Long lastSyncTimeInMs = 
defaultHAGroupState.equals(HAGroupState.ACTIVE_NOT_IN_SYNC)
-                                    ? System.currentTimeMillis()
-                                    : null;
-        HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
-            HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
-            haGroupName,
-            this.clusterRole.getDefaultHAGroupState(),
-            lastSyncTimeInMs
-        );
-        // Only update current ZNode if it doesn't have the same role as 
present in System Table.
-        // If not exists, then create ZNode
-        // TODO: Discuss if this approach is what reader needs.
-        if (haGroupStoreRecord != null &&
-                !haGroupStoreRecord.getClusterRole().equals(this.clusterRole)) 
{
-            phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
-                    newHAGroupStoreRecord, 
cacheRecordFromZK.getRight().getVersion());
-        } else if (haGroupStoreRecord == null) {
+        // Only if the ZNode is not present, we need to create it from System 
Table
+        if (haGroupStoreRecord == null) {
+            SystemTableHAGroupRecord systemTableRecord
+                    = getSystemTableHAGroupRecord(this.haGroupName);
+            Preconditions.checkNotNull(systemTableRecord,
+                    "System Table HAGroupRecord cannot be null");
+            HAGroupStoreRecord.HAGroupState defaultHAGroupState
+                    = 
systemTableRecord.getClusterRole().getDefaultHAGroupState();
+            Long lastSyncTimeInMs
+                    = 
defaultHAGroupState.equals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC)
+                    ? System.currentTimeMillis()
+                    : null;
+            HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
+                    HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
+                    haGroupName,
+                    defaultHAGroupState,
+                    lastSyncTimeInMs,
+                    systemTableRecord.getPolicy().toString(),
+                    systemTableRecord.getPeerZKUrl(),
+                    systemTableRecord.getClusterUrl(),
+                    systemTableRecord.getPeerClusterUrl(),
+                    systemTableRecord.getAdminCRRVersion()
+            );
             
phoenixHaAdmin.createHAGroupStoreRecordInZooKeeper(newHAGroupStoreRecord);
         }
     }
 
-    private void initializeHAGroupStoreClientAttributes(String haGroupName) 
throws SQLException {
+    /**
+     * Inner class to hold system table HA group record
+     */
+    public static class SystemTableHAGroupRecord {
+        private final HighAvailabilityPolicy policy;
+        private final ClusterRoleRecord.ClusterRole clusterRole;
+        private final ClusterRoleRecord.ClusterRole peerClusterRole;
+        private final String clusterUrl;
+        private final String peerClusterUrl;
+        private final String zkUrl;
+        private final String peerZKUrl;
+        private final long adminCRRVersion;
+
+        public SystemTableHAGroupRecord(HighAvailabilityPolicy policy,
+                                        ClusterRoleRecord.ClusterRole 
clusterRole,
+                                        ClusterRoleRecord.ClusterRole 
peerClusterRole,
+                                        String clusterUrl,
+                                        String peerClusterUrl,
+                                        String zkUrl,
+                                        String peerZKUrl,
+                                        long adminCRRVersion) {
+            this.policy = policy;
+            this.clusterRole = clusterRole;
+            this.peerClusterRole = peerClusterRole;
+            this.clusterUrl = clusterUrl;
+            this.peerClusterUrl = peerClusterUrl;
+            this.zkUrl = zkUrl;
+            this.peerZKUrl = peerZKUrl;
+            this.adminCRRVersion = adminCRRVersion;
+        }
+
+        public HighAvailabilityPolicy getPolicy() {
+            return policy;
+        }
+
+        public ClusterRoleRecord.ClusterRole getClusterRole() {
+            return clusterRole;
+        }
+
+        public ClusterRoleRecord.ClusterRole getPeerClusterRole() {
+            return peerClusterRole;
+        }
+
+        public String getClusterUrl() {
+            return clusterUrl;
+        }
+
+        public String getPeerClusterUrl() {
+            return peerClusterUrl;
+        }
+
+        public String getZkUrl() {
+            return zkUrl;
+        }
+
+        public String getPeerZKUrl() {
+            return peerZKUrl;
+        }
+
+        public long getAdminCRRVersion() {
+            return adminCRRVersion;
+        }
+    }
+
+    private SystemTableHAGroupRecord getSystemTableHAGroupRecord(String 
haGroupName)
+            throws SQLException {
         String queryString = String.format("SELECT * FROM %s WHERE %s = '%s'",
                 SYSTEM_HA_GROUP_NAME, HA_GROUP_NAME, haGroupName);
         try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(
                 JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
              Statement stmt = conn.createStatement();
              ResultSet rs = stmt.executeQuery(queryString)) {
             if (rs.next()) {
-                this.policy = 
HighAvailabilityPolicy.valueOf(rs.getString(POLICY));
+                HighAvailabilityPolicy policy
+                        = HighAvailabilityPolicy.valueOf(rs.getString(POLICY));
                 String zkUrl1 = rs.getString(ZK_URL_1);
                 String zkUrl2 = rs.getString(ZK_URL_2);
                 String clusterRole1 = rs.getString(CLUSTER_ROLE_1);
                 String clusterRole2 = rs.getString(CLUSTER_ROLE_2);
                 String clusterUrl1 = rs.getString(CLUSTER_URL_1);
                 String clusterUrl2 = rs.getString(CLUSTER_URL_2);
-                this.clusterRoleRecordVersion = rs.getLong(VERSION);
-                Preconditions.checkNotNull(zkUrl1, "ZK_URL_1 in System Table 
cannot be null");
-                Preconditions.checkNotNull(zkUrl2, "ZK_URL_2 in System Table 
cannot be null");
-                String formattedZkUrl1 = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
-                String formattedZkUrl2 = JDBCUtil.formatUrl(zkUrl2, 
RegistryType.ZK);
+                long adminCRRVersion = rs.getLong(VERSION);
+                String formattedZkUrl1 = null;
+                String formattedZkUrl2 = null;
+                if (StringUtils.isNotBlank(zkUrl1)) {
+                    formattedZkUrl1 = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
+                }
+                if (StringUtils.isNotBlank(zkUrl2)) {
+                    formattedZkUrl2 = JDBCUtil.formatUrl(zkUrl2, 
RegistryType.ZK);
+                }
                 String formattedZkUrl = JDBCUtil.formatUrl(this.zkUrl, 
RegistryType.ZK);
+                String peerZKUrl;
+                ClusterRoleRecord.ClusterRole clusterRole;
+                ClusterRoleRecord.ClusterRole peerClusterRole;
+                String clusterUrl;
+                String peerClusterUrl;
+
                 if (StringUtils.equals(formattedZkUrl1, formattedZkUrl)) {
-                    this.peerZKUrl = formattedZkUrl2;
-                    this.clusterRole = ClusterRoleRecord.ClusterRole.from(
+                    peerZKUrl = formattedZkUrl2;
+                    clusterRole = ClusterRoleRecord.ClusterRole.from(
                             clusterRole1.getBytes(StandardCharsets.UTF_8));
-                    this.peerClusterRole = ClusterRoleRecord.ClusterRole.from(
+                    peerClusterRole = ClusterRoleRecord.ClusterRole.from(
                             clusterRole2.getBytes(StandardCharsets.UTF_8));
-                    this.clusterUrl = clusterUrl1;
-                    this.peerClusterUrl = clusterUrl2;
+                    clusterUrl = clusterUrl1;
+                    peerClusterUrl = clusterUrl2;
                 } else if (StringUtils.equals(formattedZkUrl2, 
formattedZkUrl)) {
-                    this.peerZKUrl = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
-                    this.clusterRole = ClusterRoleRecord.ClusterRole.from(
+                    peerZKUrl = JDBCUtil.formatUrl(zkUrl1, RegistryType.ZK);
+                    clusterRole = ClusterRoleRecord.ClusterRole.from(
                             clusterRole2.getBytes(StandardCharsets.UTF_8));
-                    this.peerClusterRole = ClusterRoleRecord.ClusterRole.from(
+                    peerClusterRole = ClusterRoleRecord.ClusterRole.from(
                             clusterRole1.getBytes(StandardCharsets.UTF_8));
-                    this.clusterUrl = clusterUrl2;
-                    this.peerClusterUrl = clusterUrl1;
+                    clusterUrl = clusterUrl2;
+                    peerClusterUrl = clusterUrl1;
+                } else {
+                    throw new SQLException("Current zkUrl does not match"
+                            + "any zkUrl in System Table for HA group: " + 
haGroupName);
                 }
+
+                Preconditions.checkNotNull(clusterRole,
+                        "Cluster role in System Table cannot be null");
+                Preconditions.checkNotNull(peerClusterRole,
+                        "Peer cluster role in System Table cannot be null");
+                Preconditions.checkNotNull(clusterUrl,
+                        "Cluster URL in System Table cannot be null");
+                Preconditions.checkNotNull(peerZKUrl,
+                        "Peer ZK URL in System Table cannot be null");
+                Preconditions.checkNotNull(peerClusterUrl,
+                        "Peer Cluster URL in System Table cannot be null");
+
+                return new SystemTableHAGroupRecord(policy, clusterRole, 
peerClusterRole,
+                        clusterUrl, peerClusterUrl, formattedZkUrl, peerZKUrl, 
adminCRRVersion);
             } else {
                 throw new SQLException("HAGroupStoreRecord not found for HA 
group name: " +
                         haGroupName + " in System Table " + 
SYSTEM_HA_GROUP_NAME);
             }
         }
-        Preconditions.checkNotNull(this.clusterRole,
-                "Cluster role in System Table cannot be null");
-        Preconditions.checkNotNull(this.peerClusterRole,
-                "Peer cluster role in System Table cannot be null");
-        Preconditions.checkNotNull(this.clusterUrl,
-                "Cluster URL in System Table cannot be null");
-        Preconditions.checkNotNull(this.peerZKUrl,
-                "Peer ZK URL in System Table cannot be null");
-        Preconditions.checkNotNull(this.peerClusterUrl,
-                "Peer Cluster URL in System Table cannot be null");
-        Preconditions.checkNotNull(this.clusterRoleRecordVersion,
-                "Cluster role record version in System Table cannot be null");
     }
 
-    private void maybeInitializePeerPathChildrenCache() {
-        if (StringUtils.isNotBlank(this.peerZKUrl)) {
+    // Update the system table on best effort basis for HA group
+    // In case of failure, we will log the error and continue.
+    private void updateSystemTableHAGroupRecordSilently(String haGroupName,
+                                                        
SystemTableHAGroupRecord record)
+            throws SQLException {
+        StringBuilder updateQuery = new StringBuilder("UPSERT INTO " + 
SYSTEM_HA_GROUP_NAME + " (");
+        StringBuilder valuesClause = new StringBuilder(" VALUES (");
+        List<Object> parameters = new ArrayList<>();
+
+        // Always include HA_GROUP_NAME as it's the key
+        updateQuery.append(HA_GROUP_NAME);
+        valuesClause.append("?");
+        parameters.add(haGroupName);
+
+        // Update non-null fields only.
+        if (record.getPolicy() != null) {
+            updateQuery.append(", ").append(POLICY);
+            valuesClause.append(", ?");
+            parameters.add(record.getPolicy().toString());
+        }
+
+        if (record.getClusterRole() != null) {
+            updateQuery.append(", ").append(CLUSTER_ROLE_1);
+            valuesClause.append(", ?");
+            parameters.add(record.getClusterRole().name());
+        }
+
+        if (record.getPeerClusterRole() != null) {
+            updateQuery.append(", ").append(CLUSTER_ROLE_2);
+            valuesClause.append(", ?");
+            parameters.add(record.getPeerClusterRole().name());
+        }
+
+        if (record.getClusterUrl() != null) {
+            updateQuery.append(", ").append(CLUSTER_URL_1);
+            valuesClause.append(", ?");
+            parameters.add(record.getClusterUrl());
+        }
+
+        if (record.getPeerClusterUrl() != null) {
+            updateQuery.append(", ").append(CLUSTER_URL_2);
+            valuesClause.append(", ?");
+            parameters.add(record.getPeerClusterUrl());
+        }
+
+        if (record.getZkUrl() != null) {
+            updateQuery.append(", ").append(ZK_URL_1);
+            valuesClause.append(", ?");
+            parameters.add(record.getZkUrl());
+        }
+
+        if (record.getPeerZKUrl() != null) {
+            updateQuery.append(", ").append(ZK_URL_2);
+            valuesClause.append(", ?");
+            parameters.add(record.getPeerZKUrl());
+        }
+
+        if (record.getAdminCRRVersion() > 0) {
+            updateQuery.append(", ").append(VERSION);
+            valuesClause.append(", ?");
+            parameters.add(record.getAdminCRRVersion());
+        }
+
+        updateQuery.append(")").append(valuesClause).append(")");
+
+        try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(
+                JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+             PreparedStatement pstmt = 
conn.prepareStatement(updateQuery.toString())) {
+
+            for (int i = 0; i < parameters.size(); i++) {
+                pstmt.setObject(i + 1, parameters.get(i));
+            }
+
+            pstmt.executeUpdate();
+            conn.commit();
+        } catch (Exception e) {
+            LOGGER.error("Failed to update system table on best"
+                    + "effort basis for HA group {}, error: {}", haGroupName, 
e);
+        }
+    }
+
+    /**
+     * Starts the periodic sync job that syncs ZooKeeper data (source of 
truth) to system table.
+     * The job runs at configurable intervals with a random jitter for the 
initial delay.
+     */
+    private void startPeriodicSyncJob() {
+        if (syncExecutor == null) {
+            syncExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+                Thread t = new Thread(r, "HAGroupStoreClient-SyncJob-" + 
haGroupName);
+                t.setDaemon(true);
+                return t;
+            });
+
+            // Get sync interval from configuration (in seconds)
+            long syncIntervalSeconds = 
conf.getLong(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
+                    DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS);
+
+            // Add jitter to initial delay
+            long jitterSeconds
+                    = ThreadLocalRandom.current().nextLong(0, 
SYNC_JOB_MAX_JITTER_SECONDS + 1);
+
+            LOGGER.info("Starting periodic sync job for HA group {} "
+                            + "with initial delay of {} seconds, "
+                            + "then every {} seconds",
+                    haGroupName,
+                    jitterSeconds,
+                    syncIntervalSeconds);
+
+            syncExecutor.scheduleAtFixedRate(
+                    this::syncZKToSystemTable,
+                    jitterSeconds,
+                    syncIntervalSeconds,
+                    TimeUnit.SECONDS
+            );
+        }
+    }
+
+    /**
+     * Syncs data from ZooKeeper (source of truth) to the system table.
+     * This method is called periodically to ensure consistency.
+     */
+    private void syncZKToSystemTable() {

Review Comment:
   Instead of syncing on every run, can we first read the HA record from the HA 
group system table to check whether a sync is needed, and then sync only if it is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to