This is an automated email from the ASF dual-hosted git repository.
jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d064c756b8 PHOENIX-7598 Add ITs for client write behavior when they
encounter ATS state (#2198)
d064c756b8 is described below
commit d064c756b8b13c5fc04b569bd8b281401a689389
Author: ritegarg <[email protected]>
AuthorDate: Mon Jun 30 10:18:47 2025 -0700
PHOENIX-7598 Add ITs for client write behavior when they encounter ATS
state (#2198)
Co-authored-by: Ritesh Garg
<[email protected]>
---
.../exception/MutationBlockedIOException.java | 36 +++
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 23 +-
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 19 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 6 +-
.../IndexRegionObserverMutationBlockingIT.java | 258 +++++++++++++++++++++
.../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 74 +++++-
6 files changed, 399 insertions(+), 17 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/MutationBlockedIOException.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/MutationBlockedIOException.java
new file mode 100644
index 0000000000..590c3615d6
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/MutationBlockedIOException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is set
+ * and the current cluster role is ACTIVE_TO_STANDBY.
+ */
+public class MutationBlockedIOException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param msg reason for the exception
+ */
+ public MutationBlockedIOException(String msg) {
+ super(msg);
+ }
+
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index c3606ddad2..32498aa4f8 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -31,12 +31,15 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+
/**
* Write-through cache for HAGroupStore.
@@ -45,7 +48,6 @@ import java.util.stream.Collectors;
public class HAGroupStoreClient implements Closeable {
private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS
= 30000L;
- private static volatile HAGroupStoreClient haGroupStoreClientInstance;
private PhoenixHAAdmin phoenixHaAdmin;
private static final Logger LOGGER =
LoggerFactory.getLogger(HAGroupStoreClient.class);
// Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
@@ -53,6 +55,7 @@ public class HAGroupStoreClient implements Closeable {
= new ConcurrentHashMap<>();
private PathChildrenCache pathChildrenCache;
private volatile boolean isHealthy;
+ private static final Map<String, HAGroupStoreClient> instances = new
ConcurrentHashMap<>();
/**
* Creates/gets an instance of HAGroupStoreClient.
@@ -62,18 +65,22 @@ public class HAGroupStoreClient implements Closeable {
* @return HAGroupStoreClient instance
*/
public static HAGroupStoreClient getInstance(Configuration conf) {
- if (haGroupStoreClientInstance == null ||
!haGroupStoreClientInstance.isHealthy) {
+ final String zkUrl = getLocalZkUrl(conf);
+ HAGroupStoreClient result = instances.get(zkUrl);
+ if (result == null || !result.isHealthy) {
synchronized (HAGroupStoreClient.class) {
- if (haGroupStoreClientInstance == null ||
!haGroupStoreClientInstance.isHealthy) {
- haGroupStoreClientInstance = new HAGroupStoreClient(conf,
null);
- if (!haGroupStoreClientInstance.isHealthy) {
- haGroupStoreClientInstance.close();
- haGroupStoreClientInstance = null;
+ result = instances.get(zkUrl);
+ if (result == null || !result.isHealthy) {
+ result = new HAGroupStoreClient(conf, null);
+ if (!result.isHealthy) {
+ result.close();
+ result = null;
}
+ instances.put(zkUrl, result);
}
}
}
- return haGroupStoreClientInstance;
+ return result;
}
@VisibleForTesting
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 690953a7b0..196c565730 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -19,29 +19,38 @@ package org.apache.phoenix.jdbc;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
public class HAGroupStoreManager {
- private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
private final boolean mutationBlockEnabled;
private final Configuration conf;
+ private static volatile Map<String, HAGroupStoreManager> INSTANCES = new
ConcurrentHashMap<>();
/**
* Creates/gets an instance of HAGroupStoreManager.
+ * Provides a unique instance for each ZK URL.
*
* @param conf configuration
* @return HAGroupStoreManager instance
*/
public static HAGroupStoreManager getInstance(Configuration conf) {
- if (haGroupStoreManagerInstance == null) {
+ final String zkUrl = getLocalZkUrl(conf);
+ HAGroupStoreManager result = INSTANCES.get(zkUrl);
+ if (result == null) {
synchronized (HAGroupStoreManager.class) {
- if (haGroupStoreManagerInstance == null) {
- haGroupStoreManagerInstance = new
HAGroupStoreManager(conf);
+ result = INSTANCES.get(zkUrl);
+ if (result == null) {
+ result = new HAGroupStoreManager(conf);
+ INSTANCES.put(zkUrl, result);
}
}
}
- return haGroupStoreManagerInstance;
+ return result;
}
private HAGroupStoreManager(final Configuration conf) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index adb2b2bafb..e1a0f1d54a 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -58,6 +58,7 @@ import
org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.exception.MutationBlockedIOException;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.CaseExpression;
import org.apache.phoenix.expression.Expression;
@@ -544,8 +545,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
final Configuration conf = c.getEnvironment().getConfiguration();
final HAGroupStoreManager haGroupStoreManager =
HAGroupStoreManager.getInstance(conf);
if (haGroupStoreManager.isMutationBlocked()) {
- throw new IOException("Blocking Mutation as Some CRRs are in
ACTIVE_TO_STANDBY "
- + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED
is true");
+ throw new MutationBlockedIOException("Blocking Mutation as some
CRRs "
+ + "are in ACTIVE_TO_STANDBY state and "
+ + "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
}
preBatchMutateWithExceptions(c, miniBatchOp);
return;
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
new file mode 100644
index 0000000000..31acab527f
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.MutationBlockedIOException;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
+import org.apache.phoenix.jdbc.PhoenixHAAdmin;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test for mutation blocking functionality in
IndexRegionObserver.preBatchMutate().
+ * Tests that MutationBlockedIOException is properly thrown when cluster
role-based mutation
+ * blocking is enabled and CRRs are in ACTIVE_TO_STANDBY state.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexRegionObserverMutationBlockingIT extends BaseTest {
+
+ private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+ private PhoenixHAAdmin haAdmin;
+ private HAGroupStoreManager haGroupStoreManager;
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Enable cluster role-based mutation blocking
+ props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+ // No retries so that tests fail faster.
+ props.put("hbase.client.retries.number", "0");
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ haAdmin = new PhoenixHAAdmin(config);
+ haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+ // Clean up all existing CRRs before each test
+ List<ClusterRoleRecord> crrs =
haAdmin.listAllClusterRoleRecordsOnZookeeper();
+ for (ClusterRoleRecord crr : crrs) {
+
haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+ }
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Clean up CRRs after each test
+ if (haAdmin != null) {
+ List<ClusterRoleRecord> crrs =
haAdmin.listAllClusterRoleRecordsOnZookeeper();
+ for (ClusterRoleRecord crr : crrs) {
+
haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+ }
+ }
+ }
+
+ @Test
+ public void testMutationBlockedOnDataTableWithIndex() throws Exception {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // Create data table and index
+ conn.createStatement().execute("CREATE TABLE " + dataTableName +
+ " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)");
+ conn.createStatement().execute("CREATE INDEX " + indexName +
+ " ON " + dataTableName + "(name)");
+
+ // Initially, mutations should work - verify baseline
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('1', 'John', 25)");
+ conn.commit();
+
+ // Set up CRR that will block mutations (ACTIVE_TO_STANDBY state)
+ ClusterRoleRecord blockingCrr = new
ClusterRoleRecord("failover_test",
+ HighAvailabilityPolicy.FAILOVER,
+ haAdmin.getZkUrl(),
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "standby-zk-url",
+ ClusterRoleRecord.ClusterRole.STANDBY,
+ 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(blockingCrr);
+
+ // Wait for the event to propagate
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify that mutations are now blocked
+ assertTrue("Mutations should be blocked",
haGroupStoreManager.isMutationBlocked());
+
+ // Test that UPSERT throws MutationBlockedIOException
+ try {
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('2', 'Jane', 30)");
+ conn.commit();
+ fail("Expected MutationBlockedIOException to be thrown");
+ } catch (CommitException e) {
+ // Verify the exception chain contains
MutationBlockedIOException
+ assertTrue("Expected MutationBlockedIOException in exception
chain",
+ containsMutationBlockedException(e));
+ }
+ }
+ }
+
+
+ @Test
+ public void testMutationAllowedWhenNotBlocked() throws Exception {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // Create data table and index
+ conn.createStatement().execute("CREATE TABLE " + dataTableName +
+ " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)");
+ conn.createStatement().execute("CREATE INDEX " + indexName +
+ " ON " + dataTableName + "(name)");
+
+ // Set up CRR in ACTIVE state (should not block)
+ ClusterRoleRecord nonBlockingCrr = new
ClusterRoleRecord("active_test",
+ HighAvailabilityPolicy.FAILOVER,
+ haAdmin.getZkUrl(),
+ ClusterRoleRecord.ClusterRole.ACTIVE,
+ "standby-zk-url",
+ ClusterRoleRecord.ClusterRole.STANDBY,
+ 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(nonBlockingCrr);
+
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Mutations should work fine
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('1', 'Bob', 35)");
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('2', 'Carol', 27)");
+ conn.commit();
+
+ // Verify data was inserted successfully
+ try (java.sql.ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT COUNT(*) FROM " + dataTableName)) {
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ }
+ }
+ }
+
+ @Test
+ public void testMutationBlockingTransition() throws Exception {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // Create data table and index
+ conn.createStatement().execute("CREATE TABLE " + dataTableName +
+ " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)");
+ conn.createStatement().execute("CREATE INDEX " + indexName +
+ " ON " + dataTableName + "(name)");
+
+ // Initially set up non-blocking CRR
+ ClusterRoleRecord crr = new ClusterRoleRecord("transition_test",
+ HighAvailabilityPolicy.FAILOVER,
+ haAdmin.getZkUrl(),
+ ClusterRoleRecord.ClusterRole.ACTIVE,
+ "standby-zk-url",
+ ClusterRoleRecord.ClusterRole.STANDBY,
+ 1L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Mutation should work
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('1', 'David', 40)");
+ conn.commit();
+
+ // Transition to ACTIVE_TO_STANDBY (blocking state)
+ crr = new ClusterRoleRecord("transition_test",
+ HighAvailabilityPolicy.FAILOVER,
+ haAdmin.getZkUrl(),
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ "standby-zk-url",
+ ClusterRoleRecord.ClusterRole.STANDBY,
+ 2L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Now mutations should be blocked
+ try {
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('2', 'Eve', 33)");
+ conn.commit();
+ fail("Expected MutationBlockedIOException after transition to
ACTIVE_TO_STANDBY");
+ } catch (CommitException e) {
+ assertTrue("Expected mutation blocked exception after
transition",
+ containsMutationBlockedException(e));
+ }
+
+ // Transition back to ACTIVE (non-blocking state) and peer cluster
is in ATS state
+ crr = new ClusterRoleRecord("transition_test",
+ HighAvailabilityPolicy.FAILOVER,
+ haAdmin.getZkUrl(),
+ ClusterRoleRecord.ClusterRole.ACTIVE,
+ "standby-zk-url",
+ ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+ 3L);
+ haAdmin.createOrUpdateDataOnZookeeper(crr);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Mutations should work again
+ conn.createStatement().execute("UPSERT INTO " + dataTableName +
+ " VALUES ('3', 'Frank', 45)");
+ conn.commit();
+ }
+ }
+
+ private boolean containsMutationBlockedException(CommitException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RetriesExhaustedWithDetailsException) {
+ RetriesExhaustedWithDetailsException re =
(RetriesExhaustedWithDetailsException) cause;
+ return re.getCause(0) instanceof MutationBlockedIOException;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 32d011cd9c..e05eba63d3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -36,6 +36,7 @@ import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALL
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
import static
org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI;
import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
@@ -51,14 +52,17 @@ import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.doAnswer;
-import java.lang.reflect.Field;
import java.sql.Connection;
+import java.lang.reflect.Field;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
@@ -66,9 +70,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.MutationBlockedIOException;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
import
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
@@ -118,6 +124,8 @@ public class ParallelPhoenixConnectionIT {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+
CLUSTERS.getHBaseCluster1().getConfiguration().setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
true);
+
CLUSTERS.getHBaseCluster2().getConfiguration().setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
true);
CLUSTERS.start();
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
DriverManager.registerDriver(new PhoenixTestDriver());
@@ -131,6 +139,7 @@ public class ParallelPhoenixConnectionIT {
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE,
String.valueOf(23));
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
String.valueOf(true));
+ GLOBAL_PROPERTIES.setProperty("hbase.client.retries.number", "0");
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED,
String.valueOf(true));
}
@@ -356,6 +365,67 @@ public class ParallelPhoenixConnectionIT {
}
}
+ /**
+ * Test Phoenix connection creation and basic operations when both HBase
clusters are in the ACTIVE_TO_STANDBY role.
+ */
+ @Test
+ public void testBothClusterATSRole() throws Exception {
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY,
ClusterRole.ACTIVE_TO_STANDBY);
+ try (Connection conn = getParallelConnection()) {
+ doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+ fail("Expected MutationBlockedIOException to be thrown");
+ } catch (SQLException e) {
+ assertTrue(containsMutationBlockedException(e));
+ } finally {
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE,
ClusterRole.STANDBY);
+ }
+ }
+
+ /**
+ * Test Phoenix connection creation and
+ * basic operations when one HBase cluster is in the ACTIVE_TO_STANDBY role
+ * and the other is in the ACTIVE role.
+ */
+ @Test
+ public void testOneClusterATSRoleWithActive() throws Exception {
+ testOneClusterATSRole(ClusterRole.ACTIVE);
+ }
+
+ /**
+ * Test Phoenix connection creation and
+ * basic operations when one HBase cluster is in the ACTIVE_TO_STANDBY role
+ * and the other is in the STANDBY role.
+ */
+ @Test
+ public void testOneClusterATSRoleWithStandby() throws Exception {
+ testOneClusterATSRole(ClusterRole.STANDBY);
+ }
+
+ private void testOneClusterATSRole(ClusterRole otherRole) throws Exception
{
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY,
otherRole);
+ try (Connection conn = getParallelConnection()) {
+ doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+ } catch (SQLException e) {
+ fail("Expected no exception to be thrown as one cluster is "
+ + "in ACTIVE_TO_STANDBY and other in " + otherRole);
+ } finally {
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE,
ClusterRole.STANDBY);
+ }
+ }
+
+ private boolean containsMutationBlockedException(SQLException e) {
+ Throwable cause = e.getCause();
+ // Recursively check for MutationBlockedIOException buried in
exception stack.
+ while (cause != null) {
+ if (cause instanceof RetriesExhaustedWithDetailsException) {
+ RetriesExhaustedWithDetailsException re =
(RetriesExhaustedWithDetailsException) cause;
+ return re.getCause(0) instanceof MutationBlockedIOException;
+ }
+ cause = cause.getCause();
+ }
+ return false;
+ }
+
/**
* Test Phoenix connection creation and basic operations.
*/