ritegarg commented on code in PR #2244:
URL: https://github.com/apache/phoenix/pull/2244#discussion_r2267412086


##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -17,191 +17,640 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static 
org.apache.phoenix.query.QueryServices.HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
 
 
 /**
- * Write-through cache for HAGroupStore.
+ * Main implementation of HAGroupStoreClient with peer support.
+ * Write-through cache for HAGroupStore based on {@link HAGroupStoreRecord}.
  * Uses {@link PathChildrenCache} from {@link 
org.apache.curator.framework.CuratorFramework}.
  */
 public class HAGroupStoreClient implements Closeable {
 
+    public static final String ZK_CONSISTENT_HA_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS 
= 30000L;
-    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+    private static final String CACHE_TYPE_LOCAL = "LOCAL";
+    private static final String CACHE_TYPE_PEER = "PEER";
     private PhoenixHAAdmin phoenixHaAdmin;
+    private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
-    // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
-    private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, 
ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
-            = new ConcurrentHashMap<>();
-    private PathChildrenCache pathChildrenCache;
-    private volatile boolean isHealthy;
+    // Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
+    private static final Map<String, ConcurrentHashMap<String, 
HAGroupStoreClient>> instances =
+            new ConcurrentHashMap<>();
+    // HAGroupName for this instance
+    private final String haGroupName;
+    // PathChildrenCache for current cluster and HAGroupName
+    private PathChildrenCache pathChildrenCache = null;
+    // PathChildrenCache for peer cluster and HAGroupName
+    private PathChildrenCache peerPathChildrenCache = null;
+    // Whether the client is healthy
+    private volatile boolean isHealthy = false;
+    // Configuration
+    private final Configuration conf;
+    // ZK URL for the current cluster and HAGroupName
+    private String zkUrl;
+    // Peer ZK URL for peer cluster and HAGroupName
+    private String peerZKUrl = null;
+    // Peer Custom Event Listener
+    private final PathChildrenCacheListener 
peerCustomPathChildrenCacheListener;
+    // Wait time for sync mode
+    private final long waitTimeForSyncModeInMs;
+    // Wait time for store and forward mode
+    private final long waitTimeForStoreAndForwardModeInMs;
+    // Policy for the HA group
+    private HighAvailabilityPolicy policy;
+    private ClusterRole clusterRole;
+    private ClusterRole peerClusterRole;
+    private String clusterUrl;
+    private String peerClusterUrl;
+    private long clusterRoleRecordVersion;
+
 
     /**
      * Creates/gets an instance of HAGroupStoreClient.
      * Can return null instance if unable to initialize.
      *
      * @param conf configuration
+     * @param haGroupName name of the HA group. Only specified HA group is 
tracked.
+     * @param zkUrl zkUrl to use for the client. Prefer providing this 
parameter to avoid
+     *             the overhead of getting the local zkUrl from the 
configuration.
      * @return HAGroupStoreClient instance
      */
-    public static HAGroupStoreClient getInstance(Configuration conf) {
-        if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
+    public static HAGroupStoreClient getInstance(Configuration conf, String 
haGroupName,

Review Comment:
   The plan of action is:-
   
   1. Intercept the mutation to System.HA_Group table in IndexRegionObserver.
   2. Obtain read and write lock on row to maintain serialization
   3. If attribute changed is in worth broadcasting an update. eg. we don't 
need to broadcast update for clusterRole because ZNode will contain the state 
already.
                      a. If broadcast worthy, we use same mechanism as 
MetadataCache to invalidate. We create a new RPC                  to send 
updated attributes to all other HAGroupStoreClient and each HAGroupStoreClient 
updates its attributes. This has 1 retry built in.
                      b. Once complete for all RS, we let mutation complete. If 
mutation failed for even 1 RS, we fail the mutation and operator can retry.
   4. Release the lock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to