This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d064c756b8 PHOENIX-7598 Add ITs for client write behavior when they 
encounter ATS state (#2198)
d064c756b8 is described below

commit d064c756b8b13c5fc04b569bd8b281401a689389
Author: ritegarg <[email protected]>
AuthorDate: Mon Jun 30 10:18:47 2025 -0700

    PHOENIX-7598 Add ITs for client write behavior when they encounter ATS 
state (#2198)
    
    Co-authored-by: Ritesh Garg 
<[email protected]>
---
 .../exception/MutationBlockedIOException.java      |  36 +++
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    |  23 +-
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   |  19 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |   6 +-
 .../IndexRegionObserverMutationBlockingIT.java     | 258 +++++++++++++++++++++
 .../phoenix/jdbc/ParallelPhoenixConnectionIT.java  |  74 +++++-
 6 files changed, 399 insertions(+), 17 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/MutationBlockedIOException.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/MutationBlockedIOException.java
new file mode 100644
index 0000000000..590c3615d6
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/MutationBlockedIOException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is set
+ * and the current cluster role is ACTIVE_TO_STANDBY.
+ */
+public class MutationBlockedIOException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param msg reason for the exception
+     */
+    public MutationBlockedIOException(String msg) {
+        super(msg);
+    }
+
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index c3606ddad2..32498aa4f8 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -31,12 +31,15 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+
 
 /**
  * Write-through cache for HAGroupStore.
@@ -45,7 +48,6 @@ import java.util.stream.Collectors;
 public class HAGroupStoreClient implements Closeable {
 
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS 
= 30000L;
-    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
     private PhoenixHAAdmin phoenixHaAdmin;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
     // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
@@ -53,6 +55,7 @@ public class HAGroupStoreClient implements Closeable {
             = new ConcurrentHashMap<>();
     private PathChildrenCache pathChildrenCache;
     private volatile boolean isHealthy;
+    private static final Map<String, HAGroupStoreClient> instances = new 
ConcurrentHashMap<>();
 
     /**
      * Creates/gets an instance of HAGroupStoreClient.
@@ -62,18 +65,22 @@ public class HAGroupStoreClient implements Closeable {
      * @return HAGroupStoreClient instance
      */
     public static HAGroupStoreClient getInstance(Configuration conf) {
-        if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
+        final String zkUrl = getLocalZkUrl(conf);
+        HAGroupStoreClient result = instances.get(zkUrl);
+        if (result == null || !result.isHealthy) {
             synchronized (HAGroupStoreClient.class) {
-                if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
-                    haGroupStoreClientInstance = new HAGroupStoreClient(conf, 
null);
-                    if (!haGroupStoreClientInstance.isHealthy) {
-                        haGroupStoreClientInstance.close();
-                        haGroupStoreClientInstance = null;
+                result = instances.get(zkUrl);
+                if (result == null || !result.isHealthy) {
+                    result = new HAGroupStoreClient(conf, null);
+                    if (!result.isHealthy) {
+                        result.close();
+                        result = null;
                     }
+                    instances.put(zkUrl, result);
                 }
             }
         }
-        return haGroupStoreClientInstance;
+        return result;
     }
 
     @VisibleForTesting
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 690953a7b0..196c565730 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -19,29 +19,38 @@ package org.apache.phoenix.jdbc;
 
 import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 
 public class HAGroupStoreManager {
-    private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
     private final boolean mutationBlockEnabled;
     private final Configuration conf;
+    private static volatile Map<String, HAGroupStoreManager> INSTANCES = new 
ConcurrentHashMap<>();
 
     /**
      * Creates/gets an instance of HAGroupStoreManager.
+     * Provides unique instance for each ZK URL
      *
      * @param conf configuration
      * @return HAGroupStoreManager instance
      */
     public static HAGroupStoreManager getInstance(Configuration conf) {
-        if (haGroupStoreManagerInstance == null) {
+        final String zkUrl = getLocalZkUrl(conf);
+        HAGroupStoreManager result = INSTANCES.get(zkUrl);
+        if (result == null) {
             synchronized (HAGroupStoreManager.class) {
-                if (haGroupStoreManagerInstance == null) {
-                    haGroupStoreManagerInstance = new 
HAGroupStoreManager(conf);
+                result = INSTANCES.get(zkUrl);
+                if (result == null) {
+                    result = new HAGroupStoreManager(conf);
+                    INSTANCES.put(zkUrl, result);
                 }
             }
         }
-        return haGroupStoreManagerInstance;
+        return result;
     }
 
     private HAGroupStoreManager(final Configuration conf) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index adb2b2bafb..e1a0f1d54a 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -58,6 +58,7 @@ import 
org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.exception.MutationBlockedIOException;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.Expression;
@@ -544,8 +545,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
           final Configuration conf = c.getEnvironment().getConfiguration();
           final HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(conf);
           if (haGroupStoreManager.isMutationBlocked()) {
-              throw new IOException("Blocking Mutation as Some CRRs are in 
ACTIVE_TO_STANDBY "
-                      + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED 
is true");
+              throw new MutationBlockedIOException("Blocking Mutation as some 
CRRs "
+                      + "are in ACTIVE_TO_STANDBY state and "
+                      + "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
           }
           preBatchMutateWithExceptions(c, miniBatchOp);
           return;
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
new file mode 100644
index 0000000000..31acab527f
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.MutationBlockedIOException;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
+import org.apache.phoenix.jdbc.PhoenixHAAdmin;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test for mutation blocking functionality in 
IndexRegionObserver.preBatchMutate().
+ * Tests that MutationBlockedIOException is properly thrown when cluster 
role-based mutation
+ * blocking is enabled and CRRs are in ACTIVE_TO_STANDBY state.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexRegionObserverMutationBlockingIT extends BaseTest {
+
+    private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+    private PhoenixHAAdmin haAdmin;
+    private HAGroupStoreManager haGroupStoreManager;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Enable cluster role-based mutation blocking
+        props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+        // No retries so that tests fail faster.
+        props.put("hbase.client.retries.number", "0");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        haAdmin = new PhoenixHAAdmin(config);
+        haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+        // Clean up all existing CRRs before each test
+        List<ClusterRoleRecord> crrs = 
haAdmin.listAllClusterRoleRecordsOnZookeeper();
+        for (ClusterRoleRecord crr : crrs) {
+            
haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+        }
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Clean up CRRs after each test
+        if (haAdmin != null) {
+            List<ClusterRoleRecord> crrs = 
haAdmin.listAllClusterRoleRecordsOnZookeeper();
+            for (ClusterRoleRecord crr : crrs) {
+                
haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+            }
+        }
+    }
+
+    @Test
+    public void testMutationBlockedOnDataTableWithIndex() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexName = generateUniqueName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create data table and index
+            conn.createStatement().execute("CREATE TABLE " + dataTableName +
+                " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)");
+            conn.createStatement().execute("CREATE INDEX " + indexName +
+                " ON " + dataTableName + "(name)");
+
+            // Initially, mutations should work - verify baseline
+            conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                " VALUES ('1', 'John', 25)");
+            conn.commit();
+
+            // Set up CRR that will block mutations (ACTIVE_TO_STANDBY state)
+            ClusterRoleRecord blockingCrr = new 
ClusterRoleRecord("failover_test",
+                    HighAvailabilityPolicy.FAILOVER,
+                    haAdmin.getZkUrl(),
+                    ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                    "standby-zk-url",
+                    ClusterRoleRecord.ClusterRole.STANDBY,
+                    1L);
+            haAdmin.createOrUpdateDataOnZookeeper(blockingCrr);
+
+            // Wait for the event to propagate
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Verify that mutations are now blocked
+            assertTrue("Mutations should be blocked", 
haGroupStoreManager.isMutationBlocked());
+
+            // Test that UPSERT throws MutationBlockedIOException
+            try {
+                conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                    " VALUES ('2', 'Jane', 30)");
+                conn.commit();
+                fail("Expected MutationBlockedIOException to be thrown");
+            } catch (CommitException e) {
+                // Verify the exception chain contains 
MutationBlockedIOException
+                assertTrue("Expected MutationBlockedIOException in exception 
chain",
+                    containsMutationBlockedException(e));
+            }
+        }
+    }
+
+
+    @Test
+    public void testMutationAllowedWhenNotBlocked() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexName = generateUniqueName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create data table and index
+            conn.createStatement().execute("CREATE TABLE " + dataTableName +
+                " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)");
+            conn.createStatement().execute("CREATE INDEX " + indexName +
+                " ON " + dataTableName + "(name)");
+
+            // Set up CRR in ACTIVE state (should not block)
+            ClusterRoleRecord nonBlockingCrr = new 
ClusterRoleRecord("active_test",
+                    HighAvailabilityPolicy.FAILOVER,
+                    haAdmin.getZkUrl(),
+                    ClusterRoleRecord.ClusterRole.ACTIVE,
+                    "standby-zk-url",
+                    ClusterRoleRecord.ClusterRole.STANDBY,
+                    1L);
+            haAdmin.createOrUpdateDataOnZookeeper(nonBlockingCrr);
+
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Mutations should work fine
+            conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                " VALUES ('1', 'Bob', 35)");
+            conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                " VALUES ('2', 'Carol', 27)");
+            conn.commit();
+
+            // Verify data was inserted successfully
+            try (java.sql.ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT COUNT(*) FROM " + dataTableName)) {
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+            }
+        }
+    }
+
+    @Test
+    public void testMutationBlockingTransition() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexName = generateUniqueName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create data table and index
+            conn.createStatement().execute("CREATE TABLE " + dataTableName +
+                " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)");
+            conn.createStatement().execute("CREATE INDEX " + indexName +
+                " ON " + dataTableName + "(name)");
+
+            // Initially set up non-blocking CRR
+            ClusterRoleRecord crr = new ClusterRoleRecord("transition_test",
+                    HighAvailabilityPolicy.FAILOVER,
+                    haAdmin.getZkUrl(),
+                    ClusterRoleRecord.ClusterRole.ACTIVE,
+                    "standby-zk-url",
+                    ClusterRoleRecord.ClusterRole.STANDBY,
+                    1L);
+            haAdmin.createOrUpdateDataOnZookeeper(crr);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Mutation should work
+            conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                " VALUES ('1', 'David', 40)");
+            conn.commit();
+
+            // Transition to ACTIVE_TO_STANDBY (blocking state)
+            crr = new ClusterRoleRecord("transition_test",
+                    HighAvailabilityPolicy.FAILOVER,
+                    haAdmin.getZkUrl(),
+                    ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                    "standby-zk-url",
+                    ClusterRoleRecord.ClusterRole.STANDBY,
+                    2L);
+            haAdmin.createOrUpdateDataOnZookeeper(crr);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Now mutations should be blocked
+            try {
+                conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                    " VALUES ('2', 'Eve', 33)");
+                conn.commit();
+                fail("Expected MutationBlockedIOException after transition to 
ACTIVE_TO_STANDBY");
+            } catch (CommitException e) {
+                assertTrue("Expected mutation blocked exception after 
transition",
+                    containsMutationBlockedException(e));
+            }
+
+            // Transition back to ACTIVE (non-blocking state) while the peer cluster 
is in ATS state
+            crr = new ClusterRoleRecord("transition_test",
+                    HighAvailabilityPolicy.FAILOVER,
+                    haAdmin.getZkUrl(),
+                    ClusterRoleRecord.ClusterRole.ACTIVE,
+                    "standby-zk-url",
+                    ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                    3L);
+            haAdmin.createOrUpdateDataOnZookeeper(crr);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Mutations should work again
+            conn.createStatement().execute("UPSERT INTO " + dataTableName +
+                " VALUES ('3', 'Frank', 45)");
+            conn.commit();
+        }
+    }
+
+    private boolean containsMutationBlockedException(CommitException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof RetriesExhaustedWithDetailsException) {
+            RetriesExhaustedWithDetailsException re = 
(RetriesExhaustedWithDetailsException) cause;
+            return re.getCause(0) instanceof MutationBlockedIOException;
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 32d011cd9c..e05eba63d3 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -36,6 +36,7 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALL
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
 import static 
org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI;
 import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
 import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
 import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
@@ -51,14 +52,17 @@ import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.doAnswer;
 
-import java.lang.reflect.Field;
 import java.sql.Connection;
+import java.lang.reflect.Field;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -66,9 +70,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.MutationBlockedIOException;
 import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
 import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
@@ -118,6 +124,8 @@ public class ParallelPhoenixConnectionIT {
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
+        
CLUSTERS.getHBaseCluster1().getConfiguration().setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
 true);
+        
CLUSTERS.getHBaseCluster2().getConfiguration().setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
 true);
         CLUSTERS.start();
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
         DriverManager.registerDriver(new PhoenixTestDriver());
@@ -131,6 +139,7 @@ public class ParallelPhoenixConnectionIT {
         GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, 
String.valueOf(23));
         
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
             String.valueOf(true));
+        GLOBAL_PROPERTIES.setProperty("hbase.client.retries.number", "0");
         GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED, 
String.valueOf(true));
     }
 
@@ -356,6 +365,67 @@ public class ParallelPhoenixConnectionIT {
         }
     }
 
+    /**
+     * Test Phoenix connection creation and basic operations when both HBase 
clusters are in the ACTIVE_TO_STANDBY role.
+     */
+    @Test
+    public void testBothClusterATSRole() throws Exception {
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, 
ClusterRole.ACTIVE_TO_STANDBY);
+        try (Connection conn = getParallelConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+            fail("Expected MutationBlockedIOException to be thrown");
+        } catch (SQLException e) {
+            assertTrue(containsMutationBlockedException(e));
+        } finally {
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE, 
ClusterRole.STANDBY);
+        }
+    }
+
+    /**
+     * Test Phoenix connection creation and
+     * basic operations when one HBase cluster is in the ACTIVE_TO_STANDBY role
+     * and the other is in the ACTIVE role.
+     */
+    @Test
+    public void testOneClusterATSRoleWithActive() throws Exception {
+        testOneClusterATSRole(ClusterRole.ACTIVE);
+    }
+
+    /**
+     * Test Phoenix connection creation and
+     * basic operations when one HBase cluster is in the ACTIVE_TO_STANDBY role
+     * and the other is in the STANDBY role.
+     */
+    @Test
+    public void testOneClusterATSRoleWithStandby() throws Exception {
+        testOneClusterATSRole(ClusterRole.STANDBY);
+    }
+
+    private void testOneClusterATSRole(ClusterRole otherRole) throws Exception 
{
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, 
otherRole);
+        try (Connection conn = getParallelConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        } catch (SQLException e) {
+            fail("Expected no exception to be thrown as one cluster is "
+                    + "in ACTIVE_TO_STANDBY and other in " + otherRole);
+        } finally {
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE, 
ClusterRole.STANDBY);
+        }
+    }
+
+    private boolean containsMutationBlockedException(SQLException e) {
+        Throwable cause = e.getCause();
+        // Walk the cause chain looking for a MutationBlockedIOException buried in 
the exception stack.
+        while (cause != null) {
+            if (cause instanceof RetriesExhaustedWithDetailsException) {
+                RetriesExhaustedWithDetailsException re = 
(RetriesExhaustedWithDetailsException) cause;
+                return re.getCause(0) instanceof MutationBlockedIOException;
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
+
     /**
      * Test Phoenix connection creation and basic operations.
      */

Reply via email to