This is an automated email from the ASF dual-hosted git repository.
lokiore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new e853980bc0 PHOENIX-7493 Graceful Failover with Phoenix HA (#2075)
e853980bc0 is described below
commit e853980bc04ae44e6670fd60857360d67831e607
Author: ritegarg <[email protected]>
AuthorDate: Fri Mar 28 09:31:22 2025 -0700
PHOENIX-7493 Graceful Failover with Phoenix HA (#2075)
---
.../phoenix/cache/ServerMetadataCacheImpl.java | 3 -
.../org/apache/phoenix/jdbc/ClusterRoleRecord.java | 4 +-
.../jdbc/ClusterRoleRecordGeneratorTool.java | 2 +-
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 207 +++++++++
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 80 ++++
.../apache/phoenix/jdbc/HighAvailabilityGroup.java | 2 -
.../phoenix/jdbc/HighAvailabilityPolicy.java | 2 +-
.../org/apache/phoenix/jdbc/PhoenixHAAdmin.java | 494 +++++++++++++++++++++
.../apache/phoenix/jdbc/PhoenixHAAdminTool.java | 466 +------------------
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 6 +-
.../protobuf/RegionServerEndpointService.proto | 9 +
.../coprocessor/PhoenixRegionServerEndpoint.java | 20 +
.../UngroupedAggregateRegionObserver.java | 18 +
.../phoenix/hbase/index/IndexRegionObserver.java | 12 +
phoenix-core/pom.xml | 4 +
.../phoenix/cache/ServerMetadataCacheIT.java | 2 -
.../end2end/PhoenixRegionServerEndpointIT.java | 13 +-
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 427 ++++++++++++++++++
.../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 88 ++++
.../jdbc/HighAvailabilityTestingUtility.java | 12 +-
.../phoenix/jdbc/PhoenixHAAdminToolTest.java | 12 +-
22 files changed, 1409 insertions(+), 476 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
index 5f9aa10455..91b783628b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
@@ -20,8 +20,6 @@ package org.apache.phoenix.cache;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
@@ -33,7 +31,6 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
index aa28c6e85f..c2af18df8b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
@@ -59,13 +59,13 @@ public class ClusterRoleRecord {
* take traffic, standby and offline do not, and unknown is used if the
state cannot be determined.
*/
public enum ClusterRole {
- ACTIVE, STANDBY, OFFLINE, UNKNOWN;
+ ACTIVE, STANDBY, OFFLINE, UNKNOWN, ACTIVE_TO_STANDBY;
/**
* @return true if a cluster with this role can be connected,
otherwise false
*/
public boolean canConnect() {
- return this == ACTIVE || this == STANDBY;
+ return this == ACTIVE || this == STANDBY || this ==
ACTIVE_TO_STANDBY;
}
public static ClusterRole from(byte[] bytes) {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
index 93899f87a2..03c29320d4 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
@@ -40,7 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import static
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ATTR_PREFIX;
-import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
/**
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
new file mode 100644
index 0000000000..05f4247ace
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/**
+ * Write-through cache for HAGroupStore.
+ * Uses {@link PathChildrenCache} from {@link
org.apache.curator.framework.CuratorFramework}.
+ */
+public class HAGroupStoreClient implements Closeable {
+
+ private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS
= 30000L;
+ private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+ private PhoenixHAAdmin phoenixHaAdmin;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HAGroupStoreClient.class);
+ // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
+ private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole,
ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
+ = new ConcurrentHashMap<>();
+ private PathChildrenCache pathChildrenCache;
+ private volatile boolean isHealthy;
+
+ /**
+ * Creates/gets an instance of HAGroupStoreClient.
+ * Can return null instance if unable to initialize.
+ *
+ * @param conf configuration
+ * @return HAGroupStoreClient instance
+ */
+ public static HAGroupStoreClient getInstance(Configuration conf) {
+ if (haGroupStoreClientInstance == null ||
!haGroupStoreClientInstance.isHealthy) {
+ synchronized (HAGroupStoreClient.class) {
+ if (haGroupStoreClientInstance == null ||
!haGroupStoreClientInstance.isHealthy) {
+ haGroupStoreClientInstance = new HAGroupStoreClient(conf,
null);
+ if (!haGroupStoreClientInstance.isHealthy) {
+ haGroupStoreClientInstance.close();
+ haGroupStoreClientInstance = null;
+ }
+ }
+ }
+ }
+ return haGroupStoreClientInstance;
+ }
+
+ @VisibleForTesting
+ HAGroupStoreClient(final Configuration conf, final
PathChildrenCacheListener pathChildrenCacheListener) {
+ try {
+ this.phoenixHaAdmin = new PhoenixHAAdmin(conf);
+ final PathChildrenCache pathChildrenCache;
+ pathChildrenCache = new
PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true);
+ final CountDownLatch latch = new CountDownLatch(1);
+ if (pathChildrenCacheListener != null) {
+
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
+ } else {
+ pathChildrenCache.getListenable().addListener((client, event)
-> {
+ LOGGER.info("HAGroupStoreClient PathChildrenCache Received
event for type {}", event.getType());
+ final ChildData childData = event.getData();
+ ClusterRoleRecord eventCRR = extractCRROrNull(childData);
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ if (eventCRR != null && eventCRR.getHaGroupName()
!= null) {
+ updateClusterRoleRecordMap(eventCRR);
+ }
+ break;
+ case CHILD_REMOVED:
+ // In case of CHILD_REMOVED, we get the old
version of data that was just deleted in event.
+ if (eventCRR != null && eventCRR.getHaGroupName()
!= null
+ && !eventCRR.getHaGroupName().isEmpty()
+ &&
eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) {
+ LOGGER.info("Received CHILD_REMOVED event,
Removing CRR {} from existing CRR Map {}", eventCRR, clusterRoleToCRRMap);
+ final ClusterRoleRecord.ClusterRole role =
eventCRR.getRole(phoenixHaAdmin.getZkUrl());
+ clusterRoleToCRRMap.putIfAbsent(role, new
ConcurrentHashMap<>());
+
clusterRoleToCRRMap.get(role).remove(eventCRR.getHaGroupName());
+ }
+ break;
+ case INITIALIZED:
+ latch.countDown();
+ break;
+ case CONNECTION_LOST:
+ case CONNECTION_SUSPENDED:
+ isHealthy = false;
+ break;
+ case CONNECTION_RECONNECTED:
+ isHealthy = true;
+ break;
+ default:
+ LOGGER.warn("Unexpected event type {}, complete
event {}", event.getType(), event);
+ }
+ });
+ }
+
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+ this.pathChildrenCache = pathChildrenCache;
+ isHealthy =
latch.await(HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ buildClusterRoleToCRRMap();
+ } catch (Exception e) {
+ isHealthy = false;
+ LOGGER.error("Unexpected error occurred while initializing
HAGroupStoreClient, marking cache as unhealthy", e);
+ }
+ }
+
+ private ClusterRoleRecord extractCRROrNull(final ChildData childData) {
+ if (childData != null) {
+ byte[] data = childData.getData();
+ return ClusterRoleRecord.fromJson(data).orElse(null);
+ }
+ return null;
+ }
+
+ private void updateClusterRoleRecordMap(final ClusterRoleRecord crr) {
+ if (crr != null && crr.getHaGroupName() != null &&
crr.getRole(phoenixHaAdmin.getZkUrl()) != null) {
+ ClusterRoleRecord.ClusterRole role =
crr.getRole(phoenixHaAdmin.getZkUrl());
+ LOGGER.info("Updating Existing CRR Map {} with new CRR {}",
clusterRoleToCRRMap, crr);
+ clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>());
+ clusterRoleToCRRMap.get(role).put(crr.getHaGroupName(), crr);
+ LOGGER.info("Added new CRR {} to CRR Map", crr);
+ // Remove any pre-existing mapping with any other role for this
HAGroupName
+ for (ClusterRoleRecord.ClusterRole mapRole :
clusterRoleToCRRMap.keySet()) {
+ if (mapRole != role) {
+ ConcurrentHashMap<String, ClusterRoleRecord> roleWiseMap =
clusterRoleToCRRMap.get(mapRole);
+ if (roleWiseMap.containsKey(crr.getHaGroupName())) {
+ LOGGER.info("Removing any pre-existing mapping with
role {} for HAGroupName {}", mapRole, crr.getHaGroupName());
+ roleWiseMap.remove(crr.getHaGroupName());
+ }
+ }
+ }
+ LOGGER.info("Final Updated CRR Map {}", clusterRoleToCRRMap);
+ }
+ }
+
+ private void buildClusterRoleToCRRMap() {
+ List<ClusterRoleRecord> clusterRoleRecords =
pathChildrenCache.getCurrentData().stream().map(this::extractCRROrNull)
+ .filter(Objects::nonNull).collect(Collectors.toList());
+ for (ClusterRoleRecord crr : clusterRoleRecords) {
+ updateClusterRoleRecordMap(crr);
+ }
+ }
+
+ public void rebuild() throws Exception {
+ if (!isHealthy) {
+ throw new IOException("HAGroupStoreClient is not healthy");
+ }
+ LOGGER.info("Rebuilding HAGroupStoreClient for HA groups");
+ // NOTE: this is a BLOCKING method.
+ // Completely rebuild the internal cache by querying for all needed
data
+ // WITHOUT generating any events to send to listeners.
+ pathChildrenCache.rebuild();
+ buildClusterRoleToCRRMap();
+ LOGGER.info("Rebuild Complete for HAGroupStoreClient");
+ }
+
+
+ @Override
+ public void close() {
+ try {
+ LOGGER.info("Closing HAGroupStoreClient");
+ clusterRoleToCRRMap.clear();
+ if (pathChildrenCache != null) {
+ pathChildrenCache.close();
+ }
+ LOGGER.info("Closed HAGroupStoreClient");
+ } catch (IOException e) {
+ LOGGER.error("Exception closing HAGroupStoreClient", e);
+ }
+ }
+
+ public List<ClusterRoleRecord>
getCRRsByClusterRole(ClusterRoleRecord.ClusterRole clusterRole) throws
IOException {
+ if (!isHealthy) {
+ throw new IOException("HAGroupStoreClient is not healthy");
+ }
+ return clusterRoleToCRRMap.getOrDefault(clusterRole, new
ConcurrentHashMap<>()).values().stream().collect(ImmutableList.toImmutableList());
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
new file mode 100644
index 0000000000..690953a7b0
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+
+public class HAGroupStoreManager {
+ private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
+ private final boolean mutationBlockEnabled;
+ private final Configuration conf;
+
+ /**
+ * Creates/gets an instance of HAGroupStoreManager.
+ *
+ * @param conf configuration
+ * @return HAGroupStoreManager instance
+ */
+ public static HAGroupStoreManager getInstance(Configuration conf) {
+ if (haGroupStoreManagerInstance == null) {
+ synchronized (HAGroupStoreManager.class) {
+ if (haGroupStoreManagerInstance == null) {
+ haGroupStoreManagerInstance = new
HAGroupStoreManager(conf);
+ }
+ }
+ }
+ return haGroupStoreManagerInstance;
+ }
+
+ private HAGroupStoreManager(final Configuration conf) {
+ this.mutationBlockEnabled =
conf.getBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
+ DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED);
+ this.conf = conf;
+ }
+
+ /**
+ * Checks whether mutation is blocked or not.
+ * @throws IOException when HAGroupStoreClient is not healthy.
+ */
+ public boolean isMutationBlocked() throws IOException {
+ if (mutationBlockEnabled) {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(conf);
+ if (haGroupStoreClient != null) {
+ return
!haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+ }
+ throw new IOException("HAGroupStoreClient is not initialized");
+ }
+ return false;
+ }
+
+ /**
+ * Force rebuilds the HAGroupStoreClient
+ * @throws Exception
+ */
+ public void invalidateHAGroupStoreClient() throws Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(conf);
+ if (haGroupStoreClient != null) {
+ haGroupStoreClient.rebuild();
+ } else {
+ throw new IOException("HAGroupStoreClient is not initialized");
+ }
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 9f27a0998d..938b60275b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -49,9 +49,7 @@ import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
index 7a82d47a93..ea026a6806 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
/**
* An HighAvailabilityGroup provides a JDBC connection from given connection
string and properties.
*/
-enum HighAvailabilityPolicy {
+public enum HighAvailabilityPolicy {
FAILOVER {
@Override
public Connection provide(HighAvailabilityGroup haGroup, Properties
info,
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
new file mode 100644
index 0000000000..c9019b6c1a
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Helper class to update cluster role record for a ZK cluster.
+ * The ZK client this accessor has is confined to a single ZK cluster, but it
can be used to operate
+ * multiple HA groups that are associated with this cluster.
+ * This is not thread-safe yet. Multiple threads can update CRRs at same time
potentially causing inconsistency.
+ */
+public class PhoenixHAAdmin implements Closeable {
+
+ /**
+ * Wrapper class for static accessor
+ */
+ public static class HighAvailibilityCuratorProvider {
+
+ public static final HighAvailibilityCuratorProvider INSTANCE = new
HighAvailibilityCuratorProvider();
+ /**
+ * Gets curator blocking if necessary to create it
+ */
+ public CuratorFramework getCurator(String zkUrl, Properties
properties) throws IOException {
+ return HighAvailabilityGroup.getCurator(zkUrl, properties);
+ }
+ }
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PhoenixHAAdmin.class);
+
+ /** The fully qualified ZK URL for an HBase cluster in format
host:port:/hbase */
+ private final String zkUrl;
+ /** Configuration of this command line tool. */
+ private final Configuration conf;
+ /** Client properties which has copies of configuration defining ZK
timeouts / retries. */
+ private final Properties properties = new Properties();
+ /** Curator Provider **/
+ private final HighAvailibilityCuratorProvider
+ highAvailibilityCuratorProvider;
+
+ public PhoenixHAAdmin(Configuration conf) {
+ this(getLocalZkUrl(conf), conf,
HighAvailibilityCuratorProvider.INSTANCE);
+ }
+
+ public PhoenixHAAdmin(String zkUrl, Configuration conf) {
+ this(zkUrl, conf, HighAvailibilityCuratorProvider.INSTANCE);
+ }
+
+ public PhoenixHAAdmin(String zkUrl, Configuration conf,
+ HighAvailibilityCuratorProvider highAvailibilityCuratorProvider) {
+ Preconditions.checkNotNull(zkUrl);
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkNotNull(highAvailibilityCuratorProvider);
+ this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl);
+ this.conf = conf;
+ conf.iterator().forEachRemaining(k ->
properties.setProperty(k.getKey(), k.getValue()));
+ this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider;
+ }
+
+ /**
+ * Helper method to get local ZK fully qualified URL (host:port:/hbase)
from configuration.
+ */
+ public static String getLocalZkUrl(Configuration conf) {
+ String localZkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ if (StringUtils.isEmpty(localZkQuorum)) {
+ String msg = "ZK quorum not found by looking up key " +
HConstants.ZOOKEEPER_QUORUM;
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+ if (portStr != null) {
+ try {
+ port = Integer.parseInt(portStr);
+ } catch (NumberFormatException e) {
+ String msg = String.format("Unrecognized ZK port '%s' in ZK
quorum '%s'",
+ portStr, localZkQuorum);
+ LOG.error(msg, e);
+ throw new IllegalArgumentException(msg, e);
+ }
+ }
+
+ String localZkRoot = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+ HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+ return String.format("%s:%d:%s", localZkQuorum, port, localZkRoot);
+ }
+
+ /**
+ * Gets curator from the cache if available otherwise calls into
getCurator to make it.
+ */
+ public CuratorFramework getCurator() throws IOException {
+ return highAvailibilityCuratorProvider.getCurator(zkUrl, properties);
+ }
+
+
+ /**
+ * Check if current cluster is ACTIVE role for the given HA group.
+ * In case of Exception when it fails to read cluster role data from the
current cluster, it
+ * will assume current cluster is not ACTIVE. Callers should be aware of
"false positive"
+ * possibility especially due to connectivity issue between this tool and
remote ZK cluster.
+ * @param haGroupName the HA group name; a cluster can be associated with
multiple HA groups
+ * @return true if current cluster is ACTIVE role, otherwise false
+ */
+ boolean isCurrentActiveCluster(String haGroupName) {
+ try {
+ byte[] data = getCurator().getData().forPath(toPath(haGroupName));
+
+ Optional<ClusterRoleRecord> record =
ClusterRoleRecord.fromJson(data);
+ return record.isPresent() && record.get()
+ .getRole(zkUrl) == ClusterRoleRecord.ClusterRole.ACTIVE;
+ } catch (KeeperException.NoNodeException ne) {
+ LOG.info(
+ "No role record found for HA group {} on '{}', assuming it
is not active",
+ haGroupName, zkUrl);
+ return false;
+ } catch (Exception e) {
+ LOG.warn("Got exception when reading record for {} on cluster {}",
+ haGroupName, zkUrl, e);
+ return false;
+ }
+ }
+
+ /**
+ * This lists all cluster role records stored in the zookeeper nodes.
+ * This read-only operation and hence no side effect on the ZK cluster.
+ */
+ public List<ClusterRoleRecord> listAllClusterRoleRecordsOnZookeeper()
throws IOException {
+ List<String> haGroupNames;
+ try {
+ haGroupNames =
getCurator().getChildren().forPath(ZKPaths.PATH_SEPARATOR);
+ } catch (Exception e) {
+ String msg = String.format("Got exception when listing all HA
groups in %s", zkUrl);
+ LOG.error(msg);
+ throw new IOException(msg, e);
+ }
+
+ List<ClusterRoleRecord> records = new ArrayList<>();
+ List<String> failedHaGroups = new ArrayList<>();
+ for (String haGroupName : haGroupNames) {
+ try {
+ byte[] data =
getCurator().getData().forPath(ZKPaths.PATH_SEPARATOR + haGroupName);
+ Optional<ClusterRoleRecord> record =
ClusterRoleRecord.fromJson(data);
+ if (record.isPresent()) {
+ records.add(record.get());
+ } else { // fail to deserialize data from JSON
+ failedHaGroups.add(haGroupName);
+ }
+ } catch (Exception e) {
+ LOG.warn("Got exception when reading data for HA group {}",
+ haGroupName, e);
+ failedHaGroups.add(haGroupName);
+ }
+ }
+
+ if (!failedHaGroups.isEmpty()) {
+ String
+ msg =
+ String.format(
+ "Found following HA groups: %s. Fail to read
cluster " + "role records for following HA groups: %s",
+ String.join(",", haGroupNames), String.join(",",
failedHaGroups));
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ return records;
+ }
+
+ /**
+ * Helper method to write the given cluster role records into the ZK
clusters respectively.
+ *
+ * // TODO: add retry logics
+ *
+ * @param records The cluster role record list to save on ZK
+ * @param forceful if true, this method will ignore errors on other
clusters; otherwise it will
+ * not update next cluster (in order) if there is any
failure on current cluster
+ * @return a map of HA group name to list cluster's url for cluster role
record failing to write
+ */
+ public Map<String, List<String>>
syncClusterRoleRecords(List<ClusterRoleRecord> records,
+ boolean forceful) throws IOException {
+ Map<String, List<String>> failedHaGroups = new HashMap<>();
+ for (ClusterRoleRecord record : records) {
+ String haGroupName = record.getHaGroupName();
+ try (PhoenixHAAdmin admin1 = new PhoenixHAAdmin(record.getZk1(),
conf, HighAvailibilityCuratorProvider.INSTANCE);
+ PhoenixHAAdmin admin2 = new
PhoenixHAAdmin(record.getZk2(), conf,
HighAvailibilityCuratorProvider.INSTANCE)) {
+ // Update the cluster previously ACTIVE cluster first.
+ // It reduces the chances of split-brain between clients and
clusters.
+ // If can not determine previous ACTIVE cluster, update new
STANDBY cluster first.
+ final PairOfSameType<PhoenixHAAdmin> pair;
+ if (admin1.isCurrentActiveCluster(haGroupName)) {
+ pair = new PairOfSameType<>(admin1, admin2);
+ } else if (admin2.isCurrentActiveCluster(haGroupName)) {
+ pair = new PairOfSameType<>(admin2, admin1);
+ } else if (record.getRole(admin1.getZkUrl()) ==
ClusterRoleRecord.ClusterRole.STANDBY) {
+ pair = new PairOfSameType<>(admin1, admin2);
+ } else {
+ pair = new PairOfSameType<>(admin2, admin1);
+ }
+ try {
+ pair.getFirst().createOrUpdateDataOnZookeeper(record);
+ } catch (IOException e) {
+ LOG.error("Error to create or update data on Zookeeper,
cluster={}, record={}",
+ pair.getFirst(), record);
+ failedHaGroups.computeIfAbsent(haGroupName, (k) -> new
ArrayList<>())
+ .add(pair.getFirst().getZkUrl());
+ if (!forceful) {
+ LOG.error("-forceful option is not enabled by command
line options, "
+ + "skip writing record {} to ZK clusters",
record);
+ // skip writing this record to second ZK cluster, so
we should report that
+ failedHaGroups.computeIfAbsent(haGroupName, (k) -> new
ArrayList<>())
+ .add(pair.getSecond().getZkUrl());
+ continue; // do not update this record on second
cluster
+ }
+ }
+ try {
+ pair.getSecond().createOrUpdateDataOnZookeeper(record);
+ } catch (IOException e) {
+ LOG.error("Error to create or update data on Zookeeper,
cluster={}, record={}",
+ pair.getFirst(), record);
+ failedHaGroups.computeIfAbsent(haGroupName, (k) -> new
ArrayList<>())
+ .add(pair.getSecond().getZkUrl());
+ }
+ }
+ }
+ return failedHaGroups;
+ }
+
+ /**
+ * Verify cluster role records stored in local ZK nodes, and repair with
remote znodes for any
+ * inconsistency.
+ * @return a list of HA group names with inconsistent cluster role
records, or empty list
+ */
+ List<String> verifyAndRepairWithRemoteZnode() throws Exception {
+ List<String> inconsistentHaGroups = new ArrayList<>();
+ for (ClusterRoleRecord record :
listAllClusterRoleRecordsOnZookeeper()) {
+ // the remote znodes may be on different ZK clusters.
+ if (record.getRole(zkUrl) ==
ClusterRoleRecord.ClusterRole.UNKNOWN) {
+ LOG.warn("Unknown cluster role for cluster '{}' in record {}",
+ zkUrl, record);
+ continue;
+ }
+ String remoteZkUrl = record.getZk1().equals(zkUrl) ?
record.getZk2() : record.getZk1();
+ try (PhoenixHAAdmin remoteAdmin = new PhoenixHAAdmin(remoteZkUrl,
conf,
+ HighAvailibilityCuratorProvider.INSTANCE)) {
+ ClusterRoleRecord remoteRecord;
+ try {
+ String zPath = toPath(record.getHaGroupName());
+ byte[] data =
remoteAdmin.getCurator().getData().forPath(zPath);
+ Optional<ClusterRoleRecord> recordOptional =
ClusterRoleRecord.fromJson(data);
+ if (!recordOptional.isPresent()) {
+ remoteAdmin.createOrUpdateDataOnZookeeper(record);
+ continue;
+ }
+ remoteRecord = recordOptional.get();
+ } catch (KeeperException.NoNodeException ne) {
+ LOG.warn(
+ "No record znode yet, creating for HA group {} on
{}",
+ record.getHaGroupName(), remoteAdmin);
+ remoteAdmin.createDataOnZookeeper(record);
+ LOG.info("Created znode on cluster {} with record {}",
+ remoteAdmin, record);
+ continue;
+ } catch (Exception e) {
+ LOG.error(
+ "Error to get data on remote cluster {} for HA
group {}", remoteAdmin,
+ record.getHaGroupName(), e);
+ continue;
+ }
+
+ if
(!record.getHaGroupName().equals(remoteRecord.getHaGroupName())) {
+ inconsistentHaGroups.add(record.getHaGroupName());
+ LOG.error(
+ "INTERNAL ERROR: got cluster role record for
different HA groups." + " Local record: {}, remote record: {}",
+ record, remoteRecord);
+ } else if (remoteRecord.isNewerThan(record)) {
+ createOrUpdateDataOnZookeeper(remoteRecord);
+ } else if (record.isNewerThan(remoteRecord)) {
+ remoteAdmin.createOrUpdateDataOnZookeeper(record);
+ } else if (record.equals(remoteRecord)) {
+ LOG.info("Cluster role record {} is consistent", record);
+ } else {
+ inconsistentHaGroups.add(record.getHaGroupName());
+ LOG.error(
+ "Cluster role record for HA group {} is
inconsistent. On cluster " + "{} the record is {}; on cluster {} the record is
{}",
+ record.getHaGroupName(), this, record,
remoteAdmin, remoteRecord);
+ }
+ }
+ }
+ return inconsistentHaGroups;
+ }
+
+ /**
+ * This updates the cluster role data on the zookeeper it connects to.
+ * To avoid conflicts, it does CAS (compare-and-set) when updating. The
constraint is that the
+ * given record's version should be larger the existing record's version.
This is a way to help
+ * avoiding manual update conflicts. If the given record can not meet
version check, it will
+ * reject the update request and client (human operator or external
system) should retry.
+ * @param record the new cluster role record to be saved on ZK
+ * @return true if the data on ZK is updated otherwise false
+ * @throws IOException if it fails to update the cluster role data on ZK
+ */
+ public boolean createOrUpdateDataOnZookeeper(ClusterRoleRecord record)
throws IOException {
+ if (!zkUrl.equals(record.getZk1()) && !zkUrl.equals(record.getZk2())) {
+ String
+ msg =
+ String.format(
+ "INTERNAL ERROR: " + "ZK cluster is not associated
with cluster role record! " + "ZK cluster URL: '%s'. Cluster role record: %s",
+ zkUrl, record);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ String haGroupName = record.getHaGroupName();
+ byte[] data;
+ try {
+ data = getCurator().getData().forPath(toPath(haGroupName)); // Get
initial data
+ } catch (KeeperException.NoNodeException ne) {
+ LOG.info("No record znode yet, creating for HA group {} on {}",
+ haGroupName, zkUrl);
+ createDataOnZookeeper(record);
+ LOG.info("Created znode for HA group {} with record data {} on {}",
+ haGroupName, record, zkUrl);
+ return true;
+ } catch (Exception e) {
+ String
+ msg =
+ String.format(
+ "Fail to read cluster role record data for HA
group %s " + "on cluster '%s'",
+ haGroupName, zkUrl);
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ Optional<ClusterRoleRecord> existingRecordOptional =
ClusterRoleRecord.fromJson(data);
+ if (!existingRecordOptional.isPresent()) {
+ String
+ msg =
+ String.format(
+ "Fail to parse existing cluster role record data
for HA " + "group %s",
+ haGroupName);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ ClusterRoleRecord existingRecord = existingRecordOptional.get();
+ if (record.getVersion() < existingRecord.getVersion()) {
+ String
+ msg =
+ String.format(
+ "Invalid new cluster role record for HA group '%s'
" + "because new record's version V%d is smaller than existing V%d. " +
"Existing role record: %s. New role record fail to save: %s",
+ haGroupName, record.getVersion(),
existingRecord.getVersion(),
+ existingRecord, record);
+ LOG.warn(msg);
+ return false; // return instead of error out to tolerate
+ }
+
+ if (record.getVersion() == existingRecord.getVersion()) {
+ if (record.equals(existingRecord)) {
+ LOG.debug(
+ "Cluster role does not change since last update on
ZK.");
+ return false; // no need to update iff they are the same.
+ } else {
+ String
+ msg =
+ String.format(
+ "Invalid new cluster role record for HA group
'%s' " + "because it has the same version V%d but inconsistent data. " +
"Existing role record: %s. New role record fail to save: %s",
+ haGroupName, record.getVersion(),
existingRecord, record);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ return updateDataOnZookeeper(existingRecord, record);
+ }
+
+ /**
+ * Helper to create the znode on the ZK cluster.
+ */
+ private void createDataOnZookeeper(ClusterRoleRecord record) throws
IOException {
+ String haGroupName = record.getHaGroupName();
+ // znode path for given haGroup name assuming namespace (prefix) has
been set.
+ String haGroupPath = toPath(haGroupName);
+ try {
+
getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .forPath(haGroupPath, ClusterRoleRecord.toJson(record));
+ } catch (KeeperException.NodeExistsException nee) {
+ //this method assumes that the znode doesn't exist yet, but it
could have been
+ //created between now and the last time we checked. We swallow the
exception and
+ //rely on our caller to check to make sure the znode that's saved
is correct
+ LOG.warn("Znode for HA group {} already exists. ", haGroupPath,
nee);
+ } catch (Exception e) {
+ LOG.error(
+ "Fail to initialize the znode for HA group {} with record
data {}", haGroupPath,
+ record, e);
+ throw new IOException("Fail to initialize znode for HA group " +
haGroupPath, e);
+ }
+ }
+
+ /**
+ * Helper to update the znode on ZK cluster assuming current data is the
given old record.
+ */
+ private boolean updateDataOnZookeeper(ClusterRoleRecord oldRecord,
ClusterRoleRecord newRecord)
+ throws IOException {
+ // znode path for given haGroup name assuming namespace (prefix) has
been set.
+ String haGroupPath = toPath(newRecord.getHaGroupName());
+ RetryPolicy retryPolicy =
HighAvailabilityGroup.createRetryPolicy(properties);
+ try {
+ DistributedAtomicValue
+ v =
+ new DistributedAtomicValue(getCurator(), haGroupPath,
retryPolicy);
+ AtomicValue<byte[]>
+ result =
+ v.compareAndSet(ClusterRoleRecord.toJson(oldRecord),
+ ClusterRoleRecord.toJson(newRecord));
+ LOG.info(
+ "Updated cluster role record ({}->{}) for HA group {} on
cluster '{}': {}",
+ oldRecord.getVersion(), newRecord.getVersion(),
newRecord.getHaGroupName(),
+ zkUrl, result.succeeded() ? "succeeded" : "failed");
+ LOG.debug(
+ "Old DistributedAtomicValue: {}, New
DistributedAtomicValue: {},",
+ new String(result.preValue(), StandardCharsets.UTF_8),
+ new String(result.postValue(), StandardCharsets.UTF_8));
+ return result.succeeded();
+ } catch (Exception e) {
+ String
+ msg =
+ String.format(
+ "Fail to update cluster role record to ZK for the
HA " + "group %s due to '%s'." + "Existing role record: %s. New role record
fail to save: %s",
+ haGroupPath, e.getMessage(), oldRecord, newRecord);
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+
+ /**
+ * Helper method to get ZK path for an HA group given the HA group name.
+ * It assumes the ZK namespace (prefix) has been set.
+ */
+ public static String toPath(String haGroupName) {
+ return ZKPaths.PATH_SEPARATOR + haGroupName;
+ }
+
+ public String getZkUrl() {
+ return zkUrl;
+ }
+
+ @Override
+ public void close() {
+ LOG.debug("PhoenixHAAdmin for {} is now closed.", zkUrl);
+ }
+
+ @Override
+ public String toString() {
+ return zkUrl;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
index 5da3ea1593..a59dc9fd8d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
@@ -17,18 +17,11 @@
*/
package org.apache.phoenix.jdbc;
-import java.io.Closeable;
import java.io.FileReader;
-import java.io.IOException;
import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
@@ -38,25 +31,14 @@ import
org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.atomic.AtomicValue;
-import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
-import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
-import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
import org.apache.phoenix.util.JacksonUtil;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -109,9 +91,10 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
if (commandLine.hasOption(HELP_OPT.getOpt())) {
printUsageMessage();
return RET_SUCCESS;
- } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
- String zkUrl = getLocalZkUrl(getConf()); // Admin is created
against local ZK cluster
- try (PhoenixHAAdminHelper admin = new
PhoenixHAAdminHelper(zkUrl, getConf(),
HighAvailibilityCuratorProvider.INSTANCE)) {
+ }
+ String zkUrl = PhoenixHAAdmin.getLocalZkUrl(getConf()); // Admin
is created against local ZK cluster
+ if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
+ try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl,
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
List<ClusterRoleRecord> records =
admin.listAllClusterRoleRecordsOnZookeeper();
JacksonUtil.getObjectWriterPretty().writeValue(System.out,
records);
}
@@ -119,16 +102,17 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
String fileName =
commandLine.getOptionValue(MANIFEST_OPT.getOpt());
List<ClusterRoleRecord> records =
readRecordsFromFile(fileName);
boolean forceful =
commandLine.hasOption(FORCEFUL_OPT.getOpt());
- Map<String, List<String>> failedHaGroups =
syncClusterRoleRecords(records, forceful);
- if (!failedHaGroups.isEmpty()) {
- System.out.println("Found following HA groups are failing
to write the clusters:");
- failedHaGroups.forEach((k, v) ->
- System.out.printf("%s -> [%s]\n", k,
String.join(",", v)));
- return RET_SYNC_ERROR;
+ try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl,
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+ Map<String, List<String>> failedHaGroups =
admin.syncClusterRoleRecords(records, forceful);
+ if (!failedHaGroups.isEmpty()) {
+ System.out.println("Found following HA groups are
failing to write the clusters:");
+ failedHaGroups.forEach((k, v) ->
+ System.out.printf("%s -> [%s]\n", k,
String.join(",", v)));
+ return RET_SYNC_ERROR;
+ }
}
} else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { //
verify and repair
- String zkUrl = getLocalZkUrl(getConf()); // Admin is created
against local ZK cluster
- try (PhoenixHAAdminHelper admin = new
PhoenixHAAdminHelper(zkUrl, getConf(),
HighAvailibilityCuratorProvider.INSTANCE)) {
+ try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl,
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
List<String> inconsistentRecord =
admin.verifyAndRepairWithRemoteZnode();
if (!inconsistentRecord.isEmpty()) {
System.out.println("Found following inconsistent
cluster role records: ");
@@ -171,64 +155,6 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
}
}
- /**
- * Helper method to write the given cluster role records into the ZK
clusters respectively.
- *
- * // TODO: add retry logics
- *
- * @param records The cluster role record list to save on ZK
- * @param forceful if true, this method will ignore errors on other
clusters; otherwise it will
- * not update next cluster (in order) if there is any
failure on current cluster
- * @return a map of HA group name to list cluster's url for cluster role
record failing to write
- */
- private Map<String, List<String>>
syncClusterRoleRecords(List<ClusterRoleRecord> records,
- boolean forceful) throws IOException {
- Map<String, List<String>> failedHaGroups = new HashMap<>();
- for (ClusterRoleRecord record : records) {
- String haGroupName = record.getHaGroupName();
- try (PhoenixHAAdminHelper admin1 = new
PhoenixHAAdminHelper(record.getZk1(), getConf(),
HighAvailibilityCuratorProvider.INSTANCE);
- PhoenixHAAdminHelper admin2 = new
PhoenixHAAdminHelper(record.getZk2(), getConf(),
HighAvailibilityCuratorProvider.INSTANCE)) {
- // Update the cluster previously ACTIVE cluster first.
- // It reduces the chances of split-brain between clients and
clusters.
- // If can not determine previous ACTIVE cluster, update new
STANDBY cluster first.
- final PairOfSameType<PhoenixHAAdminHelper> pair;
- if (admin1.isCurrentActiveCluster(haGroupName)) {
- pair = new PairOfSameType<>(admin1, admin2);
- } else if (admin2.isCurrentActiveCluster(haGroupName)) {
- pair = new PairOfSameType<>(admin2, admin1);
- } else if (record.getRole(admin1.getZkUrl()) ==
ClusterRole.STANDBY) {
- pair = new PairOfSameType<>(admin1, admin2);
- } else {
- pair = new PairOfSameType<>(admin2, admin1);
- }
- try {
- pair.getFirst().createOrUpdateDataOnZookeeper(record);
- } catch (IOException e) {
- LOG.error("Error to create or update data on Zookeeper,
cluster={}, record={}",
- pair.getFirst(), record);
- failedHaGroups.computeIfAbsent(haGroupName, (k) -> new
ArrayList<>())
- .add(pair.getFirst().zkUrl);
- if (!forceful) {
- LOG.error("-forceful option is not enabled by command
line options, "
- + "skip writing record {} to ZK clusters",
record);
- // skip writing this record to second ZK cluster, so
we should report that
- failedHaGroups.computeIfAbsent(haGroupName, (k) -> new
ArrayList<>())
- .add(pair.getSecond().zkUrl);
- continue; // do not update this record on second
cluster
- }
- }
- try {
- pair.getSecond().createOrUpdateDataOnZookeeper(record);
- } catch (IOException e) {
- LOG.error("Error to create or update data on Zookeeper,
cluster={}, record={}",
- pair.getFirst(), record);
- failedHaGroups.computeIfAbsent(haGroupName, (k) -> new
ArrayList<>())
- .add(pair.getSecond().zkUrl);
- }
- }
- }
- return failedHaGroups;
- }
/**
* Parses the commandline arguments, throw exception if validation fails.
@@ -268,370 +194,6 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
formatter.printHelp("help", OPTIONS);
}
- /**
- * Helper method to get local ZK fully qualified URL (host:port:/hbase)
from configuration.
- */
- public static String getLocalZkUrl(Configuration conf) {
- String localZkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
- if (StringUtils.isEmpty(localZkQuorum)) {
- String msg = "ZK quorum not found by looking up key " +
HConstants.ZOOKEEPER_QUORUM;
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
-
- String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
- if (portStr != null) {
- try {
- port = Integer.parseInt(portStr);
- } catch (NumberFormatException e) {
- String msg = String.format("Unrecognized ZK port '%s' in ZK
quorum '%s'",
- portStr, localZkQuorum);
- LOG.error(msg, e);
- throw new IllegalArgumentException(msg, e);
- }
- }
-
- String localZkRoot = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
- HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
-
- return String.format("%s:%d:%s", localZkQuorum, port, localZkRoot);
- }
-
- /**
- * Wrapper class for static accessor
- */
- @VisibleForTesting
- static class HighAvailibilityCuratorProvider {
-
- public static final HighAvailibilityCuratorProvider INSTANCE = new
HighAvailibilityCuratorProvider();
-
- /**
- * Gets curator blocking if necessary to create it
- */
- public CuratorFramework getCurator(String zkUrl, Properties
properties) throws IOException {
- return HighAvailabilityGroup.getCurator(zkUrl, properties);
- }
- }
-
- /**
- * Helper class to update cluster role record for a ZK cluster.
- *
- * The ZK client this accessor has is confined to a single ZK cluster, but
it can be used to
- * operate multiple HA groups that are associated with this cluster.
- */
- @VisibleForTesting
- static class PhoenixHAAdminHelper implements Closeable {
- /** The fully qualified ZK URL for an HBase cluster in format
host:port:/hbase */
- private final String zkUrl;
- /** Configuration of this command line tool. */
- private final Configuration conf;
- /** Client properties which has copies of configuration defining ZK
timeouts / retries. */
- private final Properties properties = new Properties();
- /** Curator Provider **/
- private final HighAvailibilityCuratorProvider
highAvailibilityCuratorProvider;
-
- PhoenixHAAdminHelper(String zkUrl, Configuration conf,
HighAvailibilityCuratorProvider highAvailibilityCuratorProvider) {
- Preconditions.checkNotNull(zkUrl);
- Preconditions.checkNotNull(conf);
- Preconditions.checkNotNull(highAvailibilityCuratorProvider);
- this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl);
- this.conf = conf;
- conf.iterator().forEachRemaining(k ->
properties.setProperty(k.getKey(), k.getValue()));
- this.highAvailibilityCuratorProvider =
highAvailibilityCuratorProvider;
- }
-
- /**
- * Gets curator from the cache if available otherwise calls into
getCurator to make it.
- */
- private CuratorFramework getCurator() throws IOException {
- return highAvailibilityCuratorProvider.getCurator(zkUrl,
properties);
- }
-
- /**
- * Check if current cluster is ACTIVE role for the given HA group.
- *
- * In case of Exception when it fails to read cluster role data from
the current cluster, it
- * will assume current cluster is not ACTIVE. Callers should be aware
of "false positive"
- * possibility especially due to connectivity issue between this tool
and remote ZK cluster.
- *
- * @param haGroupName the HA group name; a cluster can be associated
with multiple HA groups
- * @return true if current cluster is ACTIVE role, otherwise false
- */
- private boolean isCurrentActiveCluster(String haGroupName) {
- try {
- byte[] data =
getCurator().getData().forPath(toPath(haGroupName));
-
- Optional<ClusterRoleRecord> record =
ClusterRoleRecord.fromJson(data);
- return record.isPresent() && record.get().getRole(zkUrl) ==
ClusterRole.ACTIVE;
- } catch (NoNodeException ne) {
- LOG.info("No role record found for HA group {} on '{}',
assuming it is not active",
- haGroupName, zkUrl);
- return false;
- } catch (Exception e) {
- LOG.warn("Got exception when reading record for {} on cluster
{}",
- haGroupName, zkUrl, e);
- return false;
- }
- }
-
- /**
- * This lists all cluster role records stored in the zookeeper nodes.
- *
- * This read-only operation and hence no side effect on the ZK cluster.
- */
- List<ClusterRoleRecord> listAllClusterRoleRecordsOnZookeeper() throws
IOException {
- List<String> haGroupNames;
- try {
- haGroupNames =
getCurator().getChildren().forPath(ZKPaths.PATH_SEPARATOR);
- } catch (Exception e) {
- String msg = String.format("Got exception when listing all HA
groups in %s", zkUrl);
- LOG.error(msg);
- throw new IOException(msg, e);
- }
-
- List<ClusterRoleRecord> records = new ArrayList<>();
- List<String> failedHaGroups = new ArrayList<>();
- for (String haGroupName : haGroupNames) {
- try {
- byte[] data =
getCurator().getData().forPath(ZKPaths.PATH_SEPARATOR + haGroupName);
- Optional<ClusterRoleRecord> record =
ClusterRoleRecord.fromJson(data);
- if (record.isPresent()) {
- records.add(record.get());
- } else { // fail to deserialize data from JSON
- failedHaGroups.add(haGroupName);
- }
- } catch (Exception e) {
- LOG.warn("Got exception when reading data for HA group
{}", haGroupName, e);
- failedHaGroups.add(haGroupName);
- }
- }
-
- if (!failedHaGroups.isEmpty()) {
- String msg = String.format("Found following HA groups: %s.
Fail to read cluster "
- + "role records for following HA groups: %s",
- String.join(",", haGroupNames), String.join(",",
failedHaGroups));
- LOG.error(msg);
- throw new IOException(msg);
- }
- return records;
- }
-
- /**
- * Verify cluster role records stored in local ZK nodes, and repair
with remote znodes for
- * any inconsistency.
- *
- * @return a list of HA group names with inconsistent cluster role
records, or empty list
- */
- List<String> verifyAndRepairWithRemoteZnode() throws Exception {
- List<String> inconsistentHaGroups = new ArrayList<>();
- for (ClusterRoleRecord record :
listAllClusterRoleRecordsOnZookeeper()) {
- // the remote znodes may be on different ZK clusters.
- if (record.getRole(zkUrl) == ClusterRole.UNKNOWN) {
- LOG.warn("Unknown cluster role for cluster '{}' in record
{}", zkUrl, record);
- continue;
- }
- String remoteZkUrl = record.getZk1().equals(zkUrl)
- ? record.getZk2()
- : record.getZk1();
- try (PhoenixHAAdminHelper remoteAdmin = new
PhoenixHAAdminHelper(remoteZkUrl, conf,
HighAvailibilityCuratorProvider.INSTANCE)) {
- ClusterRoleRecord remoteRecord;
- try {
- String zPath = toPath(record.getHaGroupName());
- byte[] data =
remoteAdmin.getCurator().getData().forPath(zPath);
- Optional<ClusterRoleRecord> recordOptional =
ClusterRoleRecord.fromJson(data);
- if (!recordOptional.isPresent()) {
- remoteAdmin.createOrUpdateDataOnZookeeper(record);
- continue;
- }
- remoteRecord = recordOptional.get();
- } catch (NoNodeException ne) {
- LOG.warn("No record znode yet, creating for HA group
{} on {}",
- record.getHaGroupName(), remoteAdmin);
- remoteAdmin.createDataOnZookeeper(record);
- LOG.info("Created znode on cluster {} with record {}",
remoteAdmin, record);
- continue;
- } catch (Exception e) {
- LOG.error("Error to get data on remote cluster {} for
HA group {}",
- remoteAdmin, record.getHaGroupName(), e);
- continue;
- }
-
- if
(!record.getHaGroupName().equals(remoteRecord.getHaGroupName())) {
- inconsistentHaGroups.add(record.getHaGroupName());
- LOG.error("INTERNAL ERROR: got cluster role record for
different HA groups."
- + " Local record: {}, remote record: {}",
record, remoteRecord);
- } else if (remoteRecord.isNewerThan(record)) {
- createOrUpdateDataOnZookeeper(remoteRecord);
- } else if (record.isNewerThan(remoteRecord)) {
- remoteAdmin.createOrUpdateDataOnZookeeper(record);
- } else if (record.equals(remoteRecord)) {
- LOG.info("Cluster role record {} is consistent",
record);
- } else {
- inconsistentHaGroups.add(record.getHaGroupName());
- LOG.error("Cluster role record for HA group {} is
inconsistent. On cluster "
- + "{} the record is {}; on cluster {}
the record is {}",
- record.getHaGroupName(), this, record,
remoteAdmin, remoteRecord);
- }
- }
- }
- return inconsistentHaGroups;
- }
-
- /**
- * This updates the cluster role data on the zookeeper it connects to.
- *
- * To avoid conflicts, it does CAS (compare-and-set) when updating.
The constraint is that
- * the given record's version should be larger the existing record's
version. This is a way
- * to help avoiding manual update conflicts. If the given record can
not meet version
- * check, it will reject the update request and client (human operator
or external system)
- * should retry.
- *
- * @param record the new cluster role record to be saved on ZK
- * @throws IOException if it fails to update the cluster role data on
ZK
- * @return true if the data on ZK is updated otherwise false
- */
- boolean createOrUpdateDataOnZookeeper(ClusterRoleRecord record) throws
IOException {
- if (!zkUrl.equals(record.getZk1()) &&
!zkUrl.equals(record.getZk2())) {
- String msg = String.format("INTERNAL ERROR: "
- + "ZK cluster is not associated with cluster
role record! "
- + "ZK cluster URL: '%s'. Cluster role record:
%s",
- zkUrl, record);
- LOG.error(msg);
- throw new IOException(msg);
- }
-
- String haGroupName = record.getHaGroupName();
- byte[] data;
- try {
- data = getCurator().getData().forPath(toPath(haGroupName)); //
Get initial data
- } catch (NoNodeException ne) {
- LOG.info("No record znode yet, creating for HA group {} on
{}", haGroupName, zkUrl);
- createDataOnZookeeper(record);
- LOG.info("Created znode for HA group {} with record data {} on
{}", haGroupName,
- record, zkUrl);
- return true;
- } catch (Exception e) {
- String msg = String.format("Fail to read cluster role record
data for HA group %s "
- + "on cluster '%s'", haGroupName, zkUrl);
- LOG.error(msg, e);
- throw new IOException(msg, e);
- }
-
- Optional<ClusterRoleRecord> existingRecordOptional =
ClusterRoleRecord.fromJson(data);
- if (!existingRecordOptional.isPresent()) {
- String msg = String.format("Fail to parse existing cluster
role record data for HA "
- + "group %s", haGroupName);
- LOG.error(msg);
- throw new IOException(msg);
- }
-
- ClusterRoleRecord existingRecord = existingRecordOptional.get();
- if (record.getVersion() < existingRecord.getVersion()) {
- String msg = String.format("Invalid new cluster role record
for HA group '%s' "
- + "because new record's version V%d is smaller
than existing V%d. "
- + "Existing role record: %s. New role record
fail to save: %s",
- haGroupName, record.getVersion(),
existingRecord.getVersion(),
- existingRecord, record);
- LOG.warn(msg);
- return false; // return instead of error out to tolerate
- }
-
- if (record.getVersion() == existingRecord.getVersion()) {
- if (record.equals(existingRecord)) {
- LOG.debug("Cluster role does not change since last update
on ZK.");
- return false; // no need to update iff they are the same.
- } else {
- String msg = String.format("Invalid new cluster role
record for HA group '%s' "
- + "because it has the same version V%d but
inconsistent data. "
- + "Existing role record: %s. New role
record fail to save: %s",
- haGroupName, record.getVersion(), existingRecord,
record);
- LOG.error(msg);
- throw new IOException(msg);
- }
- }
-
- return updateDataOnZookeeper(existingRecord, record);
- }
-
- /**
- * Helper to create the znode on the ZK cluster.
- */
- private void createDataOnZookeeper(ClusterRoleRecord record) throws
IOException {
- String haGroupName = record.getHaGroupName();
- // znode path for given haGroup name assuming namespace (prefix)
has been set.
- String haGroupPath = toPath(haGroupName);
- try {
- getCurator().create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(haGroupPath,
ClusterRoleRecord.toJson(record));
- } catch (NodeExistsException nee) {
- //this method assumes that the znode doesn't exist yet, but it
could have been
- //created between now and the last time we checked. We swallow
the exception and
- //rely on our caller to check to make sure the znode that's
saved is correct
- LOG.warn("Znode for HA group {} already exists. ",
- haGroupPath, nee);
- } catch (Exception e) {
- LOG.error("Fail to initialize the znode for HA group {} with
record data {}",
- haGroupPath, record, e);
- throw new IOException("Fail to initialize znode for HA group "
+ haGroupPath, e);
- }
- }
-
- /**
- * Helper to update the znode on ZK cluster assuming current data is
the given old record.
- */
- private boolean updateDataOnZookeeper(ClusterRoleRecord oldRecord,
- ClusterRoleRecord newRecord) throws IOException {
- // znode path for given haGroup name assuming namespace (prefix)
has been set.
- String haGroupPath = toPath(newRecord.getHaGroupName());
- RetryPolicy retryPolicy =
HighAvailabilityGroup.createRetryPolicy(properties);
- try {
- DistributedAtomicValue v = new
DistributedAtomicValue(getCurator(), haGroupPath, retryPolicy);
- AtomicValue<byte[]> result = v.compareAndSet(
- ClusterRoleRecord.toJson(oldRecord),
ClusterRoleRecord.toJson(newRecord));
- LOG.info("Updated cluster role record ({}->{}) for HA group {}
on cluster '{}': {}",
- oldRecord.getVersion(), newRecord.getVersion(),
newRecord.getHaGroupName(),
- zkUrl, result.succeeded() ? "succeeded" : "failed");
- LOG.debug("Old DistributedAtomicValue: {}, New
DistributedAtomicValue: {},",
- new String(result.preValue(), StandardCharsets.UTF_8),
- new String(result.postValue(),
StandardCharsets.UTF_8));
- return result.succeeded();
- } catch (Exception e) {
- String msg = String.format("Fail to update cluster role record
to ZK for the HA "
- + "group %s due to '%s'."
- + "Existing role record: %s. New role record
fail to save: %s",
- haGroupPath, e.getMessage(), oldRecord, newRecord);
- LOG.error(msg, e);
- throw new IOException(msg, e);
- }
- }
-
- /**
- * Helper method to get ZK path for an HA group given the HA group
name.
- *
- * It assumes the ZK namespace (prefix) has been set.
- */
- private static String toPath(String haGroupName) {
- return ZKPaths.PATH_SEPARATOR + haGroupName;
- }
-
- String getZkUrl() {
- return zkUrl;
- }
-
- @Override
- public void close() {
- LOG.debug("PhoenixHAAdmin for {} is now closed.", zkUrl);
- }
-
- @Override
- public String toString() {
- return zkUrl;
- }
- }
-
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int retCode = ToolRunner.run(conf, new PhoenixHAAdminTool(), args);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 80e8bca6b1..be3801d118 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -382,6 +382,8 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_VIEW_TTL_ENABLED =
"phoenix.view.ttl.enabled";
public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT =
"phoenix.view.ttl.tenant_views_per_scan.limit";
+ // Block mutations based on cluster role record
+ public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED =
"phoenix.cluster.role.based.mutation.block.enabled";
// Before 4.15 when we created a view we included the parent table column
metadata in the view
// metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no
longer store the parent
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index c4f3812b51..9755572566 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -26,6 +26,7 @@ import static
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_
import static
org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
import static
org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
import static
org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED;
@@ -450,6 +451,8 @@ public class QueryServicesOptions {
public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT
= 300000; // 5 minutes
+ public static final Boolean
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
+
private final Configuration config;
@@ -553,7 +556,8 @@ public class QueryServicesOptions {
DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX)
.setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE,
DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
.setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED,
DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED)
- .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL,
DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS);
+ .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL,
DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS)
+ .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user
set
diff --git
a/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
b/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
index 2d0da268ba..725f3fc763 100644
--- a/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
+++ b/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
@@ -50,10 +50,19 @@ message InvalidateServerMetadataCacheRequest {
repeated InvalidateServerMetadataCache invalidateServerMetadataCacheRequests
= 1;
}
+message InvalidateHAGroupStoreClientRequest {
+}
+
+message InvalidateHAGroupStoreClientResponse {
+}
+
service RegionServerEndpointService {
rpc validateLastDDLTimestamp(ValidateLastDDLTimestampRequest)
returns (ValidateLastDDLTimestampResponse);
rpc invalidateServerMetadataCache(InvalidateServerMetadataCacheRequest)
returns (InvalidateServerMetadataCacheResponse);
+
+ rpc invalidateHAGroupStoreClient(InvalidateHAGroupStoreClientRequest)
+ returns (InvalidateHAGroupStoreClientResponse);
}
\ No newline at end of file
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 59fd1209db..22fa84c4a6 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.cache.ServerMetadataCacheImpl;
import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
import
org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -106,6 +107,24 @@ public class PhoenixRegionServerEndpoint
}
}
+ @Override
+ public void invalidateHAGroupStoreClient(RpcController controller,
+ RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest
request,
+
RpcCallback<RegionServerEndpointProtos.InvalidateHAGroupStoreClientResponse>
done) {
+ LOGGER.info("PhoenixRegionServerEndpoint invalidating
HAGroupStoreClient");
+ HAGroupStoreManager haGroupStoreManager;
+ try {
+ haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
+ haGroupStoreManager.invalidateHAGroupStoreClient();
+ } catch (Throwable t) {
+ String errorMsg = "Invalidating HAGroupStoreClient FAILED, check
exception for "
+ + "specific details";
+ LOGGER.error(errorMsg, t);
+ IOException ioe = ClientUtil.createIOException(errorMsg, t);
+ ProtobufUtil.setControllerException(controller, ioe);
+ }
+ }
+
@Override
public Iterable<Service> getServices() {
return Collections.singletonList(this);
@@ -114,4 +133,5 @@ public class PhoenixRegionServerEndpoint
public ServerMetadataCache getServerMetadataCache() {
return ServerMetadataCacheImpl.getInstance(conf);
}
+
}
\ No newline at end of file
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 74c965345a..915db903b0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -88,6 +89,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper;
import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper.MutateCommand;
import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
@@ -1055,4 +1057,20 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
protected boolean isRegionObserverFor(Scan scan) {
return
scan.getAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG) != null;
}
+
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation>
miniBatchOp)
+ throws IOException {
+ final Configuration conf = c.getEnvironment().getConfiguration();
+ try {
+ final HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(conf);
+ if (haGroupStoreManager.isMutationBlocked()) {
+ throw new IOException("Blocking Mutation as Some CRRs are in
ACTIVE_TO_STANDBY "
+ + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED
is true");
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1223c3a980..72ef706566 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.CaseExpression;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
@@ -160,6 +161,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static final OperationStatus NOWRITE = new
OperationStatus(SUCCESS);
public static final String PHOENIX_APPEND_METADATA_TO_WAL =
"phoenix.append.metadata.to.wal";
public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+
/**
* Class to represent pending data table rows
* */
@@ -512,6 +514,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
+ /*
+ * Also checks for mutationBlockEnabled if
CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is enabled.
+ */
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -519,6 +524,12 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return;
}
try {
+ final Configuration conf = c.getEnvironment().getConfiguration();
+ final HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(conf);
+ if (haGroupStoreManager.isMutationBlocked()) {
+ throw new IOException("Blocking Mutation as Some CRRs are in
ACTIVE_TO_STANDBY "
+ + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED
is true");
+ }
preBatchMutateWithExceptions(c, miniBatchOp);
return;
} catch (Throwable t) {
@@ -1303,6 +1314,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return ts;
}
}
+
public void
preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c,
miniBatchOp);
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 691731627d..4987d477c1 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -364,6 +364,10 @@
<artifactId>curator-framework</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </dependency>
<!-- Other test dependencies -->
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
index fa051203ca..f92db070e2 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -73,7 +72,6 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static
org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
import static
org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
index 8996939290..670aba7f15 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
@@ -32,7 +32,6 @@ import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.ServerUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -158,6 +157,18 @@ public class PhoenixRegionServerEndpointIT extends
BaseTest {
}
}
+ @Test
+ public void testInvalidateHAGroupStoreClient() {
+ HRegionServer regionServer =
utility.getMiniHBaseCluster().getRegionServer(0);
+ PhoenixRegionServerEndpoint coprocessor =
getPhoenixRegionServerEndpoint(regionServer);
+ assertNotNull(coprocessor);
+ ServerRpcController controller = new ServerRpcController();
+ RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest request
+ =
RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest.newBuilder().build();
+ coprocessor.invalidateHAGroupStoreClient(controller, request, null);
+ assertFalse(controller.failed());
+ }
+
private String getCreateTableStmt(String tableName) {
return "CREATE TABLE " + tableName +
" (a_string varchar not null, col1 integer" +
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
new file mode 100644
index 0000000000..cd0679e9c2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+/**
+ * Integration tests for {@link HAGroupStoreClient}
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStoreClientIT extends BaseTest {
+
+ private final PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config);
+ private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void before() throws Exception {
+ // Clean up all the existing CRRs
+ List<ClusterRoleRecord> crrs =
haAdmin.listAllClusterRoleRecordsOnZookeeper();
+ for (ClusterRoleRecord crr : crrs) {
+
haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+ }
+ }
+
+ @Test
+ public void testHAGroupStoreClientWithSingleCRR() throws Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(config);
+ ClusterRoleRecord crr = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+ // Now Update CRR so that current cluster has state ACTIVE_TO_STANDBY
+ crr = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ // Check that now the cluster should be in ActiveToStandby
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty();
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+
+
+ // Change it back to ACTIVE so that cluster is not in
ACTIVE_TO_STANDBY state
+ crr = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+
+ // Change it again to ACTIVE_TO_STANDBY so that we can validate
watcher works repeatedly
+ crr = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty();
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+
+
+ // Change peer cluster to ACTIVE_TO_STANDBY so that we can still
process mutation on this cluster
+ crr = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url",
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+ }
+
+
+ @Test
+ public void testHAGroupStoreClientWithMultipleCRRs() throws Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(config);
+ // Setup initial CRRs
+ ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 2;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+ // Now Update CRR so that current cluster has state ACTIVE_TO_STANDBY
for only 1 crr
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ // Check that now the cluster should be in ActiveToStandby
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+
+
+ // Change it back to ACTIVE so that cluster is not in
ACTIVE_TO_STANDBY state
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 2;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+
+ // Change other crr to ACTIVE_TO_STANDBY and one in ACTIVE state so
that we can validate watcher works repeatedly
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+
+
+ // Change peer cluster to ACTIVE_TO_STANDBY so that we can still
process mutation on this cluster
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url",
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url",
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 2;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+ }
+
+ @Test
+ public void testMultiThreadedAccessToHACache() throws Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(config);
+ // Setup initial CRRs
+ ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ int threadCount = 10;
+ final CountDownLatch latch = new CountDownLatch(threadCount);
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 2;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+ latch.countDown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ assert latch.await(10, TimeUnit.SECONDS);
+
+ // Update CRRs
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ final CountDownLatch latch2 = new CountDownLatch(threadCount);
+ executor = Executors.newFixedThreadPool(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+ latch2.countDown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ assert latch2.await(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHAGroupStoreClientWithRootPathDeletion() throws Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(config);
+ ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+
+
haAdmin.getCurator().delete().deletingChildrenIfNeeded().forPath(ZKPaths.PATH_SEPARATOR);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty();
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+ }
+
+ @Test
+ public void testThrowsExceptionWithZKDisconnectionAndThenConnection()
throws Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(config);
+ // Setup initial CRRs
+ ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 2;
+
+ // Shutdown the ZK Cluster to simulate CONNECTION_SUSPENDED event
+ utility.shutdownMiniZKCluster();
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ //Check that HAGroupStoreClient instance is not healthy and throws
IOException
+ assertThrows(IOException.class, () ->
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE));
+
+ // Start ZK on the same port to simulate CONNECTION_RECONNECTED event
+
utility.startMiniZKCluster(1,Integer.parseInt(getZKClientPort(config)));
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ //Check that HAGroupStoreClient instance is back to healthy and
provides correct response
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 2;
+ }
+
+
+ @Test
+ public void testHAGroupStoreClientWithDifferentZKURLFormats() throws
Exception {
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstance(config);
+ final String zkClientPort = getZKClientPort(config);
+ // Setup initial CRRs
+ final String format1 = "127.0.0.1\\:"+zkClientPort+"::/hbase"; //
127.0.0.1\:53228::/hbase
+ final String format2 = "127.0.0.1:"+zkClientPort+"::/hbase"; //
127.0.0.1:53228::/hbase
+ final String format3 = "127.0.0.1\\:"+zkClientPort+":/hbase"; //
127.0.0.1\:53228:/hbase
+
+ ClusterRoleRecord crr1 = new ClusterRoleRecord("parallel1",
+ HighAvailabilityPolicy.PARALLEL, format1,
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+
+ ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel2",
+ HighAvailabilityPolicy.PARALLEL, format2,
ClusterRoleRecord.ClusterRole.STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ ClusterRoleRecord crr3 = new ClusterRoleRecord("parallel3",
+ HighAvailabilityPolicy.PARALLEL, format3,
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr3);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size()
== 1;
+ assert
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.STANDBY).size()
== 1;
+ }
+
+
+ /**
+ * This test verifies that the updates coming via
PathChildrenCacheListener are in order in which updates are sent to ZK
+ * @throws Exception
+ */
+ @Test
+ public void testHAGroupStoreClientWithMultiThreadedUpdates() throws
Exception {
+ // Number of threads to execute
+ int threadCount = 15;
+
+ // Capture versions of crr in a list(crrEventVersions) in order they
are received.
+ List<Integer> crrEventVersions = new ArrayList<>();
+ CountDownLatch eventsLatch = new CountDownLatch(threadCount);
+ PathChildrenCacheListener pathChildrenCacheListener = (client, event)
-> {
+ if(event.getData() != null && event.getData().getData() != null &&
ClusterRoleRecord.fromJson(event.getData().getData()).isPresent()) {
+ ClusterRoleRecord crr =
ClusterRoleRecord.fromJson(event.getData().getData()).get();
+ crrEventVersions.add((int)crr.getVersion());
+ eventsLatch.countDown();
+ }
+ };
+
+ // Start a new HAGroupStoreClient.
+ new HAGroupStoreClient(config, pathChildrenCacheListener);
+
+ // Create multiple threads for update to ZK.
+ final CountDownLatch updateLatch = new CountDownLatch(threadCount);
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ // List captures the order of events that are sent.
+ List<Integer> updateList = new ArrayList<>();
+
+ // Create a queue which can be polled to send updates to ZK.
+ ConcurrentLinkedQueue<ClusterRoleRecord> updateQueue = new
ConcurrentLinkedQueue<>();
+ for(int i = 0; i < threadCount; i++) {
+ updateQueue.add(createCRR(i+1));
+ updateList.add(i+1);
+ }
+
+ // Submit updates to ZK.
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ synchronized (HAGroupStoreClientIT.class) {
+
haAdmin.createOrUpdateDataOnZookeeper(Objects.requireNonNull(updateQueue.poll()));
+ }
+ updateLatch.countDown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ // Check if updates are sent and updates are received.
+ assert
eventsLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS*threadCount,
TimeUnit.MILLISECONDS);
+ assert
updateLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS*threadCount,
TimeUnit.MILLISECONDS);
+
+ // Assert that the order of updates is same as order of events.
+ assert updateList.equals(crrEventVersions);
+ }
+
+ private ClusterRoleRecord createCRR(Integer version) {
+ return new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY,
version);
+ }
+
+
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
new file mode 100644
index 0000000000..9dfbb6cb4f
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertFalse;
+
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStoreManagerIT extends BaseTest {
+ private final PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config);
+ private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void before() throws Exception {
+ // Clean up all the existing CRRs
+ List<ClusterRoleRecord> crrs =
haAdmin.listAllClusterRoleRecordsOnZookeeper();
+ for (ClusterRoleRecord crr : crrs) {
+
haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+ }
+ }
+
+ @Test
+ public void testHAGroupStoreManagerWithSingleCRR() throws Exception {
+ HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(config);
+ // Setup initial CRRs
+ ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ assertFalse(haGroupStoreManager.isMutationBlocked());
+
+ crr1 = new ClusterRoleRecord("failover",
+ HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ crr2 = new ClusterRoleRecord("parallel",
+ HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr1);
+ haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ assert haGroupStoreManager.isMutationBlocked();
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index 27b627753f..4a7c6bce91 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.jdbc;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
-import org.apache.phoenix.query.QueryServicesOptions;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.RandomUtils;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
-import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +61,8 @@ import static
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCo
import static
org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT;
import static
org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR;
import static org.apache.phoenix.jdbc.HighAvailabilityGroup.*;
+import static
org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
+
import static
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -86,8 +86,8 @@ public class HighAvailabilityTestingUtility {
private String url1;
/** The host\:port::/hbase format of the JDBC string for HBase cluster
2. */
private String url2;
- private PhoenixHAAdminHelper haAdmin1;
- private PhoenixHAAdminHelper haAdmin2;
+ private PhoenixHAAdmin haAdmin1;
+ private PhoenixHAAdmin haAdmin2;
private Admin admin1;
private Admin admin2;
@VisibleForTesting
@@ -121,8 +121,8 @@ public class HighAvailabilityTestingUtility {
url1 = String.format("%s\\:%d::/hbase", confAddress1,
hbaseCluster1.getZkCluster().getClientPort());
url2 = String.format("%s\\:%d::/hbase", confAddress2,
hbaseCluster2.getZkCluster().getClientPort());
- haAdmin1 = new PhoenixHAAdminHelper(getUrl1(),
hbaseCluster1.getConfiguration(),
PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
- haAdmin2 = new PhoenixHAAdminHelper(getUrl2(),
hbaseCluster2.getConfiguration(),
PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
+ haAdmin1 = new PhoenixHAAdmin(getUrl1(),
hbaseCluster1.getConfiguration(), HighAvailibilityCuratorProvider.INSTANCE);
+ haAdmin2 = new PhoenixHAAdmin(getUrl2(),
hbaseCluster2.getConfiguration(), HighAvailibilityCuratorProvider.INSTANCE);
admin1 = hbaseCluster1.getConnection().getAdmin();
admin2 = hbaseCluster2.getConnection().getAdmin();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
index eeafec80ba..e00d15c515 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.phoenix.jdbc;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.RET_SUCCESS;
-import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl;
+import static
org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -46,7 +48,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
-import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.After;
import org.junit.Before;
@@ -59,7 +60,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link
PhoenixHAAdminHelper}.
+ * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link
PhoenixHAAdmin}.
*
* @see PhoenixHAAdminToolIT
*/
@@ -70,12 +71,13 @@ public class PhoenixHAAdminToolTest {
private static final PrintStream STDOUT = System.out;
private static final ByteArrayOutputStream STDOUT_CAPTURE = new
ByteArrayOutputStream();
- private final PhoenixHAAdminTool.HighAvailibilityCuratorProvider
mockHighAvailibilityCuratorProvider =
Mockito.mock(PhoenixHAAdminTool.HighAvailibilityCuratorProvider.class);
+ private final HighAvailibilityCuratorProvider
mockHighAvailibilityCuratorProvider =
Mockito.mock(HighAvailibilityCuratorProvider.class);
/** Use mocked curator since there is no mini-ZK cluster. */
private final CuratorFramework curator =
Mockito.mock(CuratorFramework.class);
/** HA admin to test for one test case. */
- private final PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(ZK1,
new Configuration(), mockHighAvailibilityCuratorProvider);
+ private final PhoenixHAAdmin
+ admin = new PhoenixHAAdmin(ZK1, new Configuration(),
mockHighAvailibilityCuratorProvider);
private String haGroupName;
private ClusterRoleRecord recordV1;