This is an automated email from the ASF dual-hosted git repository. reidchan pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push: new 29080ed HBASE-22890 Verify the file integrity in persistent IOEngine 29080ed is described below commit 29080eda9859a3689910eab285a3e51481f772ff Author: zbq.dean <zbq.d...@gmail.com> AuthorDate: Fri Sep 20 14:09:34 2019 +0800 HBASE-22890 Verify the file integrity in persistent IOEngine Signed-off-by Anoop Sam John <anoopsamj...@apache.org> Signed-off-by stack <st...@apache.org> Signed-off-by Reid Chan <reidc...@apache.org> --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 81 ++++-- .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 88 ++++++- .../hbase/io/hfile/bucket/PersistentIOEngine.java | 44 ++++ .../io/hfile/bucket/TestVerifyBucketCacheFile.java | 282 +++++++++++++++++++++ 4 files changed, 473 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 5a4ac13..5c02166 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -69,6 +69,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -242,6 +244,16 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; + private static final String FILE_VERIFY_ALGORITHM = + "hbase.bucketcache.persistent.file.integrity.check.algorithm"; + private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; + + /** + * Use {@link java.security.MessageDigest} class's encryption algorithms to check + * persistent file integrity, default algorithm is MD5 + * */ + private String algorithm; + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { @@ -252,8 +264,9 @@ public class BucketCache implements BlockCache, HeapSize { public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, Configuration conf) - throws FileNotFoundException, IOException { - this.ioEngine = getIOEngineFromName(ioEngineName, capacity); + throws IOException { + this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); + ioEngine = getIOEngineFromName(ioEngineName, capacity); this.writerThreads = new WriterThread[writerThreadNum]; long blockNumCapacity = capacity / blockSize; if (blockNumCapacity >= Integer.MAX_VALUE) { @@ -295,7 +308,7 @@ public class BucketCache implements BlockCache, HeapSize { } catch (IOException ioex) { LOG.error("Can't restore from file because of", ioex); } catch (ClassNotFoundException cnfe) { - LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); + LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe); throw new RuntimeException(cnfe); } } @@ -1021,41 +1034,69 @@ public class BucketCache implements BlockCache, HeapSize { private void persistToFile() throws IOException { assert !cacheEnabled; - FileOutputStream fos = null; - ObjectOutputStream oos = null; - try { + try (ObjectOutputStream oos = new ObjectOutputStream( + new FileOutputStream(persistencePath, false))){ if (!ioEngine.isPersistent()) { throw new IOException("Attempt to persist non-persistent cache mappings!"); } - fos = new FileOutputStream(persistencePath, false); - oos = new ObjectOutputStream(fos); + byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(algorithm); + if (checksum != null) { + oos.write(ProtobufUtil.PB_MAGIC); + oos.writeInt(checksum.length); + oos.write(checksum); + } oos.writeLong(cacheCapacity); oos.writeUTF(ioEngine.getClass().getName()); oos.writeUTF(backingMap.getClass().getName()); oos.writeObject(deserialiserMap); oos.writeObject(backingMap); - } finally { - if (oos != null) oos.close(); - if (fos != null) fos.close(); } } @SuppressWarnings("unchecked") - private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, + private void retrieveFromFile(int[] bucketSizes) throws IOException, ClassNotFoundException { File persistenceFile = new File(persistencePath); if (!persistenceFile.exists()) { return; } assert !cacheEnabled; - FileInputStream fis = null; ObjectInputStream ois = null; try { if (!ioEngine.isPersistent()) throw new IOException( "Attempt to restore non-persistent cache mappings!"); - fis = new FileInputStream(persistencePath); - ois = new ObjectInputStream(fis); + ois = new ObjectInputStream(new FileInputStream(persistencePath)); + int pblen = ProtobufUtil.lengthOfPBMagic(); + byte[] pbuf = new byte[pblen]; + int read = ois.read(pbuf); + if (read != pblen) { + LOG.warn("Can't restore from file because of incorrect number of bytes read while " + + "checking for protobuf magic number. Requested=" + pblen + ", but received= " + + read + "."); + return; + } + if (Bytes.equals(ProtobufUtil.PB_MAGIC, pbuf)) { + int length = ois.readInt(); + byte[] persistentChecksum = new byte[length]; + int readLen = ois.read(persistentChecksum); + if (readLen != length) { + LOG.warn("Can't restore from file because of incorrect number of bytes read while " + + "checking for persistent checksum. Requested=" + length + ", but received=" + + readLen + ". "); + return; + } + if (!((PersistentIOEngine) ioEngine).verifyFileIntegrity( + persistentChecksum, algorithm)) { + LOG.warn("Can't restore from file because of verification failed."); + return; + } + } else { + // persistent file may be an old version of file, it's not support verification, + // so reopen ObjectInputStream and read the persistent file from head + ois.close(); + ois = new ObjectInputStream(new FileInputStream(persistencePath)); + } long capacitySize = ois.readLong(); if (capacitySize != cacheCapacity) throw new IOException("Mismatched cache capacity:" @@ -1079,8 +1120,9 @@ public class BucketCache implements BlockCache, HeapSize { deserialiserMap = deserMap; backingMap = backingMapFromFile; } finally { - if (ois != null) ois.close(); - if (fis != null) fis.close(); + if (ois != null) { + ois.close(); + } if (!persistenceFile.delete()) { throw new IOException("Failed deleting persistence file " + persistenceFile.getAbsolutePath()); @@ -1598,4 +1640,9 @@ public class BucketCache implements BlockCache, HeapSize { float getMemoryFactor() { return memoryFactor; } + + @VisibleForTesting + public UniqueIndexMap<Integer> getDeserialiserMap() { + return deserialiserMap; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 7d3a9fa..f631836 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -25,6 +25,8 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.concurrent.locks.ReentrantLock; @@ -32,15 +34,19 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; /** * IO engine that stores data to a file on the local file system. */ @InterfaceAudience.Private -public class FileIOEngine implements IOEngine { +public class FileIOEngine implements PersistentIOEngine { private static final Log LOG = LogFactory.getLog(FileIOEngine.class); public static final String FILE_DELIMITER = ","; + private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""}); + private final String[] filePaths; private final FileChannel[] fileChannels; private final RandomAccessFile[] rafs; @@ -68,15 +74,20 @@ public class FileIOEngine implements IOEngine { // The next setting length will throw exception,logging this message // is just used for the detail reason of exception, String msg = "Only " + StringUtils.byteDesc(totalSpace) - + " total space under " + filePath + ", not enough for requested " - + StringUtils.byteDesc(sizePerFile); + + " total space under " + filePath + ", not enough for requested " + + StringUtils.byteDesc(sizePerFile); LOG.warn(msg); } - rafs[i].setLength(sizePerFile); + File file = new File(filePath); + // setLength() method will change file's last modified time. So if don't do + // this check, wrong time will be used when calculating checksum. + if (file.length() != sizePerFile) { + rafs[i].setLength(sizePerFile); + } fileChannels[i] = rafs[i].getChannel(); channelLocks[i] = new ReentrantLock(); LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) - + ", on the path:" + filePath); + + ", on the path: " + filePath); } catch (IOException fex) { LOG.error("Failed allocating cache on " + filePath, fex); shutdown(); @@ -86,6 +97,18 @@ public class FileIOEngine implements IOEngine { } @Override + public boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm) { + byte[] calculateChecksum = calculateChecksum(algorithm); + if (!Bytes.equals(persistentChecksum, calculateChecksum)) { + LOG.error("Mismatch of checksum! The persistent checksum is " + + Bytes.toString(persistentChecksum) + ", but the calculate checksum is " + + Bytes.toString(calculateChecksum)); + return false; + } + return true; + } + + @Override public String toString() { return "ioengine=" + this.getClass().getSimpleName() + ", paths=" + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity); @@ -267,6 +290,61 @@ public class FileIOEngine implements IOEngine { } } + @Override + public byte[] calculateChecksum(String algorithm) { + if (filePaths == null) { + return null; + } + try { + StringBuilder sb = new StringBuilder(); + for (String filePath : filePaths){ + File file = new File(filePath); + sb.append(filePath); + sb.append(getFileSize(filePath)); + sb.append(file.lastModified()); + } + MessageDigest messageDigest = MessageDigest.getInstance(algorithm); + messageDigest.update(Bytes.toBytes(sb.toString())); + return messageDigest.digest(); + } catch (IOException ioex) { + LOG.error("Calculating checksum failed.", ioex); + return null; + } catch (NoSuchAlgorithmException e) { + LOG.error("No such algorithm : " + algorithm + "!"); + return null; + } + } + + /** + * Using Linux command du to get file's real size + * @param filePath the file + * @return file's real size + * @throws IOException something happened like file not exists + */ + private static long getFileSize(String filePath) throws IOException { + DU.setExecCommand(filePath); + DU.execute(); + return Long.parseLong(DU.getOutput().split("\t")[0]); + } + + private static class DuFileCommand extends Shell.ShellCommandExecutor { + private String[] execCommand; + + DuFileCommand(String[] execString) { + super(execString); + execCommand = execString; + } + + void setExecCommand(String filePath) { + this.execCommand[1] = filePath; + } + + @Override + public String[] getExecString() { + return this.execCommand; + } + } + private static interface FileAccessor { int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java new file mode 100644 index 0000000..5886c8b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java @@ -0,0 +1,44 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A class implementing PersistentIOEngine interface supports persistent and file integrity verify + * for {@link BucketCache} + */ +@InterfaceAudience.Private +public interface PersistentIOEngine extends IOEngine { + + /** + * Using an encryption algorithm to calculate a checksum, the default encryption algorithm is MD5 + * @param algorithm which algorithm to calculate checksum + * @return the checksum which is convert to HexString + */ + byte[] calculateChecksum(String algorithm); + + /** + * Verify cache files's integrity + * @param persistentChecksum the persistent checksum + * @param algorithm which algorithm to calculate checksum + * @return true if verify successfully + */ + boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java new file mode 100644 index 0000000..f86df96 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -0,0 +1,282 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStreamWriter; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Basic test for check file's integrity when BucketCache retrieve from file + */ +@Category(SmallTests.class) +public class TestVerifyBucketCacheFile { + final int constructedBlockSize = 8 * 1024; + final long capacitySize = 32 * 1024 * 1024; + final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; + final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; + + /** + * Test cache file or persistence file does not exist whether BucketCache starts normally + * (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache + * to file. Restart BucketCache and it can restore cache from file. + * (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't + * restore cache from file, the cache file and persistence file would be deleted before + * BucketCache start normally. + * (3) Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't + * restore cache from file, the cache file and persistence file would be deleted before + * BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testRetrieveFromFile() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // 1.persist cache to file + bucketCache.shutdown(); + // restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + // persist cache to file + bucketCache.shutdown(); + + // 2.delete bucket cache file + File cacheFile = new File(testDir + "/bucket.cache"); + assertTrue(cacheFile.delete()); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // 3.delete backingMap persistence file + File mapFile = new File(testDir + "/bucket.persistence"); + assertTrue(mapFile.delete()); + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether BucketCache is started normally after modifying the cache file. + * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. + * Restart BucketCache after modify cache file's data, and it can't restore cache from file, + * the cache file and persistence file would be deleted before BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testModifiedBucketCacheFileData() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // modified bucket cache file + String file = testDir + "/bucket.cache"; + try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(file, false)))) { + out.write("test bucket cache"); + } + + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether BucketCache is started normally after modifying the cache file's last modified + * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist + * cache to file. Then Restart BucketCache after modify cache file's last modified time, and + * it can't restore cache from file, the cache file and persistence file would be deleted + * before BucketCache start normally. + * @throws Exception the exception + */ + @Test + public void testModifiedBucketCacheFileTime() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persist cache to file + bucketCache.shutdown(); + + // modified bucket cache file LastModifiedTime + File file = new File(testDir + "/bucket.cache"); + assertTrue(file.setLastModified(System.currentTimeMillis() + 1000)); + + // can't restore cache from file + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + + TEST_UTIL.cleanupTestDir(); + } + + /** + * Test whether it can read the old version's persistence file, it's for backward compatibility. + * Start BucketCache and add some blocks, then persist cache to file in old way and shutdown + * BucketCache. Restart BucketCache, and it can normally restore from old version persistence + * file. + * @throws Exception the exception + */ + @Test + public void compatibilityTest() throws Exception { + HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + String persistencePath = testDir + "/bucket.persistence"; + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, persistencePath); + long usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize == 0); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (CacheTestUtils.HFileBlockPair block : blocks) { + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + } + usedSize = bucketCache.getAllocator().getUsedSize(); + assertTrue(usedSize != 0); + // persistence backingMap using old way + persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(), + bucketCache.backingMap, bucketCache.getDeserialiserMap()); + bucketCache.shutdown(); + + // restore cache from file which skip check checksum + bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, + null, writeThreads, writerQLen, persistencePath + ".old"); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + assertEquals(blocks.length, bucketCache.backingMap.size()); + } + + private void persistToFileInOldWay(String persistencePath, long cacheCapacity, + ConcurrentMap backingMap, UniqueIndexMap deserialiserMap) + throws IOException { + try(ObjectOutputStream oos = new ObjectOutputStream( + new FileOutputStream(persistencePath, false))) { + oos.writeLong(cacheCapacity); + oos.writeUTF(FileIOEngine.class.getName()); + oos.writeUTF(backingMap.getClass().getName()); + oos.writeObject(deserialiserMap); + oos.writeObject(backingMap); + } + } + + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) + throws InterruptedException { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + } + } + + // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer + // threads will flush it to the bucket and put reference entry in backingMap. + private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, + Cacheable block) throws InterruptedException { + cache.cacheBlock(cacheKey, block); + waitUntilFlushedToBucket(cache, cacheKey); + } +}