Repository: hbase
Updated Branches:
  refs/heads/branch-1 b6a12c843 -> 432ca7e3f
  refs/heads/branch-1.3 ae6ff50dc -> 660427ce2
  refs/heads/branch-1.4 59312575c -> b6aa23eef
  refs/heads/branch-2 0501855bc -> a797fe0da
  refs/heads/master eb5e43673 -> 3df0351f2


HBASE-18771 Incorrect StoreFileRefresh leading to split and compaction failures

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3df0351f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3df0351f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3df0351f

Branch: refs/heads/master
Commit: 3df0351f2280bb914093d6fe3a69e246ae821617
Parents: eb5e436
Author: Abhishek Singh Chouhan <achou...@apache.org>
Authored: Tue Sep 12 16:00:47 2017 +0530
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Tue Sep 12 17:45:15 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HStore.java       |  16 +-
 .../TestCompactionFileNotFound.java             | 202 +++++++++++++++++++
 .../hadoop/hbase/regionserver/TestStore.java    |   4 +
 3 files changed, 218 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3df0351f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 11584ee..ec7fc01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -618,16 +618,24 @@ public class HStore implements Store {
   private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
     StoreFileManager sfm = storeEngine.getStoreFileManager();
     Collection<StoreFile> currentFiles = sfm.getStorefiles();
-    if (currentFiles == null) currentFiles = new ArrayList<>(0);
-
-    if (newFiles == null) newFiles = new ArrayList<>(0);
+    Collection<StoreFile> compactedFiles = sfm.getCompactedfiles();
+    if (currentFiles == null) currentFiles = Collections.emptySet();
+    if (newFiles == null) newFiles = Collections.emptySet();
+    if (compactedFiles == null) compactedFiles = Collections.emptySet();
 
     HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
     for (StoreFile sf : currentFiles) {
       currentFilesSet.put(sf.getFileInfo(), sf);
     }
-    HashSet<StoreFileInfo> newFilesSet = new HashSet<>(newFiles);
+    HashMap<StoreFileInfo, StoreFile> compactedFilesSet =
+        new HashMap<StoreFileInfo, StoreFile>(compactedFiles.size());
+    for (StoreFile sf : compactedFiles) {
+      compactedFilesSet.put(sf.getFileInfo(), sf);
+    }
 
+    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+    // Exclude the files that have already been compacted
+    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
     Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
     Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
 
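For context, the fix above amounts to set arithmetic over StoreFileInfo keys:
the files reported by a refresh are first stripped of anything the store
already tracks as compacted, and only then diffed against the current set. A
minimal standalone sketch of that bookkeeping (hypothetical file names; it
assumes only Guava's Sets, which the patch itself uses):

    import java.util.Set;
    import com.google.common.collect.Sets;

    public class RefreshSketch {
      public static void main(String[] args) {
        // After a compaction: the store holds the compacted output, while the
        // input files are tracked as compacted but not yet archived from disk.
        Set<String> current = Sets.newHashSet("result");
        Set<String> compacted = Sets.newHashSet("a", "b", "c");
        Set<String> onDisk = Sets.newHashSet("a", "b", "c", "result"); // what a refresh lists

        // The fix: drop already-compacted files before diffing, so the refresh
        // does not re-open files that are about to disappear into the archive.
        Set<String> newFiles = Sets.difference(onDisk, compacted);
        System.out.println(Sets.difference(newFiles, current)); // toBeAdded: []
        System.out.println(Sets.difference(current, newFiles)); // toBeRemoved: []
      }
    }

Without the exclusion, toBeAdded would contain a, b and c, and the refresh
would re-open files whose later archival breaks splits and compactions. Note
that Sets.difference returns an unmodifiable live view, which is safe here
because newFilesSet is only read after the reassignment.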

http://git-wip-us.apache.org/repos/asf/hbase/blob/3df0351f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionFileNotFound.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionFileNotFound.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionFileNotFound.java
new file mode 100644
index 0000000..a7d866b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionFileNotFound.java
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * This class tests the scenario where a store refresh happens because a file
+ * is not found during a scan, after a compaction but before the compacted
+ * files are archived. In this state we test both a split and a compaction.
+ */
+@Category(MediumTests.class)
+public class TestCompactionFileNotFound {
+  private static final Log LOG = LogFactory.getLog(TestCompactionFileNotFound.class);
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static final TableName TEST_TABLE = TableName.valueOf("test");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+  private static final byte[] ROW_A = Bytes.toBytes("aaa");
+  private static final byte[] ROW_B = Bytes.toBytes("bbb");
+  private static final byte[] ROW_C = Bytes.toBytes("ccc");
+
+  private static final byte[] qualifierCol1 = Bytes.toBytes("col1");
+
+  private static final byte[] bytes1 = Bytes.toBytes(1);
+  private static final byte[] bytes2 = Bytes.toBytes(2);
+  private static final byte[] bytes3 = Bytes.toBytes(3);
+
+  private Table table;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hbase.hfile.compaction.discharger.interval",
+      Integer.MAX_VALUE);
+    util.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @After
+  public void after() throws Exception {
+    try {
+      if (table != null) {
+        table.close();
+      }
+    } finally {
+      util.deleteTable(TEST_TABLE);
+    }
+  }
+
+  @Test
+  public void testSplitAfterRefresh() throws Exception {
+    Admin admin = util.getAdmin();
+    table = util.createTable(TEST_TABLE, TEST_FAMILY);
+
+    try {
+      // Create multiple store files
+      Put puta = new Put(ROW_A);
+      puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1);
+      table.put(puta);
+      admin.flush(TEST_TABLE);
+
+      Put putb = new Put(ROW_B);
+      putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2);
+      table.put(putb);
+      admin.flush(TEST_TABLE);
+
+      Put putc = new Put(ROW_C);
+      putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3);
+      table.put(putc);
+      admin.flush(TEST_TABLE);
+
+      admin.compact(TEST_TABLE);
+      while (admin.getCompactionState(TEST_TABLE) != CompactionState.NONE) {
+        Thread.sleep(1000);
+      }
+      table.put(putb);
+      HRegion hr1 = (HRegion) util.getRSForFirstRegionInTable(TEST_TABLE)
+          .getRegionByEncodedName(admin.getTableRegions(TEST_TABLE).get(0).getEncodedName());
+      // Refresh store files post compaction; this should not open already compacted files
+      hr1.refreshStoreFiles(true);
+      int numRegionsBeforeSplit = admin.getTableRegions(TEST_TABLE).size();
+      // Check if we can successfully split after compaction
+      admin.splitRegion(admin.getTableRegions(TEST_TABLE).get(0).getEncodedNameAsBytes(), ROW_C);
+      util.waitFor(20000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          int numRegionsAfterSplit = 0;
+          List<RegionServerThread> rst = util.getMiniHBaseCluster().getLiveRegionServerThreads();
+          for (RegionServerThread t : rst) {
+            numRegionsAfterSplit += t.getRegionServer().getOnlineRegions(TEST_TABLE).size();
+          }
+          // Make sure that the split went through and all the regions are assigned
+          return (numRegionsAfterSplit == numRegionsBeforeSplit + 1
+              && admin.isTableAvailable(TEST_TABLE));
+        }
+      });
+      // Split at this point should not result in the RS being aborted
+      assertEquals(3, util.getMiniHBaseCluster().getLiveRegionServerThreads().size());
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+    }
+  }
+
+  @Test
+  public void testCompactionAfterRefresh() throws Exception {
+    Admin admin = util.getAdmin();
+    table = util.createTable(TEST_TABLE, TEST_FAMILY);
+    try {
+      // Create multiple store files
+      Put puta = new Put(ROW_A);
+      puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1);
+      table.put(puta);
+      admin.flush(TEST_TABLE);
+
+      Put putb = new Put(ROW_B);
+      putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2);
+      table.put(putb);
+      admin.flush(TEST_TABLE);
+
+      Put putc = new Put(ROW_C);
+      putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3);
+      table.put(putc);
+      admin.flush(TEST_TABLE);
+
+      admin.compact(TEST_TABLE);
+      while (admin.getCompactionState(TEST_TABLE) != CompactionState.NONE) {
+        Thread.sleep(1000);
+      }
+      table.put(putb);
+      HRegion hr1 = (HRegion) util.getRSForFirstRegionInTable(TEST_TABLE)
+          .getRegionByEncodedName(admin.getTableRegions(TEST_TABLE).get(0).getEncodedName());
+      // Refresh store files post compaction; this should not open already compacted files
+      hr1.refreshStoreFiles(true);
+      // Archive the store files and try another compaction to see if all is good
+      for (Store store : hr1.getStores()) {
+        store.closeAndArchiveCompactedFiles();
+      }
+      try {
+        hr1.compact(false);
+      } catch (IOException e) {
+        LOG.error("Got an exception during compaction", e);
+        if (e instanceof FileNotFoundException) {
+          Assert.fail("Got a FNFE during compaction");
+        } else {
+          Assert.fail("Got an unexpected IOException during compaction");
+        }
+      }
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+    }
+  }
+}

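A side note on the test above: it waits for the compaction with a raw sleep
loop. The same wait could be phrased with the Waiter utility the test already
imports and uses for its post-split check; a sketch, reusing the test's own
util, admin, and TEST_TABLE fields:

    // Hypothetical alternative to the Thread.sleep(1000) polling loop,
    // following the same Waiter.Predicate pattern as the split check.
    util.waitFor(60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return admin.getCompactionState(TEST_TABLE) == CompactionState.NONE;
      }
    });

Either form works; waitFor simply bounds the wait and fails the test with a
clear timeout instead of hanging if the compaction never finishes.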
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df0351f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 537601d..3674303 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -913,6 +913,10 @@ public class TestStore {
 
     assertEquals(0, this.store.getStorefilesCount());
 
+    // Test refreshing store files when no store files are there
+    store.refreshStoreFiles();
+    assertEquals(0, this.store.getStorefilesCount());
+
     // add some data, flush
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), null);
     flush(1);

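The added assertion exercises the empty-store path of the refresh logic
changed above: with no current, new, or compacted files, all three collections
fall back to Collections.emptySet(), both set differences are empty, and the
refresh is a no-op. A tiny illustration of that degenerate case (a sketch,
with plain strings standing in for StoreFileInfo):

    import java.util.Collections;
    import java.util.Set;
    import com.google.common.collect.Sets;

    public class EmptyRefreshSketch {
      public static void main(String[] args) {
        Set<String> current = Collections.emptySet();
        Set<String> fresh = Collections.emptySet();
        // Nothing to add and nothing to remove: the store file count stays 0.
        System.out.println(Sets.difference(fresh, current).isEmpty()); // true
        System.out.println(Sets.difference(current, fresh).isEmpty()); // true
      }
    }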