Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 572d242dc -> 27d9ecd4e


PHOENIX-4094 ParallelWriterIndexCommitter incorrectly applies local updates to 
index tables for 4.x-HBase-0.98


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/27d9ecd4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/27d9ecd4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/27d9ecd4

Branch: refs/heads/4.x-HBase-1.2
Commit: 27d9ecd4ed1b101b9ab50546cb2840827f056fe4
Parents: 572d242
Author: chenglei <cheng...@apache.org>
Authored: Fri Aug 18 11:45:36 2017 +0800
Committer: chenglei <cheng...@apache.org>
Committed: Fri Aug 18 11:45:36 2017 +0800

----------------------------------------------------------------------
 .../wal/WALRecoveryRegionPostOpenIT.java        | 312 +++++++++++++++++++
 1 file changed, 312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/27d9ecd4/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
 
b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
new file mode 100644
index 0000000..92e4356
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java
@@ -0,0 +1,312 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import 
org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class WALRecoveryRegionPostOpenIT extends BaseTest {
+
+    // Shared logger; also used by the nested failure policy.
+    private static final Log LOG = 
LogFactory.getLog(WALRecoveryRegionPostOpenIT.class);
+
+    // Name of the data table created by the test.
+    private static final String DATA_TABLE_NAME="DATA_POST_OPEN";
+
+    // Name of the index created on the data table; the failing observer matches region table names against it.
+    private static final String INDEX_TABLE_NAME="INDEX_POST_OPEN";
+
+    // Durations in milliseconds. TIMEOUT is passed to waitForRegionServerToStop.
+    private static final long ONE_SEC = 1000;
+    private static final long ONE_MIN = 60 * ONE_SEC;
+    private static final long TIMEOUT = ONE_MIN;
+
+    // Counted down by ReleaseLatchOnFailurePolicy.handleFailure (when non-null); the test awaits it.
+    private static volatile CountDownLatch handleFailureCountDownLatch= null;
+
+    // The "attempted" multimap most recently handed to ReleaseLatchOnFailurePolicy.handleFailure.
+    private static volatile Multimap<HTableInterfaceReference, Mutation> 
tableReferenceToMutation=null;
+
+    // Number of times ReleaseLatchOnFailurePolicy.handleFailure has been entered.
+    private static volatile int handleFailureCalledCount=0;
+
+    // While true, IndexTableFailingRegionObserver rejects index writes with DoNotRetryIOException.
+    private static volatile boolean failIndexTableWrite=false;
+
+    /**
+     * Configures and starts the test driver with two region servers ({@code NUM_SLAVES_BASE = 2}).
+     * The server configuration installs {@link IndexTableFailingRegionObserver} as a region
+     * coprocessor and {@link ReleaseLatchOnFailurePolicy} as the recovery failure policy.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverConfig = Maps.newHashMapWithExpectedSize(10);
+
+        // Test hooks: the failing observer and the latch-releasing recovery policy.
+        serverConfig.put("hbase.coprocessor.region.classes",
+                IndexTableFailingRegionObserver.class.getName());
+        serverConfig.put(Indexer.RecoveryFailurePolicyKeyForTesting,
+                ReleaseLatchOnFailurePolicy.class.getName());
+
+        // RPC / retry tuning for index writes.
+        serverConfig.put(IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER, "2");
+        serverConfig.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+        serverConfig.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000");
+
+        serverConfig.put("data.tx.snapshot.dir", "/tmp");
+        // Largest possible balancer period; presumably keeps the balancer from relocating regions mid-test.
+        serverConfig.put("hbase.balancer.period", Integer.toString(Integer.MAX_VALUE));
+        serverConfig.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.toString(false));
+
+        Map<String, String> clientConfig =
+                Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(false));
+
+        NUM_SLAVES_BASE = 2;
+        ReadOnlyProps serverReadOnlyProps = new ReadOnlyProps(serverConfig.entrySet().iterator());
+        ReadOnlyProps clientReadOnlyProps = new ReadOnlyProps(clientConfig.entrySet().iterator());
+        setUpTestDriver(serverReadOnlyProps, clientReadOnlyProps);
+    }
+
+    public static class IndexTableFailingRegionObserver extends 
SimpleRegionObserver {
+
+        @Override
+        public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> observerContext, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+
+            if 
(observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString().contains(INDEX_TABLE_NAME)
 && failIndexTableWrite) {
+                throw new DoNotRetryIOException();
+            }
+            Mutation operation = miniBatchOp.getOperation(0);
+            Set<byte[]> keySet = operation.getFamilyMap().keySet();
+            for(byte[] family: keySet) {
+                
if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)
 && failIndexTableWrite) {
+                    throw new DoNotRetryIOException();
+                }
+            }
+            super.preBatchMutate(observerContext, miniBatchOp);
+        }
+    }
+
+
+    /**
+     * Failure policy used during WAL recovery in this test. It records the failed index updates
+     * in the test's static fields, re-enables index writes, delegates to
+     * {@link StoreFailuresInCachePolicy#handleFailure}, and finally releases the latch the test
+     * is waiting on.
+     */
+    public static class ReleaseLatchOnFailurePolicy extends StoreFailuresInCachePolicy {
+
+        public ReleaseLatchOnFailurePolicy(PerRegionIndexWriteCache failedIndexEdits) {
+            super(failedIndexEdits);
+        }
+
+        /**
+         * Records the attempted updates and releases the test's latch.
+         *
+         * @param attempted index updates that could not be written, keyed by target table
+         * @param cause the exception that made the write fail
+         * @throws IOException if the parent policy fails to handle the updates
+         */
+        @Override
+        public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+                throws IOException {
+            LOG.info("Found index update failure!");
+            // Increment of a volatile is not atomic; the test expects exactly one invocation.
+            handleFailureCalledCount++;
+            tableReferenceToMutation = attempted;
+            LOG.info("failed index update on WAL recovery - allowing index table can be write.");
+            failIndexTableWrite = false;
+            try {
+                super.handleFailure(attempted, cause);
+            } finally {
+                // Release the waiter even if the parent policy throws, otherwise the test would
+                // block on the latch. Read the volatile once so a concurrent reset to null cannot
+                // slip in between the null check and the countDown() call.
+                CountDownLatch latch = handleFailureCountDownLatch;
+                if (latch != null) {
+                    latch.countDown();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testRecoveryRegionPostOpen() throws Exception {
+        handleFailureCountDownLatch= null ;
+        tableReferenceToMutation=null;
+        handleFailureCalledCount=0;
+        failIndexTableWrite=false;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+
+        try (Connection conn = driver.connect(url, props)) {
+            conn.setAutoCommit(true);
+            conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_NAME
+                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 
VARCHAR) ");
+
+
+            conn.createStatement().execute(
+                    "CREATE " +  "INDEX " + INDEX_TABLE_NAME + " ON " + 
DATA_TABLE_NAME + " (v1) INCLUDE (v2)");
+            String query = "SELECT * FROM " + DATA_TABLE_NAME;
+            ResultSet resultSet = conn.createStatement().executeQuery(query);
+            assertFalse(resultSet.next());
+
+            MiniHBaseCluster miniHBaseCluster = 
getUtility().getMiniHBaseCluster();
+            this.moveIndexTableRegionIfSameRegionSErver(miniHBaseCluster);
+            this.assertRegionServerDifferent(miniHBaseCluster);
+
+            //load one row into the table
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
DATA_TABLE_NAME + " VALUES(?,?,?)");
+            stmt.setString(1, "a");
+            stmt.setString(2, "x");
+            stmt.setString(3, "1");
+            stmt.execute();
+
+            this.assertRegionServerDifferent(miniHBaseCluster);
+
+            Scan scan = new Scan();
+            HTable primaryTable = new HTable(getUtility().getConfiguration(), 
DATA_TABLE_NAME);
+            ResultScanner resultScanner = primaryTable.getScanner(scan);
+            int count = 0;
+             for (Result result : resultScanner) {
+                 count++;
+             }
+             assertEquals("Got an unexpected found of data rows", 1, count);
+
+            // begin to kill data table regionServer,and data table region 
would move to the other regionSever,
+            // and then recover data table's WAL
+            handleFailureCountDownLatch=new CountDownLatch(1);
+            failIndexTableWrite=true;
+
+            ServerName 
dataTableRegionServerName=this.getRegionServerName(miniHBaseCluster, 
DATA_TABLE_NAME);
+
+            miniHBaseCluster.killRegionServer(dataTableRegionServerName);
+            
miniHBaseCluster.waitForRegionServerToStop(dataTableRegionServerName, TIMEOUT);
+
+            //there are only one regionServer now.
+            assertEquals("miniHBaseCluster.getLiveRegionServerThreads()", 
miniHBaseCluster.getLiveRegionServerThreads().size(),1);
+            HRegionServer 
liveRegionServer=miniHBaseCluster.getLiveRegionServerThreads().get(0).getRegionServer();
+
+            //verify handleFailure is called.
+            handleFailureCountDownLatch.await();
+            assertTrue(handleFailureCalledCount==1);
+            Map<HTableInterfaceReference, Collection<Mutation>> 
tableReferenceToMutations=tableReferenceToMutation.asMap();
+            assertEquals("tableReferenceToMutation.size()", 1, 
tableReferenceToMutations.size());
+            Iterator<Map.Entry<HTableInterfaceReference, 
Collection<Mutation>>> iter=tableReferenceToMutations.entrySet().iterator();
+            assertTrue(iter.hasNext());
+            Map.Entry<HTableInterfaceReference, Collection<Mutation>> 
entry=iter.next();
+            assertTrue(entry.getKey().getTableName().equals(INDEX_TABLE_NAME));
+            Mutation[] mutations=entry.getValue().toArray(new Mutation[0]);
+            assertEquals("mutations size "+mutations[0], 1, mutations.length);
+            assertTrue(mutations[0] instanceof Put);
+            
assertTrue(!Arrays.equals(mutations[0].getRow(),Bytes.toBytes("a")));
+
+            //wait for data table region repoen.
+            List<Region> dataTableRegions=null;
+
+            for(int i=1;i<=200;i++) {
+                
dataTableRegions=liveRegionServer.getOnlineRegions(TableName.valueOf(DATA_TABLE_NAME));
+                if(dataTableRegions.size() > 0) {
+                    break;
+                }
+                Thread.sleep(ONE_SEC);
+            }
+
+            
dataTableRegions=liveRegionServer.getOnlineRegions(TableName.valueOf(DATA_TABLE_NAME));
+            assertTrue(dataTableRegions.size()==1);
+
+
+            // the index table is one row
+            HTable indexTable = new HTable(getUtility().getConfiguration(), 
INDEX_TABLE_NAME);
+            resultScanner = indexTable.getScanner(scan);
+            count = 0;
+            for (Result result : resultScanner) {
+                count++;
+            }
+            assertEquals("Got an unexpected found of index rows", 1, count);
+            resultScanner.close();
+            indexTable.close();
+
+            scan = new Scan();
+            primaryTable.close();
+            primaryTable = new HTable(getUtility().getConfiguration(), 
DATA_TABLE_NAME);
+            primaryTable.getConnection().clearRegionCache();
+            resultScanner = primaryTable.getScanner(scan);
+            count = 0;
+            for (Result result : resultScanner) {
+                LOG.info("Got data table result:" + result);
+                count++;
+            }
+            assertEquals("Got an unexpected found of data rows", 1, count);
+
+            // cleanup
+            primaryTable.close();
+        }
+    }
+
+    /**
+     * Returns the server currently holding the single region of {@code tableName}; asserts that
+     * the table has exactly one region in the mini cluster.
+     */
+    private ServerName getRegionServerName(MiniHBaseCluster miniHBaseCluster,String tableName)
+            throws IOException {
+        List<HRegion> tableRegions = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+        assertEquals(1, tableRegions.size());
+        byte[] onlyRegionName = tableRegions.get(0).getRegionInfo().getRegionName();
+        return miniHBaseCluster.getServerHoldingRegion(TableName.valueOf(tableName), onlyRegionName);
+    }
+
+    /**
+     * Asserts that the data table region and the index table region are held by different
+     * region servers.
+     */
+    private void assertRegionServerDifferent(MiniHBaseCluster miniHBaseCluster) throws IOException {
+        ServerName dataHost = this.getRegionServerName(miniHBaseCluster, DATA_TABLE_NAME);
+        ServerName indexHost = this.getRegionServerName(miniHBaseCluster, INDEX_TABLE_NAME);
+        assertFalse(dataHost.equals(indexHost));
+    }
+
+    /**
+     * If the single data table region and the single index table region are served by the same
+     * region server, moves the index region to another server (index 0, or index 1 when the
+     * shared server is index 0) and waits for the move to finish. Does nothing otherwise.
+     */
+    private void moveIndexTableRegionIfSameRegionSErver(MiniHBaseCluster miniHBaseCluster)
+            throws IOException, InterruptedException {
+        List<HRegion> dataRegions = miniHBaseCluster.getRegions(Bytes.toBytes(DATA_TABLE_NAME));
+        assertEquals(1, dataRegions.size());
+        List<HRegion> indexRegions = miniHBaseCluster.getRegions(Bytes.toBytes(INDEX_TABLE_NAME));
+        assertEquals(1, indexRegions.size());
+
+        HRegion indexRegion = indexRegions.get(0);
+        int dataServerIdx =
+                miniHBaseCluster.getServerWith(dataRegions.get(0).getRegionInfo().getRegionName());
+        int indexServerIdx =
+                miniHBaseCluster.getServerWith(indexRegion.getRegionInfo().getRegionName());
+
+        if (dataServerIdx == indexServerIdx) {
+            // Lowest server index that differs from the one currently hosting both regions.
+            int targetServerIdx = (indexServerIdx == 0) ? 1 : 0;
+            HRegionServer targetServer = miniHBaseCluster.getRegionServer(targetServerIdx);
+            this.moveRegionAndWait(miniHBaseCluster, indexRegion, targetServer);
+        }
+    }
+
+
+    /**
+     * Asks the master to move {@code destRegion} to {@code destRegionServer} and polls the
+     * master's region states until the region is reported on that server, then asserts the
+     * region is actually on it.
+     *
+     * @param miniHBaseCluster cluster whose master is consulted for the region's location
+     * @param destRegion the region to move
+     * @param destRegionServer the server the region should end up on
+     * @throws IOException if the move request fails or the region is not reported on the
+     *         destination server within {@code TIMEOUT} milliseconds
+     * @throws InterruptedException if interrupted while waiting between polls
+     */
+    private void moveRegionAndWait(MiniHBaseCluster miniHBaseCluster,HRegion destRegion,
+            HRegionServer destRegionServer) throws IOException, InterruptedException {
+        HMaster master = miniHBaseCluster.getMaster();
+        ServerName destServerName = destRegionServer.getServerName();
+        getUtility().getHBaseAdmin().move(
+                destRegion.getRegionInfo().getEncodedNameAsBytes(),
+                Bytes.toBytes(destServerName.getServerName()));
+
+        // Bound the wait so a move that never completes fails the test instead of hanging it.
+        long deadline = System.currentTimeMillis() + TIMEOUT;
+        while (true) {
+            ServerName currentRegionServerName = master.getAssignmentManager().getRegionStates()
+                    .getRegionServerOfRegion(destRegion.getRegionInfo());
+            // equals() on the non-null destination name also covers a null current location.
+            if (destServerName.equals(currentRegionServerName)) {
+                getUtility().assertRegionOnServer(destRegion.getRegionInfo(), currentRegionServerName, 200);
+                return;
+            }
+            if (System.currentTimeMillis() > deadline) {
+                throw new IOException("Timed out after " + TIMEOUT + " ms waiting for region "
+                        + destRegion.getRegionInfo() + " to move to " + destServerName
+                        + "; currently on " + currentRegionServerName);
+            }
+            Thread.sleep(10);
+        }
+    }
+}

Reply via email to