HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b3d1f14 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b3d1f14 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b3d1f14 Branch: refs/heads/hbase-12439 Commit: 8b3d1f144408e4a7a014c5ac46418c9e91b9b0db Parents: b326042 Author: ramkrishna <ramkrishna.s.vasude...@gmail.com> Authored: Fri Dec 4 10:20:46 2015 +0530 Committer: ramkrishna <ramkrishna.s.vasude...@gmail.com> Committed: Fri Dec 4 10:20:46 2015 +0530 ---------------------------------------------------------------------- .../regionserver/DefaultStoreFileManager.java | 58 ++- .../hadoop/hbase/regionserver/HRegion.java | 26 +- .../hadoop/hbase/regionserver/HStore.java | 213 ++++++---- .../regionserver/ReversedStoreScanner.java | 19 +- .../apache/hadoop/hbase/regionserver/Store.java | 5 + .../hadoop/hbase/regionserver/StoreFile.java | 67 +++- .../hbase/regionserver/StoreFileManager.java | 25 +- .../hbase/regionserver/StoreFileScanner.java | 3 + .../hadoop/hbase/regionserver/StoreScanner.java | 384 +++++++++--------- .../regionserver/StripeStoreFileManager.java | 82 +++- .../compactions/CompactedHFilesDischarger.java | 74 ++++ .../org/apache/hadoop/hbase/TestIOFencing.java | 11 - .../TestZooKeeperTableArchiveClient.java | 23 +- .../apache/hadoop/hbase/io/TestHeapSize.java | 2 +- .../hbase/mapreduce/TestHFileOutputFormat2.java | 16 + .../master/cleaner/TestSnapshotFromMaster.java | 8 + .../hbase/regionserver/MockStoreFile.java | 5 + .../regionserver/TestEncryptionKeyRotation.java | 59 ++- .../regionserver/TestHRegionReplayEvents.java | 3 + .../TestRegionMergeTransactionOnCluster.java | 29 +- .../hbase/regionserver/TestRegionReplicas.java | 11 +- .../hadoop/hbase/regionserver/TestStore.java | 13 + .../TestStripeStoreFileManager.java | 19 + .../TestCompactedHFilesDischarger.java | 389 +++++++++++++++++++ 24 files changed, 1211 insertions(+), 333 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index 9f38b9e..d38306c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -54,6 +54,13 @@ class DefaultStoreFileManager implements StoreFileManager { * is atomically replaced when its contents change. */ private volatile ImmutableList<StoreFile> storefiles = null; + /** + * List of compacted files inside this store that needs to be excluded in reads + * because further new reads will be using only the newly created files out of compaction. + * These compacted files will be deleted/cleared once all the existing readers on these + * compacted files are done. 
+ */ + private volatile List<StoreFile> compactedfiles = null; public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf, CompactionConfiguration comConf) { @@ -75,6 +82,11 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override + public Collection<StoreFile> getCompactedfiles() { + return compactedfiles; + } + + @Override public void insertNewFiles(Collection<StoreFile> sfs) throws IOException { ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles); newFiles.addAll(sfs); @@ -89,19 +101,55 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override + public Collection<StoreFile> clearCompactedFiles() { + List<StoreFile> result = compactedfiles; + compactedfiles = new ArrayList<StoreFile>(); + return result; + } + + @Override public final int getStorefileCount() { return storefiles.size(); } @Override public void addCompactionResults( - Collection<StoreFile> compactedFiles, Collection<StoreFile> results) { + Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) { ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles); - newStoreFiles.removeAll(compactedFiles); + newStoreFiles.removeAll(newCompactedfiles); if (!results.isEmpty()) { newStoreFiles.addAll(results); } sortAndSetStoreFiles(newStoreFiles); + ArrayList<StoreFile> updatedCompactedfiles = null; + if (this.compactedfiles != null) { + updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles); + updatedCompactedfiles.addAll(newCompactedfiles); + } else { + updatedCompactedfiles = new ArrayList<StoreFile>(newCompactedfiles); + } + markCompactedAway(newCompactedfiles); + this.compactedfiles = sortCompactedfiles(updatedCompactedfiles); + } + + // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised + // Let a background thread close the actual reader on these compacted files and also + // ensure to evict the blocks from block cache so that they are no longer in + // 
cache + private void markCompactedAway(Collection<StoreFile> compactedFiles) { + for (StoreFile file : compactedFiles) { + file.markCompactedAway(); + } + } + + @Override + public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException { + ArrayList<StoreFile> updatedCompactedfiles = null; + if (this.compactedfiles != null) { + updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles); + updatedCompactedfiles.removeAll(removedCompactedfiles); + this.compactedfiles = sortCompactedfiles(updatedCompactedfiles); + } } @Override @@ -166,6 +214,12 @@ class DefaultStoreFileManager implements StoreFileManager { storefiles = ImmutableList.copyOf(storeFiles); } + private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) { + // Sorting may not be really needed here for the compacted files? + Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID); + return new ArrayList<StoreFile>(storefiles); + } + @Override public double getCompactionPressure() { int storefileCount = getStorefileCount(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 557edd9..484d5ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import 
org.apache.hadoop.hbase.DroppedSnapshotException; @@ -148,6 +149,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; @@ -297,6 +299,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final Configuration conf; private final Configuration baseConf; private final int rowLockWaitDuration; + private CompactedHFilesDischarger compactedFileDischarger; static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; // The internal wait duration to acquire a lock before read/update @@ -809,6 +812,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Initialize all the HStores status.setStatus("Initializing all the Stores"); long maxSeqId = initializeStores(reporter, status); + // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files + // that will no longer be used in reads. + if (this.getRegionServerServices() != null) { + ChoreService choreService = this.getRegionServerServices().getChoreService(); + if (choreService != null) { + // Default is 2 mins. 
The default value for TTLCleaner is 5 mins so we set this to + // 2 mins so that compacted files can be archived before the TTLCleaner runs + int cleanerInterval = + conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000); + this.compactedFileDischarger = + new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this); + choreService.scheduleChore(compactedFileDischarger); + } + } this.mvcc.advanceTo(maxSeqId); if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { // Recover any edits if available. @@ -1513,6 +1530,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.metricsRegionWrapper != null) { Closeables.closeQuietly(this.metricsRegionWrapper); } + // stop the Compacted hfile discharger + if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true); + status.markComplete("Closed"); LOG.info("Closed " + this); return result; @@ -6658,6 +6678,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi dstRegion.getRegionFileSystem().logFileSystemState(LOG); } + // clear the compacted files if any + for (Store s : dstRegion.getStores()) { + s.closeAndArchiveCompactedFiles(); + } if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) { throw new IOException("Merged region " + dstRegion + " still has references after the compaction, is compaction canceled?"); @@ -7527,7 +7551,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 184d44f..50b3de7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -39,7 +39,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -90,7 +93,7 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -138,6 +141,8 @@ public class HStore implements Store { static int closeCheckInterval = 0; private volatile 
long storeSize = 0L; private volatile long totalUncompressedBytes = 0L; + private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null; + private CompletionService<StoreFile> completionService = null; /** * RWLock for store operations. @@ -181,7 +186,6 @@ public class HStore implements Store { private long blockingFileCount; private int compactionCheckMultiplier; - protected Encryption.Context cryptoContext = Encryption.Context.NONE; private volatile long flushedCellsCount = 0; @@ -272,7 +276,10 @@ public class HStore implements Store { "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber); } - + compactionCleanerthreadPoolExecutor = getThreadPoolExecutor( + conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10)); + completionService = + new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor); // Crypto context for new store files String cipherName = family.getEncryptionType(); if (cipherName != null) { @@ -551,14 +558,15 @@ public class HStore implements Store { try { Future<StoreFile> future = completionService.take(); StoreFile storeFile = future.get(); - long length = storeFile.getReader().length(); - this.storeSize += length; - this.totalUncompressedBytes += - storeFile.getReader().getTotalUncompressedBytes(); - if (LOG.isDebugEnabled()) { - LOG.debug("loaded " + storeFile.toStringDetailed()); + if (storeFile != null) { + long length = storeFile.getReader().length(); + this.storeSize += length; + this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes(); + if (LOG.isDebugEnabled()) { + LOG.debug("loaded " + storeFile.toStringDetailed()); + } + results.add(storeFile); } - results.add(storeFile); } catch (InterruptedException e) { if (ioe == null) ioe = new InterruptedIOException(e.getMessage()); } catch (ExecutionException e) { @@ -656,8 +664,7 @@ public class HStore implements Store { region.getMVCC().advanceTo(this.getMaxSequenceId()); } - // notify scanners, close 
file readers, and recompute store size - completeCompaction(toBeRemovedStoreFiles, false); + completeCompaction(toBeRemovedStoreFiles); } private StoreFile createStoreFileAndReader(final Path p) throws IOException { @@ -834,7 +841,6 @@ public class HStore implements Store { // the lock. this.lock.writeLock().unlock(); } - notifyChangedReadersObservers(); LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName()); if (LOG.isTraceEnabled()) { String traceMessage = "BULK LOAD time,size,store size,store files [" @@ -850,7 +856,10 @@ public class HStore implements Store { try { // Clear so metrics doesn't find them. ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles(); - + Collection<StoreFile> compactedfiles = + storeEngine.getStoreFileManager().clearCompactedFiles(); + // clear the compacted files + removeCompactedFiles(compactedfiles); if (!result.isEmpty()) { // initialize the thread pool for closing store files in parallel. ThreadPoolExecutor storeFileCloserThreadPool = this.region @@ -865,7 +874,7 @@ public class HStore implements Store { @Override public Void call() throws IOException { boolean evictOnClose = - cacheConf != null? cacheConf.shouldEvictOnClose(): true; + cacheConf != null? cacheConf.shouldEvictOnClose(): true; f.closeReader(evictOnClose); return null; } @@ -892,6 +901,9 @@ public class HStore implements Store { } if (ioe != null) throw ioe; } + if (compactionCleanerthreadPoolExecutor != null) { + compactionCleanerthreadPoolExecutor.shutdownNow(); + } LOG.info("Closed " + this); return result; } finally { @@ -1087,10 +1099,8 @@ public class HStore implements Store { // the lock. this.lock.writeLock().unlock(); } - - // Tell listeners of the change in readers. 
+ // notify to be called here - only in case of flushes notifyChangedReadersObservers(); - if (LOG.isTraceEnabled()) { long totalSize = 0; for (StoreFile sf : sfs) { @@ -1109,7 +1119,7 @@ public class HStore implements Store { * @throws IOException */ private void notifyChangedReadersObservers() throws IOException { - for (ChangedReadersObserver o: this.changedReaderObservers) { + for (ChangedReadersObserver o : this.changedReaderObservers) { o.updateReaders(); } } @@ -1268,7 +1278,7 @@ public class HStore implements Store { compactedCellsSize += getCompactionProgress().totalCompactedSize; } // At this point the store will use new files for all new scanners. - completeCompaction(filesToCompact, true); // Archive old files & update store size. + completeCompaction(filesToCompact); // update store size. logCompactionEndMessage(cr, sfs, compactionStartTime); return sfs; @@ -1456,7 +1466,7 @@ public class HStore implements Store { LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles + " with output files : " + outputStoreFiles); this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); - this.completeCompaction(inputStoreFiles, removeFiles); + this.completeCompaction(inputStoreFiles); } } @@ -1508,7 +1518,7 @@ public class HStore implements Store { this.getCoprocessorHost().postCompact(this, sf, null); } replaceStoreFiles(filesToCompact, Lists.newArrayList(sf)); - completeCompaction(filesToCompact, true); + completeCompaction(filesToCompact); } } finally { synchronized (filesCompacting) { @@ -1770,54 +1780,7 @@ public class HStore implements Store { @VisibleForTesting protected void completeCompaction(final Collection<StoreFile> compactedFiles) throws IOException { - completeCompaction(compactedFiles, true); - } - - - /** - * <p>It works by processing a compaction that's been written to disk. 
- * - * <p>It is usually invoked at the end of a compaction, but might also be - * invoked at HStore startup, if the prior execution died midway through. - * - * <p>Moving the compacted TreeMap into place means: - * <pre> - * 1) Unload all replaced StoreFile, close and collect list to delete. - * 2) Compute new store size - * </pre> - * - * @param compactedFiles list of files that were compacted - */ - @VisibleForTesting - protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles) - throws IOException { - try { - // Do not delete old store files until we have sent out notification of - // change in case old files are still being accessed by outstanding scanners. - // Don't do this under writeLock; see HBASE-4485 for a possible deadlock - // scenario that could have happened if continue to hold the lock. - notifyChangedReadersObservers(); - // At this point the store will use new files for all scanners. - - // let the archive util decide if we should archive or delete the files - LOG.debug("Removing store files after compaction..."); - boolean evictOnClose = - cacheConf != null? cacheConf.shouldEvictOnClose(): true; - for (StoreFile compactedFile : compactedFiles) { - compactedFile.closeReader(evictOnClose); - } - if (removeFiles) { - this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles); - } - } catch (IOException e) { - e = e instanceof RemoteException ? - ((RemoteException)e).unwrapRemoteException() : e; - LOG.error("Failed removing compacted files in " + this + - ". Files we were trying to remove are " + compactedFiles.toString() + - "; some of them may have been already removed", e); - } - - // 4. 
Compute new store size + LOG.debug("Completing compaction..."); this.storeSize = 0L; this.totalUncompressedBytes = 0L; for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { @@ -2248,7 +2211,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2365,4 +2328,112 @@ public class HStore implements Store { public boolean isPrimaryReplicaStore() { return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID; } + + @Override + public void closeAndArchiveCompactedFiles() throws IOException { + lock.readLock().lock(); + Collection<StoreFile> copyCompactedfiles = null; + try { + Collection<StoreFile> compactedfiles = + this.getStoreEngine().getStoreFileManager().getCompactedfiles(); + if (compactedfiles != null && compactedfiles.size() != 0) { + // Do a copy under read lock + copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("No compacted files to archive"); + return; + } + } + } finally { + lock.readLock().unlock(); + } + removeCompactedFiles(copyCompactedfiles); + } + + private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) { + return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS, + new ThreadFactory() { + private int count = 1; + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "CompactedfilesArchiver-" + count++); + } + }); + } + + private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException { + if (compactedfiles != null && !compactedfiles.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing the compacted store files " 
+ compactedfiles); + } + for (final StoreFile file : compactedfiles) { + completionService.submit(new Callable<StoreFile>() { + @Override + public StoreFile call() throws IOException { + synchronized (file) { + try { + StoreFile.Reader r = file.getReader(); + if (r == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The file " + file + " was closed but still not archived."); + } + return file; + } + if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) { + // Even if deleting fails we need not bother as any new scanners won't be + // able to use the compacted file as the status is already compactedAway + if (LOG.isTraceEnabled()) { + LOG.trace("Closing and archiving the file " + file.getPath()); + } + r.close(true); + // Just close and return + return file; + } + } catch (Exception e) { + LOG.error("Exception while trying to close the compacted store file " + + file.getPath().getName()); + } + } + return null; + } + }); + } + final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size()); + try { + for (final StoreFile file : compactedfiles) { + Future<StoreFile> future = completionService.take(); + StoreFile closedFile = future.get(); + if (closedFile != null) { + filesToRemove.add(closedFile); + } + } + } catch (InterruptedException ie) { + LOG.error("Interrupted exception while closing the compacted files", ie); + } catch (Exception e) { + LOG.error("Exception occured while closing the compacted files", e); + } + if (isPrimaryReplicaStore()) { + archiveAndRemoveCompactedFiles(filesToRemove); + } + + } + } + + private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException { + if (!filesToArchive.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Moving the files " + filesToArchive + " to archive"); + } + // Only if this is successful it has to be removed + this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive); + try { + lock.writeLock().lock(); + 
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive); + } finally { + lock.writeLock().unlock(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index d198d7b..0e1d90f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -123,24 +123,13 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { @Override public boolean seekToPreviousRow(Cell key) throws IOException { - lock.lock(); - try { - checkReseek(); - return this.heap.seekToPreviousRow(key); - } finally { - lock.unlock(); - } - + checkReseek(); + return this.heap.seekToPreviousRow(key); } @Override public boolean backwardSeek(Cell key) throws IOException { - lock.lock(); - try { - checkReseek(); - return this.heap.backwardSeek(key); - } finally { - lock.unlock(); - } + checkReseek(); + return this.heap.backwardSeek(key); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 83a24a5..f137a8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -466,4 +466,9 @@ public interface Store extends HeapSize, 
StoreConfigInformation, PropagatingConf void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException; boolean isPrimaryReplicaStore(); + + /** + * Closes and archives the compacted files under this store + */ + void closeAndArchiveCompactedFiles() throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 6e5f441..2b9d101 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.SortedSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -378,6 +381,19 @@ public class StoreFile { return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); } + @VisibleForTesting + public boolean isCompactedAway() { + if (this.reader != null) { + return this.reader.isCompactedAway(); + } + return true; + } + + @VisibleForTesting + public int getRefCount() { + return this.reader.refCount.get(); + } + /** * Return the timestamp at which this bulk load file was generated. 
*/ @@ -553,6 +569,15 @@ public class StoreFile { } /** + * Marks the status of the file as compactedAway. + */ + public void markCompactedAway() { + if (this.reader != null) { + this.reader.markCompactedAway(); + } + } + + /** * Delete this file * @throws IOException */ @@ -1137,6 +1162,12 @@ public class StoreFile { private boolean bulkLoadResult = false; private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; private boolean skipResetSeqId = true; + // Counter that is incremented every time a scanner is created on the + // store file. It is decremented when the scan on the store file is + // done. + private AtomicInteger refCount = new AtomicInteger(0); + // Indicates if the file got compacted + private volatile boolean compactedAway = false; public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { @@ -1144,6 +1175,10 @@ public class StoreFile { bloomFilterType = BloomType.NONE; } + void markCompactedAway() { + this.compactedAway = true; + } + public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Configuration conf) throws IOException { reader = HFile.createReader(fs, path, in, size, cacheConf, conf); @@ -1195,12 +1230,36 @@ public class StoreFile { public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { + // Increment the ref count + refCount.incrementAndGet(); return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(), readPt); } /** + * Decrement the ref count associated with the reader when ever a scanner associated + * with the reader is closed + */ + void decrementRefCount() { + refCount.decrementAndGet(); + } + + /** + * @return true if the file is still used in reads + */ + public boolean isReferencedInReads() { + return refCount.get() != 0; + } + + /** + * @return true if the file is compacted + */ + public boolean isCompactedAway() { + 
return this.compactedAway; + } + + /** * Warning: Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. @@ -1710,7 +1769,13 @@ public class StoreFile { private static class GetFileSize implements Function<StoreFile, Long> { @Override public Long apply(StoreFile sf) { - return sf.getReader().length(); + if (sf.getReader() != null) { + return sf.getReader().length(); + } else { + // the reader may be null for the compacted files and if the archiving + // had failed. + return -1L; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index 11993db..7e70547 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -53,7 +53,7 @@ public interface StoreFileManager { void insertNewFiles(Collection<StoreFile> sfs) throws IOException; /** - * Adds compaction results into the structure. + * Adds only the new compaction results into the structure. * @param compactedFiles The input files for the compaction. * @param results The resulting files for the compaction. 
*/ @@ -61,12 +61,26 @@ public interface StoreFileManager { Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException; /** + * Remove the compacted files + * @param compactedFiles the list of compacted files + * @throws IOException + */ + void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException; + + /** * Clears all the files currently in use and returns them. * @return The files previously in use. */ ImmutableCollection<StoreFile> clearFiles(); /** + * Clears all the compacted files and returns them. This method is expected to be + * accessed single threaded. + * @return The files compacted previously. + */ + Collection<StoreFile> clearCompactedFiles(); + + /** * Gets the snapshot of the store files currently in use. Can be used for things like metrics * and checks; should not assume anything about relations between store files in the list. * @return The list of StoreFiles. @@ -74,6 +88,15 @@ public interface StoreFileManager { Collection<StoreFile> getStorefiles(); /** + * List of compacted files inside this store that needs to be excluded in reads + * because further new reads will be using only the newly created files out of compaction. + * These compacted files will be deleted/cleared once all the existing readers on these + * compacted files are done. + * @return the list of compacted files + */ + Collection<StoreFile> getCompactedfiles(); + + /** * Returns the number of files currently in use. * @return The number of files. 
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index fb154c0..c864733 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -248,6 +248,9 @@ public class StoreFileScanner implements KeyValueScanner { public void close() { cur = null; this.hfs.close(); + if (this.reader != null) { + this.reader.decrementRefCount(); + } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index a4e066c..44f07f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -125,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // A flag whether use pread for scan private boolean scanUsePread = false; - protected ReentrantLock lock = new ReentrantLock(); + // Indicates whether there was flush during the course of the scan + protected 
volatile boolean flushed = false; protected final long readPt; @@ -403,15 +403,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public Cell peek() { - lock.lock(); - try { + checkResetHeap(); if (this.heap == null) { return this.lastTop; } return this.heap.peek(); - } finally { - lock.unlock(); - } } @Override @@ -425,46 +421,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner close(true); } - private void close(boolean withHeapClose){ - lock.lock(); - try { - if (this.closing) { - return; + private void close(boolean withHeapClose) { + if (this.closing) { + return; + } + if (withHeapClose) this.closing = true; + // Under test, we dont have a this.store + if (this.store != null) this.store.deleteChangedReaderObserver(this); + if (withHeapClose) { + for (KeyValueHeap h : this.heapsForDelayedClose) { + h.close(); } - if (withHeapClose) this.closing = true; - // Under test, we dont have a this.store - if (this.store != null) this.store.deleteChangedReaderObserver(this); - if (withHeapClose) { - for (KeyValueHeap h : this.heapsForDelayedClose) { - h.close(); - } - this.heapsForDelayedClose.clear(); - if (this.heap != null) { - this.heap.close(); - this.heap = null; // CLOSED! - } - } else { - if (this.heap != null) { - this.heapsForDelayedClose.add(this.heap); - this.heap = null; - } + this.heapsForDelayedClose.clear(); + if (this.heap != null) { + this.heap.close(); + this.heap = null; // CLOSED! + } + } else { + if (this.heap != null) { + this.heapsForDelayedClose.add(this.heap); + this.heap = null; } - this.lastTop = null; // If both are null, we are closed. - } finally { - lock.unlock(); } + this.lastTop = null; // If both are null, we are closed. 
} @Override public boolean seek(Cell key) throws IOException { - lock.lock(); - try { + checkResetHeap(); // reset matcher state, in case that underlying store changed checkReseek(); return this.heap.seek(key); - } finally { - lock.unlock(); - } } @Override @@ -480,173 +467,168 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ @Override public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { - lock.lock(); - try { - if (scannerContext == null) { - throw new IllegalArgumentException("Scanner context cannot be null"); - } - if (checkReseek()) { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - } + if (scannerContext == null) { + throw new IllegalArgumentException("Scanner context cannot be null"); + } + checkResetHeap(); + if (checkReseek()) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + } - // if the heap was left null, then the scanners had previously run out anyways, close and - // return. - if (this.heap == null) { - // By this time partial close should happened because already heap is null - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } + // if the heap was left null, then the scanners had previously run out anyways, close and + // return. 
+ if (this.heap == null) { + // By this time partial close should happened because already heap is null + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } - Cell cell = this.heap.peek(); - if (cell == null) { - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } + Cell cell = this.heap.peek(); + if (cell == null) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } - // only call setRow if the row changes; avoids confusing the query matcher - // if scanning intra-row + // only call setRow if the row changes; avoids confusing the query matcher + // if scanning intra-row - // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing - // rows. Else it is possible we are still traversing the same row so we must perform the row - // comparison. - if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null || - !CellUtil.matchingRow(cell, matcher.curCell)) { - this.countPerRow = 0; - matcher.setToNewRow(cell); - } + // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing + // rows. Else it is possible we are still traversing the same row so we must perform the row + // comparison. + if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null + || !CellUtil.matchingRow(cell, matcher.curCell)) { + this.countPerRow = 0; + matcher.setToNewRow(cell); + } - // Clear progress away unless invoker has indicated it should be kept. - if (!scannerContext.getKeepProgress()) scannerContext.clearProgress(); + // Clear progress away unless invoker has indicated it should be kept. 
+ if (!scannerContext.getKeepProgress()) scannerContext.clearProgress(); - // Only do a sanity-check if store and comparator are available. - CellComparator comparator = store != null ? store.getComparator() : null; + // Only do a sanity-check if store and comparator are available. + CellComparator comparator = store != null ? store.getComparator() : null; - int count = 0; - long totalBytesRead = 0; + int count = 0; + long totalBytesRead = 0; - LOOP: do { - // Update and check the time limit based on the configured value of cellsPerTimeoutCheck - if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { - scannerContext.updateTimeProgress(); - if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { - return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); - } + LOOP: do { + // Update and check the time limit based on the configured value of cellsPerTimeoutCheck + if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { + scannerContext.updateTimeProgress(); + if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } + } - if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. - checkScanOrder(prevCell, cell, comparator); - prevCell = cell; + if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. 
+ checkScanOrder(prevCell, cell, comparator); + prevCell = cell; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); - qcode = optimize(qcode, cell); - switch(qcode) { - case INCLUDE: - case INCLUDE_AND_SEEK_NEXT_ROW: - case INCLUDE_AND_SEEK_NEXT_COL: + ScanQueryMatcher.MatchCode qcode = matcher.match(cell); + qcode = optimize(qcode, cell); + switch (qcode) { + case INCLUDE: + case INCLUDE_AND_SEEK_NEXT_ROW: + case INCLUDE_AND_SEEK_NEXT_COL: - Filter f = matcher.getFilter(); - if (f != null) { - cell = f.transformCell(cell); - } + Filter f = matcher.getFilter(); + if (f != null) { + cell = f.transformCell(cell); + } - this.countPerRow++; - if (storeLimit > -1 && - this.countPerRow > (storeLimit + storeOffset)) { - // do what SEEK_NEXT_ROW does. - if (!matcher.moreRowsMayExistAfter(cell)) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - seekToNextRow(cell); - break LOOP; + this.countPerRow++; + if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { + // do what SEEK_NEXT_ROW does. 
+ if (!matcher.moreRowsMayExistAfter(cell)) { + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } + seekToNextRow(cell); + break LOOP; + } - // add to results only if we have skipped #storeOffset kvs - // also update metric accordingly - if (this.countPerRow > storeOffset) { - outResult.add(cell); + // add to results only if we have skipped #storeOffset kvs + // also update metric accordingly + if (this.countPerRow > storeOffset) { + outResult.add(cell); - // Update local tracking information - count++; - totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell); + // Update local tracking information + count++; + totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell); - // Update the progress of the scanner context - scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell)); - scannerContext.incrementBatchProgress(1); + // Update the progress of the scanner context + scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell)); + scannerContext.incrementBatchProgress(1); - if (totalBytesRead > maxRowSize) { - throw new RowTooBigException("Max row size allowed: " + maxRowSize - + ", but the row is bigger than that."); - } + if (totalBytesRead > maxRowSize) { + throw new RowTooBigException( + "Max row size allowed: " + maxRowSize + ", but the row is bigger than that."); } + } - if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (!matcher.moreRowsMayExistAfter(cell)) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - seekToNextRow(cell); - } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - seekAsDirection(matcher.getKeyForNextColumn(cell)); - } else { - this.heap.next(); + if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + if (!matcher.moreRowsMayExistAfter(cell)) { + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } + seekToNextRow(cell); + } else if (qcode == 
ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + seekAsDirection(matcher.getKeyForNextColumn(cell)); + } else { + this.heap.next(); + } - if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - continue; + if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + continue; - case DONE: - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + case DONE: + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - case DONE_SCAN: - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + case DONE_SCAN: + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - case SEEK_NEXT_ROW: - // This is just a relatively simple end of scan fix, to short-cut end - // us if there is an endKey in the scan. - if (!matcher.moreRowsMayExistAfter(cell)) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } + case SEEK_NEXT_ROW: + // This is just a relatively simple end of scan fix, to short-cut end + // us if there is an endKey in the scan. 
+ if (!matcher.moreRowsMayExistAfter(cell)) { + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } - seekToNextRow(cell); - break; + seekToNextRow(cell); + break; - case SEEK_NEXT_COL: - seekAsDirection(matcher.getKeyForNextColumn(cell)); - break; + case SEEK_NEXT_COL: + seekAsDirection(matcher.getKeyForNextColumn(cell)); + break; - case SKIP: - this.heap.next(); - break; - - case SEEK_NEXT_USING_HINT: - Cell nextKV = matcher.getNextKeyHint(cell); - if (nextKV != null) { - seekAsDirection(nextKV); - } else { - heap.next(); - } - break; + case SKIP: + this.heap.next(); + break; - default: - throw new RuntimeException("UNEXPECTED"); + case SEEK_NEXT_USING_HINT: + Cell nextKV = matcher.getNextKeyHint(cell); + if (nextKV != null) { + seekAsDirection(nextKV); + } else { + heap.next(); } - } while((cell = this.heap.peek()) != null); + break; - if (count > 0) { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + default: + throw new RuntimeException("UNEXPECTED"); } + } while ((cell = this.heap.peek()) != null); - // No more keys - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } finally { - lock.unlock(); + if (count > 0) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); } + + // No more keys + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } /* @@ -684,30 +666,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Implementation of ChangedReadersObserver @Override public void updateReaders() throws IOException { - lock.lock(); - try { - if (this.closing) return; + flushed = true; + // Let the next() call handle re-creating and seeking + } + protected void nullifyCurrentHeap() { + if (this.closing) return; // All public synchronized API calls will call 'checkReseek' which will cause 
// the scanner stack to reseek if this.heap==null && this.lastTop != null. // But if two calls to updateReaders() happen without a 'next' or 'peek' then we // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders // which is NOT what we want, not to mention could cause an NPE. So we early out here. if (this.heap == null) return; - // this could be null. - this.lastTop = this.peek(); + this.lastTop = this.heap.peek(); //DebugPrint.println("SS updateReaders, topKey = " + lastTop); // close scanners to old obsolete Store files this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP - - // Let the next() call handle re-creating and seeking - } finally { - lock.unlock(); - } } /** @@ -793,18 +771,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean reseek(Cell kv) throws IOException { - lock.lock(); - try { - //Heap will not be null, if this is called from next() which. - //If called from RegionScanner.reseek(...) make sure the scanner - //stack is reset if needed. + checkResetHeap(); + // Heap will not be null, if this is called from next() which. + // If called from RegionScanner.reseek(...) make sure the scanner + // stack is reset if needed. checkReseek(); if (explicitColumnQuery && lazySeekEnabledGlobally) { return heap.requestSeek(kv, true, useRowColBloom); } return heap.reseek(kv); - } finally { - lock.unlock(); + } + + protected void checkResetHeap() { + // check the var without any lock. Suppose even if we see the old + // value here still it is ok to continue because we will not be resetting + // the heap but will continue with the referenced memstore's snapshot. 
For compactions + // any way we don't need the updateReaders at all to happen as we still continue with + // the older files + if (flushed) { + // If the 'flushed' is found to be true then there is a need to ensure + // that the current scanner updates the heap that it has and then proceed + // with the scan and ensure to reset the flushed inside the lock + // One thing can be sure that the same store scanner cannot be in reseek and + // next at the same time ie. within the same store scanner it is always single + // threaded + nullifyCurrentHeap(); + // reset the flag + flushed = false; } } @@ -883,17 +876,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public void shipped() throws IOException { - lock.lock(); - try { - for (KeyValueHeap h : this.heapsForDelayedClose) { - h.close();// There wont be further fetch of Cells from these scanners. Just close. - } - this.heapsForDelayedClose.clear(); - if (this.heap != null) { - this.heap.shipped(); - } - } finally { - lock.unlock(); + for (KeyValueHeap h : this.heapsForDelayedClose) { + h.close();// There wont be further fetch of Cells from these scanners. Just close. 
+ } + this.heapsForDelayedClose.clear(); + if (this.heap != null) { + this.heap.shipped(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index bb49aba..ef2c282 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -106,6 +106,7 @@ public class StripeStoreFileManager /** Cached list of all files in the structure, to return from some calls */ public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of(); + private ImmutableList<StoreFile> allCompactedFilesCached = ImmutableList.<StoreFile>of(); } private State state = null; @@ -141,8 +142,14 @@ public class StripeStoreFileManager } @Override + public Collection<StoreFile> getCompactedfiles() { + return state.allCompactedFilesCached; + } + + @Override public void insertNewFiles(Collection<StoreFile> sfs) throws IOException { CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true); + // Passing null does not cause NPE?? cmc.mergeResults(null, sfs); debugDumpState("Added new files"); } @@ -157,6 +164,13 @@ public class StripeStoreFileManager } @Override + public ImmutableCollection<StoreFile> clearCompactedFiles() { + ImmutableCollection<StoreFile> result = state.allCompactedFilesCached; + this.state = new State(); + return result; + } + + @Override public int getStorefileCount() { return state.allFilesCached.size(); } @@ -306,9 +320,31 @@ public class StripeStoreFileManager // copies and apply the result at the end. 
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); cmc.mergeResults(compactedFiles, results); + markCompactedAway(compactedFiles); debugDumpState("Merged compaction results"); } + // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised + // Let a background thread close the actual reader on these compacted files and also + // ensure to evict the blocks from block cache so that they are no longer in + // cache + private void markCompactedAway(Collection<StoreFile> compactedFiles) { + for (StoreFile file : compactedFiles) { + file.markCompactedAway(); + } + } + + @Override + public void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException { + // See class comment for the assumptions we make here. + LOG.debug("Attempting to delete compaction results: " + compactedFiles.size()); + // In order to be able to fail in the middle of the operation, we'll operate on lazy + // copies and apply the result at the end. + CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); + cmc.deleteResults(compactedFiles); + debugDumpState("Deleted compaction results"); + } + @Override public int getStoreCompactionPriority() { // If there's only L0, do what the default store does. @@ -684,7 +720,7 @@ public class StripeStoreFileManager this.isFlush = isFlush; } - public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results) + private void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException { assert this.compactedFiles == null && this.results == null; this.compactedFiles = compactedFiles; @@ -696,12 +732,20 @@ public class StripeStoreFileManager processNewCandidateStripes(newStripes); } // Create new state and update parent. 
- State state = createNewState(); + State state = createNewState(false); StripeStoreFileManager.this.state = state; updateMetadataMaps(); } - private State createNewState() { + private void deleteResults(Collection<StoreFile> compactedFiles) throws IOException { + this.compactedFiles = compactedFiles; + // Create new state and update parent. + State state = createNewState(true); + StripeStoreFileManager.this.state = state; + updateMetadataMaps(); + } + + private State createNewState(boolean delCompactedFiles) { State oldState = StripeStoreFileManager.this.state; // Stripe count should be the same unless the end rows changed. assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null; @@ -717,9 +761,21 @@ public class StripeStoreFileManager } List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached); - if (!isFlush) newAllFiles.removeAll(compactedFiles); - newAllFiles.addAll(results); + List<StoreFile> newAllCompactedFiles = + new ArrayList<StoreFile>(oldState.allCompactedFilesCached); + if (!isFlush) { + newAllFiles.removeAll(compactedFiles); + if (delCompactedFiles) { + newAllCompactedFiles.removeAll(compactedFiles); + } else { + newAllCompactedFiles.addAll(compactedFiles); + } + } + if (results != null) { + newAllFiles.addAll(results); + } newState.allFilesCached = ImmutableList.copyOf(newAllFiles); + newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles); return newState; } @@ -970,14 +1026,16 @@ public class StripeStoreFileManager // Order by seqnum is reversed. 
for (int i = 1; i < stripe.size(); ++i) { StoreFile sf = stripe.get(i); - long fileTs = sf.getReader().getMaxTimestamp(); - if (fileTs < maxTs && !filesCompacting.contains(sf)) { - LOG.info("Found an expired store file: " + sf.getPath() - + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs); - if (expiredStoreFiles == null) { - expiredStoreFiles = new ArrayList<StoreFile>(); + synchronized (sf) { + long fileTs = sf.getReader().getMaxTimestamp(); + if (fileTs < maxTs && !filesCompacting.contains(sf)) { + LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is " + + fileTs + ", which is below " + maxTs); + if (expiredStoreFiles == null) { + expiredStoreFiles = new ArrayList<StoreFile>(); + } + expiredStoreFiles.add(sf); } - expiredStoreFiles.add(sf); } } return expiredStoreFiles; http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java new file mode 100644 index 0000000..4cf120d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; + +/** + * A chore service that periodically cleans up the compacted files when there are no active readers + * using those compacted files and also helps in clearing the block cache with these compacted + * file entries + */ +@InterfaceAudience.Private +public class CompactedHFilesDischarger extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class); + private Region region; + + /** + * @param period the period of time to sleep between each run + * @param stopper the stopper + * @param region the store to identify the family name + */ + public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) { + // Need to add the config classes + super("CompactedHFilesCleaner", stopper, period); + this.region = region; + } + + @Override + public void chore() { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo()); + } + for (Store store : region.getStores()) { + try { + store.closeAndArchiveCompactedFiles(); + if (LOG.isTraceEnabled()) { + LOG.trace("Completed archiving the compacted files for 
the region " + + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName()); + } + } catch (Exception e) { + LOG.error( + "Exception while trying to close and archive the compacted store files of the store " + + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(), + e); + } + } + if (LOG.isTraceEnabled()) { + LOG.trace( + "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 304638a..94a63d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -198,17 +198,6 @@ public class TestIOFencing { } @Override - protected void completeCompaction(final Collection<StoreFile> compactedFiles, - boolean removeFiles) throws IOException { - try { - r.compactionsWaiting.countDown(); - r.compactionsBlocked.await(); - } catch (InterruptedException ex) { - throw new IOException(ex); - } - super.completeCompaction(compactedFiles, removeFiles); - } - @Override protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException { try { r.compactionsWaiting.countDown(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index a28112d..55e43de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -170,10 +172,11 @@ public class TestZooKeeperTableArchiveClient { // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); - Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); - + HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + final CompactedHFilesDischarger compactionCleaner = + new CompactedHFilesDischarger(100, stop, region); loadFlushAndCompact(region, TEST_FAM); - + compactionCleaner.chore(); // get the current hfiles in the archive directory List<Path> files = getAllFiles(fs, archiveDir); if (files == null) { @@ -217,18 +220,22 @@ public class TestZooKeeperTableArchiveClient { HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); - // create the region HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); - 
Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); + final CompactedHFilesDischarger compactionCleaner = + new CompactedHFilesDischarger(100, stop, region); loadFlushAndCompact(region, TEST_FAM); - + compactionCleaner.chore(); // create the another table that we don't archive hcd = new HColumnDescriptor(TEST_FAM); - Region otherRegion = UTIL.createTestRegion(otherTable, hcd); + HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd); + final CompactedHFilesDischarger compactionCleaner1 = + new CompactedHFilesDischarger(100, stop, otherRegion); loadFlushAndCompact(otherRegion, TEST_FAM); - + compactionCleaner1.chore(); // get the current hfiles in the archive directory + // Should be archived List<Path> files = getAllFiles(fs, archiveDir); if (files == null) { FSUtils.logFileSystemState(fs, archiveDir, LOG); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index f6ade32..4f30960 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -76,7 +76,7 @@ public class TestHeapSize { LOG.info("name=" + b.getName()); LOG.info("specname=" + b.getSpecName()); LOG.info("specvendor=" + b.getSpecVendor()); - LOG.info("vmname=" + b.getVmName()); + LOG.info("vmname=" + b.getVmName()); LOG.info("vmversion=" + b.getVmVersion()); LOG.info("vmvendor=" + b.getVmVendor()); Map<String, String> p = b.getSystemProperties(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 9ff88f0..337eeac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; @@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -95,6 +97,8 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; +import com.google.common.collect.Lists; + /** * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}. * Sets up and runs a mapreduce job that writes hfile output. 
@@ -1002,6 +1006,12 @@ public class TestHFileOutputFormat2 { quickPoll(new Callable<Boolean>() { @Override public Boolean call() throws Exception { + List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + for (HRegion region : regions) { + for (Store store : region.getStores()) { + store.closeAndArchiveCompactedFiles(); + } + } return fs.listStatus(storePath).length == 1; } }, 5000); @@ -1015,6 +1025,12 @@ public class TestHFileOutputFormat2 { quickPoll(new Callable<Boolean>() { @Override public Boolean call() throws Exception { + List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME); + for (HRegion region : regions) { + for (Store store : region.getStores()) { + store.closeAndArchiveCompactedFiles(); + } + } return fs.listStatus(storePath).length == 1; } }, 5000); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java index 20b0642..60c5473 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java @@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneReq import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import 
org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; @@ -121,6 +124,7 @@ public class TestSnapshotFromMaster { conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod); conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName()); + conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000); } @@ -320,6 +324,10 @@ public class TestSnapshotFromMaster { region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it. region.compactStores(); // min is 2 so will compact and archive } + for (HRegion region : regions) { + CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region); + cleaner.chore(); + } LOG.info("After compaction File-System state"); FSUtils.logFileSystemState(fs, rootDir, LOG); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index f99226f..3614846 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -79,6 +79,11 @@ public class MockStoreFile extends StoreFile { } @Override + public boolean isCompactedAway() { + return false; + } + + @Override public byte[] getMetadataValue(byte[] key) { return this.metadata.get(key); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java index 6c66c6d..82be1db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.*; +import java.io.IOException; import java.security.Key; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import javax.crypto.spec.SecretKeySpec; @@ -123,13 +125,14 @@ public class TestEncryptionKeyRotation { // And major compact TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName()); + final List<Path> updatePaths = findCompactedStorefilePaths(htd.getTableName()); TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() { @Override public boolean evaluate() throws Exception { // When compaction has finished, all of the original files will be // gone boolean found = false; - for (Path path: initialPaths) { + for (Path path: updatePaths) { found = TEST_UTIL.getTestFileSystem().exists(path); if (found) { LOG.info("Found " + path); @@ -141,14 +144,20 @@ public class TestEncryptionKeyRotation { }); // Verify we have store file(s) with only the new key + Thread.sleep(1000); + waitForCompaction(htd.getTableName()); List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName()); assertTrue(pathsAfterCompaction.size() > 0); for (Path path: pathsAfterCompaction) { - assertFalse("Store file " + path + " retains initial key", - Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path))); assertTrue("Store file " + path + " has incorrect key", Bytes.equals(secondCFKey.getEncoded(), 
extractHFileKey(path))); } + List<Path> compactedPaths = findCompactedStorefilePaths(htd.getTableName()); + assertTrue(compactedPaths.size() > 0); + for (Path path: compactedPaths) { + assertTrue("Store file " + path + " retains initial key", + Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path))); + } } @Test @@ -194,6 +203,33 @@ public class TestEncryptionKeyRotation { } } + private static void waitForCompaction(TableName tableName) + throws IOException, InterruptedException { + boolean compacted = false; + for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName) + .getOnlineRegions(tableName)) { + for (Store store : region.getStores()) { + compacted = false; + while (!compacted) { + if (store.getStorefiles() != null) { + while (store.getStorefilesCount() != 1) { + Thread.sleep(100); + } + for (StoreFile storefile : store.getStorefiles()) { + if (!storefile.isCompactedAway()) { + compacted = true; + break; + } + Thread.sleep(100); + } + } else { + break; + } + } + } + } + } + private static List<Path> findStorefilePaths(TableName tableName) throws Exception { List<Path> paths = new ArrayList<Path>(); for (Region region: @@ -207,6 +243,23 @@ public class TestEncryptionKeyRotation { return paths; } + private static List<Path> findCompactedStorefilePaths(TableName tableName) throws Exception { + List<Path> paths = new ArrayList<Path>(); + for (Region region: + TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) { + for (Store store : region.getStores()) { + Collection<StoreFile> compactedfiles = + ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles(); + if (compactedfiles != null) { + for (StoreFile storefile : compactedfiles) { + paths.add(storefile.getPath()); + } + } + } + } + return paths; + } + private void createTableAndFlush(HTableDescriptor htd) throws Exception { HColumnDescriptor hcd = htd.getFamilies().iterator().next(); // Create the test table 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 0c2e01c..c59d6f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -1369,6 +1370,8 @@ public class TestHRegionReplayEvents { // Test case 3: compact primary files primaryRegion.compactStores(); + CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion); + cleaner.chore(); secondaryRegion.refreshStoreFiles(); assertPathListsEqual(primaryRegion.getStoreFileList(families), secondaryRegion.getStoreFileList(families)); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index 1c99fe3..e0c1453 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; +import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -212,7 +214,7 @@ public class TestRegionMergeTransactionOnCluster { List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor .getTableRegionsAndLocations(master.getConnection(), tableName); HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst(); - HTableDescriptor tableDescritor = master.getTableDescriptors().get( + HTableDescriptor tableDescriptor = master.getTableDescriptors().get( tableName); Result mergedRegionResult = 
MetaTableAccessor.getRegionResult( master.getConnection(), mergedRegionInfo.getRegionName()); @@ -236,19 +238,34 @@ public class TestRegionMergeTransactionOnCluster { assertTrue(fs.exists(regionAdir)); assertTrue(fs.exists(regionBdir)); + HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies(); + HRegionFileSystem hrfs = new HRegionFileSystem( + TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo); + int count = 0; + for(HColumnDescriptor colFamily : columnFamilies) { + count += hrfs.getStoreFiles(colFamily.getName()).size(); + } admin.compactRegion(mergedRegionInfo.getRegionName()); // wait until merged region doesn't have reference file long timeout = System.currentTimeMillis() + waitTime; - HRegionFileSystem hrfs = new HRegionFileSystem( - TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo); while (System.currentTimeMillis() < timeout) { - if (!hrfs.hasReferences(tableDescritor)) { + if (!hrfs.hasReferences(tableDescriptor)) { break; } Thread.sleep(50); } - assertFalse(hrfs.hasReferences(tableDescritor)); - + int newcount = 0; + for(HColumnDescriptor colFamily : columnFamilies) { + newcount += hrfs.getStoreFiles(colFamily.getName()).size(); + } + assertTrue(newcount > count); + // clean up the merged region store files + List<HRegion> regions = + TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName()); + for (HRegion region : regions) { + CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region); + cleaner.chore(); + } // run CatalogJanitor to clean merge references in hbase:meta and archive the // files of merging regions int cleaned = admin.runCatalogScan();