This is an automated email from the ASF dual-hosted git repository. reidchan pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push: new a31cd0c Revert "HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode" a31cd0c is described below commit a31cd0c0df9705db41551fe3403d5aef0da20033 Author: Reid Chan <reidc...@apache.org> AuthorDate: Mon Sep 16 17:50:57 2019 +0800 Revert "HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode" Reason: There're still some concerns on whether to delete cached data file. This reverts commit 5bf60ec55fdf637d80492b61e6e6d8d605c5ef4a. --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 75 ++---- .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 152 +---------- .../hbase/io/hfile/bucket/PersistentIOEngine.java | 59 ---- .../hbase/io/hfile/bucket/TestFileIOEngine.java | 2 +- .../io/hfile/bucket/TestVerifyBucketCacheFile.java | 297 --------------------- 5 files changed, 32 insertions(+), 553 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index af10f2e..5a4ac13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -29,7 +29,6 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; @@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -244,17 +242,6 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; - private String ioEngineName; - private static final String FILE_VERIFY_ALGORITHM = - "hbase.bucketcache.persistent.file.integrity.check.algorithm"; - private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; - - /** - * Use {@link java.security.MessageDigest} class's encryption algorithms to check - * persistent file integrity, default algorithm is MD5 - * */ - private String algorithm; - public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { @@ -265,7 +252,8 @@ public class BucketCache implements BlockCache, HeapSize { public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, Configuration conf) - throws IOException { + throws FileNotFoundException, IOException { + this.ioEngine = getIOEngineFromName(ioEngineName, capacity); this.writerThreads = new WriterThread[writerThreadNum]; long blockNumCapacity = capacity / blockSize; if (blockNumCapacity >= Integer.MAX_VALUE) { @@ -287,7 +275,6 @@ public class BucketCache implements BlockCache, HeapSize { ", memoryFactor: " + memoryFactor); this.cacheCapacity = capacity; - this.ioEngineName = ioEngineName; this.persistencePath = persistencePath; this.blockSize = blockSize; this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; @@ -301,15 +288,14 @@ public class BucketCache implements BlockCache, HeapSize { this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>(); this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity); - this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); - ioEngine = getIOEngineFromName(); + if (ioEngine.isPersistent() && persistencePath != null) { try { retrieveFromFile(bucketSizes); } catch (IOException ioex) { LOG.error("Can't restore from file because of", ioex); } catch (ClassNotFoundException cnfe) { - LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe); + LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); throw new RuntimeException(cnfe); } } @@ -373,10 +359,12 @@ public class BucketCache implements BlockCache, HeapSize { /** * Get the IOEngine from the IO engine name + * @param ioEngineName + * @param capacity * @return the IOEngine * @throws IOException */ - private IOEngine getIOEngineFromName() + private IOEngine getIOEngineFromName(String ioEngineName, long capacity) throws IOException { if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { // In order to make the usage simple, we only need the prefix 'files:' in @@ -384,11 +372,11 @@ public class BucketCache implements BlockCache, HeapSize { // the compatibility String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); - return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths); + return new FileIOEngine(capacity, filePaths); } else if (ioEngineName.startsWith("offheap")) - return new ByteBufferIOEngine(cacheCapacity, true); + return new ByteBufferIOEngine(capacity, true); else if (ioEngineName.startsWith("heap")) - return new ByteBufferIOEngine(cacheCapacity, false); + return new ByteBufferIOEngine(capacity, false); else throw new IllegalArgumentException( "Don't understand io engine name for cache - prefix with file:, heap or offheap"); @@ -1033,48 +1021,41 @@ public class BucketCache implements BlockCache, HeapSize { private void persistToFile() throws IOException { assert !cacheEnabled; - try (ObjectOutputStream oos = new ObjectOutputStream( - new FileOutputStream(persistencePath, false))){ + FileOutputStream fos = null; + ObjectOutputStream oos = null; + try { if (!ioEngine.isPersistent()) { throw new IOException("Attempt to persist non-persistent cache mappings!"); } - if (ioEngine instanceof PersistentIOEngine) { - oos.write(ProtobufUtil.PB_MAGIC); - byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(); - oos.writeInt(checksum.length); - oos.write(checksum); - } + fos = new FileOutputStream(persistencePath, false); + oos = new ObjectOutputStream(fos); oos.writeLong(cacheCapacity); oos.writeUTF(ioEngine.getClass().getName()); oos.writeUTF(backingMap.getClass().getName()); oos.writeObject(deserialiserMap); oos.writeObject(backingMap); - } catch (NoSuchAlgorithmException e) { - LOG.error("No such algorithm : " + algorithm + "! Failed to persist data on exit",e); + } finally { + if (oos != null) oos.close(); + if (fos != null) fos.close(); } } @SuppressWarnings("unchecked") - private void retrieveFromFile(int[] bucketSizes) throws IOException, + private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, ClassNotFoundException { File persistenceFile = new File(persistencePath); if (!persistenceFile.exists()) { return; } assert !cacheEnabled; - try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){ + FileInputStream fis = null; + ObjectInputStream ois = null; + try { if (!ioEngine.isPersistent()) throw new IOException( "Attempt to restore non-persistent cache mappings!"); - // for backward compatibility - if (ioEngine instanceof PersistentIOEngine && - !((PersistentIOEngine) ioEngine).isOldVersion()) { - byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length]; - ois.read(PBMagic); - int length = ois.readInt(); - byte[] persistenceChecksum = new byte[length]; - ois.read(persistenceChecksum); - } + fis = new FileInputStream(persistencePath); + ois = new ObjectInputStream(fis); long capacitySize = ois.readLong(); if (capacitySize != cacheCapacity) throw new IOException("Mismatched cache capacity:" @@ -1097,8 +1078,9 @@ public class BucketCache implements BlockCache, HeapSize { bucketAllocator = allocator; deserialiserMap = deserMap; backingMap = backingMapFromFile; - blockNumber.set(backingMap.size()); } finally { + if (ois != null) ois.close(); + if (fis != null) fis.close(); if (!persistenceFile.delete()) { throw new IOException("Failed deleting persistence file " + persistenceFile.getAbsolutePath()); @@ -1616,9 +1598,4 @@ public class BucketCache implements BlockCache, HeapSize { float getMemoryFactor() { return memoryFactor; } - - @VisibleForTesting - public UniqueIndexMap<Integer> getDeserialiserMap() { - return deserialiserMap; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index f26c6c5..7d3a9fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -19,16 +19,12 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.concurrent.locks.ReentrantLock; @@ -36,20 +32,15 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; /** * IO engine that stores data to a file on the local file system. */ @InterfaceAudience.Private -public class FileIOEngine implements PersistentIOEngine { +public class FileIOEngine implements IOEngine { private static final Log LOG = LogFactory.getLog(FileIOEngine.class); public static final String FILE_DELIMITER = ","; - private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""}); - private final String[] filePaths; private final FileChannel[] fileChannels; private final RandomAccessFile[] rafs; @@ -57,58 +48,17 @@ public class FileIOEngine implements PersistentIOEngine { private final long sizePerFile; private final long capacity; - private final String algorithmName; - private boolean oldVersion; private FileReadAccessor readAccessor = new FileReadAccessor(); private FileWriteAccessor writeAccessor = new FileWriteAccessor(); - public FileIOEngine(String algorithmName, String persistentPath, - long capacity, String... filePaths) throws IOException { + public FileIOEngine(long capacity, String... filePaths) throws IOException { this.sizePerFile = capacity / filePaths.length; this.capacity = this.sizePerFile * filePaths.length; this.filePaths = filePaths; this.fileChannels = new FileChannel[filePaths.length]; this.rafs = new RandomAccessFile[filePaths.length]; this.channelLocks = new ReentrantLock[filePaths.length]; - this.algorithmName = algorithmName; - verifyFileIntegrity(persistentPath); - init(); - } - - /** - * Verify cache files's integrity - * @param persistentPath the backingMap persistent path - */ - @Override - public void verifyFileIntegrity(String persistentPath) { - if (persistentPath != null) { - byte[] persistentChecksum = readPersistentChecksum(persistentPath); - if (!oldVersion) { - try { - byte[] calculateChecksum = calculateChecksum(); - if (!Bytes.equals(persistentChecksum, calculateChecksum)) { - LOG.warn("The persistent checksum is " + Bytes.toString(persistentChecksum) + - ", but the calculate checksum is " + Bytes.toString(calculateChecksum)); - throw new IOException(); - } - } catch (IOException ioex) { - LOG.error("File verification failed because of ", ioex); - // delete cache files and backingMap persistent file. - deleteCacheDataFile(); - new File(persistentPath).delete(); - } catch (NoSuchAlgorithmException nsae) { - LOG.error("No such algorithm " + algorithmName, nsae); - throw new RuntimeException(nsae); - } - } - } else { - // not configure persistent path - deleteCacheDataFile(); - } - } - - private void init() throws IOException { for (int i = 0; i < filePaths.length; i++) { String filePath = filePaths[i]; try { @@ -118,15 +68,15 @@ public class FileIOEngine implements PersistentIOEngine { // The next setting length will throw exception,logging this message // is just used for the detail reason of exception, String msg = "Only " + StringUtils.byteDesc(totalSpace) - + " total space under " + filePath + ", not enough for requested " - + StringUtils.byteDesc(sizePerFile); + + " total space under " + filePath + ", not enough for requested " + + StringUtils.byteDesc(sizePerFile); LOG.warn(msg); } rafs[i].setLength(sizePerFile); fileChannels[i] = rafs[i].getChannel(); channelLocks[i] = new ReentrantLock(); LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) - + ", on the path: " + filePath); + + ", on the path:" + filePath); } catch (IOException fex) { LOG.error("Failed allocating cache on " + filePath, fex); shutdown(); @@ -317,98 +267,6 @@ public class FileIOEngine implements PersistentIOEngine { } } - /** - * Read the persistent checksum from persistent path - * @param persistentPath the backingMap persistent path - * @return the persistent checksum - */ - private byte[] readPersistentChecksum(String persistentPath) { - try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistentPath))) { - byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length]; - ois.read(PBMagic); - if (Bytes.equals(ProtobufUtil.PB_MAGIC, PBMagic)) { - int length = ois.readInt(); - byte[] persistentChecksum = new byte[length]; - ois.read(persistentChecksum); - return persistentChecksum; - } else { - // if the persistent file is not start with PB_MAGIC, it's an old version file - oldVersion = true; - } - } catch (IOException ioex) { - LOG.warn("Failed read persistent checksum, because of " + ioex); - return null; - } - return null; - } - - @Override - public void deleteCacheDataFile() { - if (filePaths == null) { - return; - } - for (String file : filePaths) { - new File(file).delete(); - } - } - - @Override - public byte[] calculateChecksum() - throws IOException, NoSuchAlgorithmException { - if (filePaths == null) { - return null; - } - StringBuilder sb = new StringBuilder(); - for (String filePath : filePaths){ - File file = new File(filePath); - if (file.exists()){ - sb.append(filePath); - sb.append(getFileSize(filePath)); - sb.append(file.lastModified()); - } else { - throw new IOException("Cache file: " + filePath + " is not exists."); - } - } - MessageDigest messageDigest = MessageDigest.getInstance(algorithmName); - messageDigest.update(Bytes.toBytes(sb.toString())); - return messageDigest.digest(); - } - - @Override - public boolean isOldVersion() { - return oldVersion; - } - - /** - * Using Linux command du to get file's real size - * @param filePath the file - * @return file's real size - * @throws IOException something happened like file not exists - */ - private static long getFileSize(String filePath) throws IOException { - DU.setExecCommand(filePath); - DU.execute(); - return Long.parseLong(DU.getOutput().split("\t")[0]); - } - - private static class DuFileCommand extends Shell.ShellCommandExecutor { - private String[] execCommand; - - DuFileCommand(String[] execString) { - super(execString); - execCommand = execString; - } - - void setExecCommand(String filePath) { - this.execCommand[1] = filePath; - } - - @Override - public String[] getExecString() { - return this.execCommand; - } - } - private static interface FileAccessor { int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java deleted file mode 100644 index 556f5c5..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.hfile.bucket; - -import java.io.IOException; -import java.security.NoSuchAlgorithmException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * A class implementing PersistentIOEngine interface supports persistent and file integrity verify - * for {@link BucketCache} - */ -@InterfaceAudience.Private -public interface PersistentIOEngine extends IOEngine { - - /** - * Delete bucketcache files - */ - void deleteCacheDataFile(); - - /** - * Using an encryption algorithm to calculate a checksum, the default encryption algorithm is MD5 - * @return the checksum which is convert to HexString - * @throws IOException something happened like file not exists - * @throws NoSuchAlgorithmException no such algorithm - */ - byte[] calculateChecksum() - throws IOException, NoSuchAlgorithmException; - - /** - * Whether the persistent file support verify file integrity, old version file - * does not support verification, it's for back compatibility - * @return true if the persistent file does not support verify file integrity - */ - boolean isOldVersion(); - - /** - * Verify cache files's integrity - * @param persistentPath the backingMap persistent path - */ - void verifyFileIntegrity(String persistentPath); -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index d85aec9..6e677d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -68,7 +68,7 @@ public class TestFileIOEngine { @Before public void setUp() throws IOException { - fileIOEngine = new FileIOEngine("MD5", null, TOTAL_CAPACITY, FILE_PATHS); + fileIOEngine = new FileIOEngine(TOTAL_CAPACITY, FILE_PATHS); } @After diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java deleted file mode 100644 index c54315f..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.hfile.bucket; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.OutputStreamWriter; -import java.util.Arrays; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; -import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; -import org.apache.hadoop.hbase.io.hfile.Cacheable; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -/** - * Basic test for check file's integrity before start BucketCache in fileIOEngine - */ -@RunWith(Parameterized.class) -@Category(SmallTests.class) -public class TestVerifyBucketCacheFile { - @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") - public static Iterable<Object[]> data() { - return Arrays.asList(new Object[][] { { 8192, null }, { 16 * 1024, - new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, - 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, - 128 * 1024 + 1024 } } }); - } - - @Parameterized.Parameter(0) - public int constructedBlockSize; - - @Parameterized.Parameter(1) - public int[] constructedBlockSizes; - - final long capacitySize = 32 * 1024 * 1024; - final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; - final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; - - /** - * Test cache file or persistence file does not exist whether BucketCache starts normally - * (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache - * to file. Restart BucketCache and it can restore cache from file. - * (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't - * restore cache from file, the cache file and persistence file would be deleted before - * BucketCache start normally. - * (3) Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't - * restore cache from file, the cache file and persistence file would be deleted before - * BucketCache start normally. - * @throws Exception the exception - */ - @Test - public void testRetrieveFromFile() throws Exception { - HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - Path testDir = TEST_UTIL.getDataTestDir(); - TEST_UTIL.getTestFileSystem().mkdirs(testDir); - - BucketCache bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - long usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize == 0); - CacheTestUtils.HFileBlockPair[] blocks = - CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); - // Add blocks - for (CacheTestUtils.HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); - } - usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize != 0); - // 1.persist cache to file - bucketCache.shutdown(); - // restore cache from file - bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); - // persist cache to file - bucketCache.shutdown(); - - // 2.delete bucket cache file - File cacheFile = new File(testDir + "/bucket.cache"); - assertTrue(cacheFile.delete()); - // can't restore cache from file - bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - // Add blocks - for (CacheTestUtils.HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); - } - usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize != 0); - // persist cache to file - bucketCache.shutdown(); - - // 3.delete backingMap persistence file - File mapFile = new File(testDir + "/bucket.persistence"); - assertTrue(mapFile.delete()); - // can't restore cache from file - bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - - TEST_UTIL.cleanupTestDir(); - } - - /** - * Test whether BucketCache is started normally after modifying the cache file. - * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. - * Restart BucketCache after modify cache file's data, and it can't restore cache from file, - * the cache file and persistence file would be deleted before BucketCache start normally. - * @throws Exception the exception - */ - @Test - public void testModifiedBucketCacheFileData() throws Exception { - HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - Path testDir = TEST_UTIL.getDataTestDir(); - TEST_UTIL.getTestFileSystem().mkdirs(testDir); - - BucketCache bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - long usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize == 0); - - CacheTestUtils.HFileBlockPair[] blocks = - CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); - // Add blocks - for (CacheTestUtils.HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); - } - usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize != 0); - // persist cache to file - bucketCache.shutdown(); - - // modified bucket cache file - String file = testDir + "/bucket.cache"; - try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(file, true)))) { - out.write("test bucket cache"); - } - // can't restore cache from file - bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - - TEST_UTIL.cleanupTestDir(); - } - - /** - * Test whether BucketCache is started normally after modifying the cache file's last modified - * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist - * cache to file. Then Restart BucketCache after modify cache file's last modified time, and - * it can't restore cache from file, the cache file and persistence file would be deleted - * before BucketCache start normally. - * @throws Exception the exception - */ - @Test - public void testModifiedBucketCacheFileTime() throws Exception { - HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - Path testDir = TEST_UTIL.getDataTestDir(); - TEST_UTIL.getTestFileSystem().mkdirs(testDir); - - BucketCache bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - long usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize == 0); - - CacheTestUtils.HFileBlockPair[] blocks = - CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); - // Add blocks - for (CacheTestUtils.HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); - } - usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize != 0); - // persist cache to file - bucketCache.shutdown(); - - // modified bucket cache file LastModifiedTime - File file = new File(testDir + "/bucket.cache"); - assertTrue(file.setLastModified(System.currentTimeMillis() + 1000)); - // can't restore cache from file - bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertEquals(0, bucketCache.getAllocator().getUsedSize()); - assertEquals(0, bucketCache.backingMap.size()); - - TEST_UTIL.cleanupTestDir(); - } - - /** - * Test whether it can read the old version's persistence file, it's for backward compatibility. - * Start BucketCache and add some blocks, then persist cache to file in old way and shutdown - * BucketCache. Restart BucketCache, and it can normally restore from old version persistence - * file. - * @throws Exception the exception - */ - @Test - public void compatibilityTest() throws Exception { - HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - Path testDir = TEST_UTIL.getDataTestDir(); - TEST_UTIL.getTestFileSystem().mkdirs(testDir); - String persistencePath = testDir + "/bucket.persistence"; - BucketCache bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath); - long usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize == 0); - - CacheTestUtils.HFileBlockPair[] blocks = - CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); - // Add blocks - for (CacheTestUtils.HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); - } - usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize != 0); - // persistence backingMap using old way - persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(), - bucketCache.backingMap, bucketCache.getDeserialiserMap()); - bucketCache.shutdown(); - - // restore cache from file which skip check checksum - bucketCache = - new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, - constructedBlockSizes, writeThreads, writerQLen, persistencePath + ".old"); - assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); - assertEquals(blocks.length, bucketCache.backingMap.size()); - } - - private void persistToFileInOldWay(String persistencePath, long cacheCapacity, - ConcurrentMap backingMap, UniqueIndexMap deserialiserMap) - throws IOException { - try(ObjectOutputStream oos = new ObjectOutputStream( - new FileOutputStream(persistencePath, false))) { - oos.writeLong(cacheCapacity); - oos.writeUTF(FileIOEngine.class.getName()); - oos.writeUTF(backingMap.getClass().getName()); - oos.writeObject(deserialiserMap); - oos.writeObject(backingMap); - } - } - - private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) - throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - } - } - - // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer - // threads will flush it to the bucket and put reference entry in backingMap. - private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, - Cacheable block) throws InterruptedException { - cache.cacheBlock(cacheKey, block); - waitUntilFlushedToBucket(cache, cacheKey); - } -}