joshelser commented on a change in pull request #3298: URL: https://github.com/apache/hbase/pull/3298#discussion_r638244458
########## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java ########## @@ -0,0 +1,183 @@ +/* + * Review comment: nit: unnecessary line ########## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java ########## @@ -29,15 +29,25 @@ * A store flush context carries the state required to prepare/flush/commit the store's cache. */ @InterfaceAudience.Private -interface StoreFlushContext { +public abstract class StoreFlushContext { + + protected HStore store; + protected long cacheFlushSeqNum; + protected FlushLifeCycleTracker tracker; + + public StoreFlushContext(HStore store, Long cacheFlushSeqNum, FlushLifeCycleTracker tracker){ Review comment: It looks like it would take some work to do it, but can we make this be a `Store` instead of `HStore`? It looks like most of the methods called by DefaultStoreFlushContext are actually defined on HStore right now. ########## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ########## @@ -2360,149 +2364,15 @@ public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstore } } - public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { - return new StoreFlusherImpl(cacheFlushId, tracker); - } - - private final class StoreFlusherImpl implements StoreFlushContext { - - private final FlushLifeCycleTracker tracker; - private final long cacheFlushSeqNum; - private MemStoreSnapshot snapshot; - private List<Path> tempFiles; - private List<Path> committedFiles; - private long cacheFlushCount; - private long cacheFlushSize; - private long outputFileSize; - - private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { - this.cacheFlushSeqNum = cacheFlushSeqNum; - this.tracker = tracker; - } - - /** - * This is not thread safe. The caller should have a lock on the region or the store. 
- * If necessary, the lock can be added with the patch provided in HBASE-10087 - */ - @Override - public MemStoreSize prepare() { - // passing the current sequence number of the wal - to allow bookkeeping in the memstore - this.snapshot = memstore.snapshot(); - this.cacheFlushCount = snapshot.getCellsCount(); - this.cacheFlushSize = snapshot.getDataSize(); - committedFiles = new ArrayList<>(1); - return snapshot.getMemStoreSize(); - } - - @Override - public void flushCache(MonitoredTask status) throws IOException { - RegionServerServices rsService = region.getRegionServerServices(); - ThroughputController throughputController = - rsService == null ? null : rsService.getFlushThroughputController(); - tempFiles = - HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); - } - - @Override - public boolean commit(MonitoredTask status) throws IOException { - if (CollectionUtils.isEmpty(this.tempFiles)) { - return false; - } - List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); - for (Path storeFilePath : tempFiles) { - try { - HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); - outputFileSize += sf.getReader().length(); - storeFiles.add(sf); - } catch (IOException ex) { - LOG.error("Failed to commit store file {}", storeFilePath, ex); - // Try to delete the files we have committed before. 
- for (HStoreFile sf : storeFiles) { - Path pathToDelete = sf.getPath(); - try { - sf.deleteStoreFile(); - } catch (IOException deleteEx) { - LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, " - + "halting {}", pathToDelete, ex); - Runtime.getRuntime().halt(1); - } - } - throw new IOException("Failed to commit the flush", ex); - } - } - - for (HStoreFile sf : storeFiles) { - if (HStore.this.getCoprocessorHost() != null) { - HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); - } - committedFiles.add(sf.getPath()); - } - - HStore.this.flushedCellsCount.addAndGet(cacheFlushCount); - HStore.this.flushedCellsSize.addAndGet(cacheFlushSize); - HStore.this.flushedOutputFileSize.addAndGet(outputFileSize); - - // Add new file to store files. Clear snapshot too while we have the Store write lock. - return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); - } - - @Override - public long getOutputFileSize() { - return outputFileSize; - } - - @Override - public List<Path> getCommittedFiles() { - return committedFiles; - } - - /** - * Similar to commit, but called in secondary region replicas for replaying the - * flush cache from primary region. Adds the new files to the store, and drops the - * snapshot depending on dropMemstoreSnapshot argument. 
- * @param fileNames names of the flushed files - * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot - */ - @Override - public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) - throws IOException { - List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); - for (String file : fileNames) { - // open the file as a store file (hfile link, etc) - StoreFileInfo storeFileInfo = - getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file); - HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); - storeFiles.add(storeFile); - HStore.this.storeSize.addAndGet(storeFile.getReader().length()); - HStore.this.totalUncompressedBytes - .addAndGet(storeFile.getReader().getTotalUncompressedBytes()); - if (LOG.isInfoEnabled()) { - LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() + - ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize=" - + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1)); - } - } - - long snapshotId = -1; // -1 means do not drop - if (dropMemstoreSnapshot && snapshot != null) { - snapshotId = snapshot.getId(); - snapshot.close(); - } - HStore.this.updateStorefiles(storeFiles, snapshotId); - } - - /** - * Abort the snapshot preparation. Drops the snapshot if any. - */ - @Override - public void abort() throws IOException { - if (snapshot != null) { - //We need to close the snapshot when aborting, otherwise, the segment scanner - //won't be closed. 
If we are using MSLAB, the chunk referenced by those scanners - //can't be released, thus memory leak - snapshot.close(); - HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); - } + public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) + throws IOException { + Class<StoreFlushContext> flushContextClass = (Class<StoreFlushContext>) + conf.getClass(STORE_FLUSH_CONTEXT_CLASS_NAME, DefaultStoreFlushContext.class); + try { + return flushContextClass.getConstructor(HStore.class, Long.class, FlushLifeCycleTracker.class) + .newInstance(this, cacheFlushId, tracker); Review comment: Do you think it would be cleaner to move this into an `init()` method rather than using reflection on the constructor? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org