This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new 29080ed  HBASE-22890 Verify the file integrity in persistent IOEngine
29080ed is described below

commit 29080eda9859a3689910eab285a3e51481f772ff
Author: zbq.dean <zbq.d...@gmail.com>
AuthorDate: Fri Sep 20 14:09:34 2019 +0800

    HBASE-22890 Verify the file integrity in persistent IOEngine
    
    Signed-off-by Anoop Sam John <anoopsamj...@apache.org>
    Signed-off-by stack <st...@apache.org>
    Signed-off-by Reid Chan <reidc...@apache.org>
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  81 ++++--
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java |  88 ++++++-
 .../hbase/io/hfile/bucket/PersistentIOEngine.java  |  44 ++++
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java | 282 +++++++++++++++++++++
 4 files changed, 473 insertions(+), 22 deletions(-)
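
For readers skimming the diff below: the integrity check digests, for each cache file, its path,
its on-disk size, and its last-modified time, and on startup compares that digest with the one
stored in the persistence file. A minimal standalone Java sketch of the idea follows; the class
name, the use of File.length() in place of the patch's `du` call, the example path, and the main()
driver are illustrative assumptions, not code from the patch.

    import java.io.File;
    import java.security.MessageDigest;

    public class ChecksumSketch {
      // Digest of (path + size + lastModified) for every cache file,
      // mirroring what FileIOEngine.calculateChecksum does in the patch.
      static byte[] checksum(String algorithm, String... filePaths) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (String filePath : filePaths) {
          File file = new File(filePath);
          sb.append(filePath);
          sb.append(file.length());       // the patch shells out to `du` for the real on-disk size
          sb.append(file.lastModified());
        }
        return MessageDigest.getInstance(algorithm).digest(sb.toString().getBytes("UTF-8"));
      }

      public static void main(String[] args) throws Exception {
        // hypothetical cache file path, for illustration only
        byte[] persisted = checksum("MD5", "/tmp/bucket.cache");     // what persistToFile stores
        byte[] recalculated = checksum("MD5", "/tmp/bucket.cache");  // what retrieveFromFile recomputes
        System.out.println(MessageDigest.isEqual(persisted, recalculated)
            ? "cache file unchanged, safe to restore"
            : "cache file changed, drop persisted metadata");
      }
    }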

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5a4ac13..5c02166 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -242,6 +244,16 @@ public class BucketCache implements BlockCache, HeapSize {
   /** In-memory bucket size */
   private float memoryFactor;
 
+  private static final String FILE_VERIFY_ALGORITHM =
+    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
+  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
+
+  /**
+   * Use {@link java.security.MessageDigest} class's digest algorithms to check
+   * persistent file integrity, default algorithm is MD5
+   */
+  private String algorithm;
+
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
       IOException {
@@ -252,8 +264,9 @@ public class BucketCache implements BlockCache, HeapSize {
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
                     int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
                     Configuration conf)
-      throws FileNotFoundException, IOException {
-    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
+      throws IOException {
+    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
+    ioEngine = getIOEngineFromName(ioEngineName, capacity);
     this.writerThreads = new WriterThread[writerThreadNum];
     long blockNumCapacity = capacity / blockSize;
     if (blockNumCapacity >= Integer.MAX_VALUE) {
@@ -295,7 +308,7 @@ public class BucketCache implements BlockCache, HeapSize {
       } catch (IOException ioex) {
         LOG.error("Can't restore from file because of", ioex);
       } catch (ClassNotFoundException cnfe) {
-        LOG.error("Can't restore from file in rebuild because can't 
deserialise",cnfe);
+        LOG.error("Can't restore from file in rebuild because can't 
deserialise", cnfe);
         throw new RuntimeException(cnfe);
       }
     }
@@ -1021,41 +1034,69 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private void persistToFile() throws IOException {
     assert !cacheEnabled;
-    FileOutputStream fos = null;
-    ObjectOutputStream oos = null;
-    try {
+    try (ObjectOutputStream oos = new ObjectOutputStream(
+      new FileOutputStream(persistencePath, false))){
       if (!ioEngine.isPersistent()) {
         throw new IOException("Attempt to persist non-persistent cache 
mappings!");
       }
-      fos = new FileOutputStream(persistencePath, false);
-      oos = new ObjectOutputStream(fos);
+      byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(algorithm);
+      if (checksum != null) {
+        oos.write(ProtobufUtil.PB_MAGIC);
+        oos.writeInt(checksum.length);
+        oos.write(checksum);
+      }
       oos.writeLong(cacheCapacity);
       oos.writeUTF(ioEngine.getClass().getName());
       oos.writeUTF(backingMap.getClass().getName());
       oos.writeObject(deserialiserMap);
       oos.writeObject(backingMap);
-    } finally {
-      if (oos != null) oos.close();
-      if (fos != null) fos.close();
     }
   }
 
   @SuppressWarnings("unchecked")
-  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
+  private void retrieveFromFile(int[] bucketSizes) throws IOException,
       ClassNotFoundException {
     File persistenceFile = new File(persistencePath);
     if (!persistenceFile.exists()) {
       return;
     }
     assert !cacheEnabled;
-    FileInputStream fis = null;
     ObjectInputStream ois = null;
     try {
       if (!ioEngine.isPersistent())
         throw new IOException(
             "Attempt to restore non-persistent cache mappings!");
-      fis = new FileInputStream(persistencePath);
-      ois = new ObjectInputStream(fis);
+      ois = new ObjectInputStream(new FileInputStream(persistencePath));
+      int pblen = ProtobufUtil.lengthOfPBMagic();
+      byte[] pbuf = new byte[pblen];
+      int read = ois.read(pbuf);
+      if (read != pblen) {
+        LOG.warn("Can't restore from file because of incorrect number of bytes 
read while " +
+          "checking for protobuf magic number. Requested=" + pblen + ", but 
received= " +
+          read + ".");
+        return;
+      }
+      if (Bytes.equals(ProtobufUtil.PB_MAGIC, pbuf)) {
+        int length = ois.readInt();
+        byte[] persistentChecksum = new byte[length];
+        int readLen = ois.read(persistentChecksum);
+        if (readLen != length) {
+          LOG.warn("Can't restore from file because of incorrect number of 
bytes read while " +
+            "checking for persistent checksum. Requested=" + length + ", but 
received=" +
+            readLen + ". ");
+          return;
+        }
+        if (!((PersistentIOEngine) ioEngine).verifyFileIntegrity(
+            persistentChecksum, algorithm)) {
+          LOG.warn("Can't restore from file because of verification failed.");
+          return;
+        }
+      } else {
+        // the persistent file may be an old-version file that doesn't support verification,
+        // so reopen the ObjectInputStream and read the persistent file from the beginning
+        ois.close();
+        ois = new ObjectInputStream(new FileInputStream(persistencePath));
+      }
       long capacitySize = ois.readLong();
       if (capacitySize != cacheCapacity)
         throw new IOException("Mismatched cache capacity:"
@@ -1079,8 +1120,9 @@ public class BucketCache implements BlockCache, HeapSize {
       deserialiserMap = deserMap;
       backingMap = backingMapFromFile;
     } finally {
-      if (ois != null) ois.close();
-      if (fis != null) fis.close();
+      if (ois != null) {
+        ois.close();
+      }
       if (!persistenceFile.delete()) {
         throw new IOException("Failed deleting persistence file "
             + persistenceFile.getAbsolutePath());
@@ -1598,4 +1640,9 @@ public class BucketCache implements BlockCache, HeapSize {
   float getMemoryFactor() {
     return memoryFactor;
   }
+
+  @VisibleForTesting
+  public UniqueIndexMap<Integer> getDeserialiserMap() {
+    return deserialiserMap;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 7d3a9fa..f631836 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -32,15 +34,19 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 /**
  * IO engine that stores data to a file on the local file system.
  */
 @InterfaceAudience.Private
-public class FileIOEngine implements IOEngine {
+public class FileIOEngine implements PersistentIOEngine {
   private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
   public static final String FILE_DELIMITER = ",";
+  private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});
+
   private final String[] filePaths;
   private final FileChannel[] fileChannels;
   private final RandomAccessFile[] rafs;
@@ -68,15 +74,20 @@ public class FileIOEngine implements IOEngine {
           // The next setting length will throw exception,logging this message
           // is just used for the detail reason of exception,
           String msg = "Only " + StringUtils.byteDesc(totalSpace)
-              + " total space under " + filePath + ", not enough for requested 
"
-              + StringUtils.byteDesc(sizePerFile);
+            + " total space under " + filePath + ", not enough for requested "
+            + StringUtils.byteDesc(sizePerFile);
           LOG.warn(msg);
         }
-        rafs[i].setLength(sizePerFile);
+        File file = new File(filePath);
+        // The setLength() method will change the file's last modified time. So if we don't do
+        // this check, the wrong time will be used when calculating the checksum.
+        if (file.length() != sizePerFile) {
+          rafs[i].setLength(sizePerFile);
+        }
         fileChannels[i] = rafs[i].getChannel();
         channelLocks[i] = new ReentrantLock();
         LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
-            + ", on the path:" + filePath);
+          + ", on the path: " + filePath);
       } catch (IOException fex) {
         LOG.error("Failed allocating cache on " + filePath, fex);
         shutdown();
@@ -86,6 +97,18 @@ public class FileIOEngine implements IOEngine {
   }
 
   @Override
+  public boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm) {
+    byte[] calculateChecksum = calculateChecksum(algorithm);
+    if (!Bytes.equals(persistentChecksum, calculateChecksum)) {
+      LOG.error("Mismatch of checksum! The persistent checksum is " +
+        Bytes.toString(persistentChecksum) + ", but the calculated checksum is " +
+        Bytes.toString(calculateChecksum));
+      return false;
+    }
+    return true;
+  }
+
+  @Override
   public String toString() {
     return "ioengine=" + this.getClass().getSimpleName() + ", paths="
        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
@@ -267,6 +290,61 @@ public class FileIOEngine implements IOEngine {
     }
   }
 
+  @Override
+  public byte[] calculateChecksum(String algorithm) {
+    if (filePaths == null) {
+      return null;
+    }
+    try {
+      StringBuilder sb = new StringBuilder();
+      for (String filePath : filePaths){
+        File file = new File(filePath);
+        sb.append(filePath);
+        sb.append(getFileSize(filePath));
+        sb.append(file.lastModified());
+      }
+      MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
+      messageDigest.update(Bytes.toBytes(sb.toString()));
+      return messageDigest.digest();
+    } catch (IOException ioex) {
+      LOG.error("Calculating checksum failed.", ioex);
+      return null;
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("No such algorithm : " + algorithm + "!");
+      return null;
+    }
+  }
+
+  /**
+   * Use the Linux command du to get the file's real size
+   * @param filePath the file
+   * @return the file's real size
+   * @throws IOException if something goes wrong, e.g. the file does not exist
+   */
+  private static long getFileSize(String filePath) throws IOException {
+    DU.setExecCommand(filePath);
+    DU.execute();
+    return Long.parseLong(DU.getOutput().split("\t")[0]);
+  }
+
+  private static class DuFileCommand extends Shell.ShellCommandExecutor {
+    private String[] execCommand;
+
+    DuFileCommand(String[] execString) {
+      super(execString);
+      execCommand = execString;
+    }
+
+    void setExecCommand(String filePath) {
+      this.execCommand[1] = filePath;
+    }
+
+    @Override
+    public String[] getExecString() {
+      return this.execCommand;
+    }
+  }
+
   private static interface FileAccessor {
    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
         throws IOException;
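
The DuFileCommand added above relies on the output format of the Linux `du` command:
"<size><TAB><path>", and the first tab-separated field is taken as the file's real size. Below is
a small sketch of that interaction using Hadoop's Shell.ShellCommandExecutor directly; the class
name, method name, and the /etc/hosts example path are assumptions for illustration, not part of
the patch.

    import org.apache.hadoop.util.Shell;

    public class DuSketch {
      // Run `du <file>` and parse the size field, as FileIOEngine.getFileSize does via DuFileCommand.
      public static long duSize(String filePath) throws java.io.IOException {
        Shell.ShellCommandExecutor du = new Shell.ShellCommandExecutor(new String[] {"du", filePath});
        du.execute();
        return Long.parseLong(du.getOutput().split("\t")[0]);
      }

      public static void main(String[] args) throws Exception {
        System.out.println(duSize("/etc/hosts")); // prints whatever size du reports for the file
      }
    }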
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
new file mode 100644
index 0000000..5886c8b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An IOEngine that implements this interface supports persistence and file integrity
+ * verification for {@link BucketCache}
+ */
+@InterfaceAudience.Private
+public interface PersistentIOEngine extends IOEngine {
+
+  /**
+   * Use a digest algorithm to calculate a checksum, the default algorithm is MD5
+   * @param algorithm which algorithm to use to calculate the checksum
+   * @return the checksum, converted to a HexString
+   */
+  byte[] calculateChecksum(String algorithm);
+
+  /**
+   * Verify the cache files' integrity
+   * @param persistentChecksum the persistent checksum
+   * @param algorithm which algorithm to use to calculate the checksum
+   * @return true if verification succeeds
+   */
+  boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm);
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
new file mode 100644
index 0000000..f86df96
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -0,0 +1,282 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for checking a file's integrity when BucketCache retrieves from file
+ */
+@Category(SmallTests.class)
+public class TestVerifyBucketCacheFile {
+  final int constructedBlockSize = 8 * 1024;
+  final long capacitySize = 32 * 1024 * 1024;
+  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
+  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+
+  /**
+   * Test whether BucketCache starts normally when the cache file or persistence file does not exist.
+   * (1) Start BucketCache and add some blocks, then shut down BucketCache and persist the cache
+   * to file. Restart BucketCache and it can restore the cache from file.
+   * (2) Delete the bucket cache file after shutting down BucketCache. Restart BucketCache and it
+   * can't restore the cache from file; the cache file and persistence file would be deleted before
+   * BucketCache starts normally.
+   * (3) Delete the persistence file after shutting down BucketCache. Restart BucketCache and it
+   * can't restore the cache from file; the cache file and persistence file would be deleted before
+   * BucketCache starts normally.
+   * @throws Exception the exception
+   */
+  @Test
+  public void testRetrieveFromFile() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (CacheTestUtils.HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // 1.persist cache to file
+    bucketCache.shutdown();
+    // restore cache from file
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // 2.delete bucket cache file
+    File cacheFile = new File(testDir + "/bucket.cache");
+    assertTrue(cacheFile.delete());
+    // can't restore cache from file
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+    // Add blocks
+    for (CacheTestUtils.HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // 3.delete backingMap persistence file
+    File mapFile = new File(testDir + "/bucket.persistence");
+    assertTrue(mapFile.delete());
+    // can't restore cache from file
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  /**
+   * Test whether BucketCache starts normally after modifying the cache file.
+   * Start BucketCache and add some blocks, then shut down BucketCache and persist the cache to file.
+   * Restart BucketCache after modifying the cache file's data; it can't restore the cache from file,
+   * and the cache file and persistence file would be deleted before BucketCache starts normally.
+   * @throws Exception the exception
+   */
+  @Test
+  public void testModifiedBucketCacheFileData() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (CacheTestUtils.HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // modified bucket cache file
+    String file = testDir + "/bucket.cache";
+    try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
+      new FileOutputStream(file, false)))) {
+      out.write("test bucket cache");
+    }
+
+    // can't restore cache from file
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  /**
+   * Test whether BucketCache starts normally after modifying the cache file's last modified
+   * time. First start BucketCache and add some blocks, then shut down BucketCache and persist
+   * the cache to file. Then restart BucketCache after modifying the cache file's last modified
+   * time; it can't restore the cache from file, and the cache file and persistence file would be
+   * deleted before BucketCache starts normally.
+   * @throws Exception the exception
+   */
+  @Test
+  public void testModifiedBucketCacheFileTime() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (CacheTestUtils.HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // modified bucket cache file LastModifiedTime
+    File file = new File(testDir + "/bucket.cache");
+    assertTrue(file.setLastModified(System.currentTimeMillis() + 1000));
+
+    // can't restore cache from file
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  /**
+   * Test whether it can read an old version's persistence file; this is for backward compatibility.
+   * Start BucketCache and add some blocks, then persist the cache to file in the old way and shut
+   * down BucketCache. Restart BucketCache, and it can normally restore from the old-version
+   * persistence file.
+   * @throws Exception the exception
+   */
+  @Test
+  public void compatibilityTest() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    String persistencePath = testDir + "/bucket.persistence";
+    BucketCache bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, persistencePath);
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (CacheTestUtils.HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist backingMap using the old way
+    persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(),
+      bucketCache.backingMap, bucketCache.getDeserialiserMap());
+    bucketCache.shutdown();
+
+    // restore cache from file, which skips the checksum check
+    bucketCache =
+      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
+        null, writeThreads, writerQLen, persistencePath + ".old");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    assertEquals(blocks.length, bucketCache.backingMap.size());
+  }
+
+  private void persistToFileInOldWay(String persistencePath, long cacheCapacity,
+    ConcurrentMap backingMap, UniqueIndexMap deserialiserMap)
+    throws IOException {
+    try(ObjectOutputStream oos = new ObjectOutputStream(
+      new FileOutputStream(persistencePath, false))) {
+      oos.writeLong(cacheCapacity);
+      oos.writeUTF(FileIOEngine.class.getName());
+      oos.writeUTF(backingMap.getClass().getName());
+      oos.writeObject(deserialiserMap);
+      oos.writeObject(backingMap);
+    }
+  }
+
+  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+    throws InterruptedException {
+    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
+      Thread.sleep(100);
+    }
+  }
+
+  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
+  // threads will flush it to the bucket and put reference entry in backingMap.
+  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+    Cacheable block) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block);
+    waitUntilFlushedToBucket(cache, cacheKey);
+  }
+}
