joshelser commented on a change in pull request #3298:
URL: https://github.com/apache/hbase/pull/3298#discussion_r638244458



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
##########
@@ -0,0 +1,183 @@
+/*
+ *

Review comment:
       nit: unnecessary line

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
##########
@@ -29,15 +29,25 @@
  * A store flush context carries the state required to prepare/flush/commit the store's cache.
  */
 @InterfaceAudience.Private
-interface StoreFlushContext {
+public abstract class StoreFlushContext {
+
+  protected HStore store;
+  protected long cacheFlushSeqNum;
+  protected FlushLifeCycleTracker tracker;
+
+  public StoreFlushContext(HStore store, Long cacheFlushSeqNum, FlushLifeCycleTracker tracker){

Review comment:
       It looks like it would take some work to do it, but can we make this be a `Store` instead of `HStore`?
   
   It looks like most of the methods called by DefaultStoreFlushContext are actually defined on HStore right now.
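
   For illustration only, a minimal sketch of the kind of signature change being asked about, assuming the flush path can be expressed against the `Store` interface (any `HStore`-only methods would first have to be promoted onto `Store` or supplied through another collaborator); this is not code from the patch:

```java
// Sketch only, not from the patch. Assumes Store exposes everything the
// default flush context needs; otherwise those methods must be pulled up.
package org.apache.hadoop.hbase.regionserver;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public abstract class StoreFlushContext {

  protected Store store;   // the interface rather than the concrete HStore
  protected long cacheFlushSeqNum;
  protected FlushLifeCycleTracker tracker;

  public StoreFlushContext(Store store, long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
    this.store = store;
    this.cacheFlushSeqNum = cacheFlushSeqNum;
    this.tracker = tracker;
  }
}
```

   That would keep pluggable flush contexts coupled to the `Store` contract rather than the concrete `HStore`.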

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
##########
@@ -2360,149 +2364,15 @@ public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstore
     }
   }
 
-  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
-    return new StoreFlusherImpl(cacheFlushId, tracker);
-  }
-
-  private final class StoreFlusherImpl implements StoreFlushContext {
-
-    private final FlushLifeCycleTracker tracker;
-    private final long cacheFlushSeqNum;
-    private MemStoreSnapshot snapshot;
-    private List<Path> tempFiles;
-    private List<Path> committedFiles;
-    private long cacheFlushCount;
-    private long cacheFlushSize;
-    private long outputFileSize;
-
-    private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
-      this.cacheFlushSeqNum = cacheFlushSeqNum;
-      this.tracker = tracker;
-    }
-
-    /**
-     * This is not thread safe. The caller should have a lock on the region or the store.
-     * If necessary, the lock can be added with the patch provided in HBASE-10087
-     */
-    @Override
-    public MemStoreSize prepare() {
-      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
-      this.snapshot = memstore.snapshot();
-      this.cacheFlushCount = snapshot.getCellsCount();
-      this.cacheFlushSize = snapshot.getDataSize();
-      committedFiles = new ArrayList<>(1);
-      return snapshot.getMemStoreSize();
-    }
-
-    @Override
-    public void flushCache(MonitoredTask status) throws IOException {
-      RegionServerServices rsService = region.getRegionServerServices();
-      ThroughputController throughputController =
-          rsService == null ? null : rsService.getFlushThroughputController();
-      tempFiles =
-          HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
-    }
-
-    @Override
-    public boolean commit(MonitoredTask status) throws IOException {
-      if (CollectionUtils.isEmpty(this.tempFiles)) {
-        return false;
-      }
-      List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
-      for (Path storeFilePath : tempFiles) {
-        try {
-          HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
-          outputFileSize += sf.getReader().length();
-          storeFiles.add(sf);
-        } catch (IOException ex) {
-          LOG.error("Failed to commit store file {}", storeFilePath, ex);
-          // Try to delete the files we have committed before.
-          for (HStoreFile sf : storeFiles) {
-            Path pathToDelete = sf.getPath();
-            try {
-              sf.deleteStoreFile();
-            } catch (IOException deleteEx) {
-              LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
-                  + "halting {}", pathToDelete, ex);
-              Runtime.getRuntime().halt(1);
-            }
-          }
-          throw new IOException("Failed to commit the flush", ex);
-        }
-      }
-
-      for (HStoreFile sf : storeFiles) {
-        if (HStore.this.getCoprocessorHost() != null) {
-          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
-        }
-        committedFiles.add(sf.getPath());
-      }
-
-      HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
-      HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
-      HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);
-
-      // Add new file to store files.  Clear snapshot too while we have the Store write lock.
-      return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
-    }
-
-    @Override
-    public long getOutputFileSize() {
-      return outputFileSize;
-    }
-
-    @Override
-    public List<Path> getCommittedFiles() {
-      return committedFiles;
-    }
-
-    /**
-     * Similar to commit, but called in secondary region replicas for replaying the
-     * flush cache from primary region. Adds the new files to the store, and drops the
-     * snapshot depending on dropMemstoreSnapshot argument.
-     * @param fileNames names of the flushed files
-     * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
-     */
-    @Override
-    public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
-        throws IOException {
-      List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
-      for (String file : fileNames) {
-        // open the file as a store file (hfile link, etc)
-        StoreFileInfo storeFileInfo =
-          getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
-        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
-        storeFiles.add(storeFile);
-        HStore.this.storeSize.addAndGet(storeFile.getReader().length());
-        HStore.this.totalUncompressedBytes
-            .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
-        if (LOG.isInfoEnabled()) {
-          LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
-              ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="
-              + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
-        }
-      }
-
-      long snapshotId = -1; // -1 means do not drop
-      if (dropMemstoreSnapshot && snapshot != null) {
-        snapshotId = snapshot.getId();
-        snapshot.close();
-      }
-      HStore.this.updateStorefiles(storeFiles, snapshotId);
-    }
-
-    /**
-     * Abort the snapshot preparation. Drops the snapshot if any.
-     */
-    @Override
-    public void abort() throws IOException {
-      if (snapshot != null) {
-        //We need to close the snapshot when aborting, otherwise, the segment scanner
-        //won't be closed. If we are using MSLAB, the chunk referenced by those scanners
-        //can't be released, thus memory leak
-        snapshot.close();
-        HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
-      }
+  public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker)
+      throws IOException {
+    Class<StoreFlushContext> flushContextClass = (Class<StoreFlushContext>)
+      conf.getClass(STORE_FLUSH_CONTEXT_CLASS_NAME, DefaultStoreFlushContext.class);
+    try {
+      return flushContextClass.getConstructor(HStore.class, Long.class, FlushLifeCycleTracker.class)
+        .newInstance(this, cacheFlushId, tracker);

Review comment:
       Do you think it would be cleaner to move this into an `init()` method rather than using reflection on the constructor?
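
   A rough sketch of the `init()`-style alternative, assuming the configured class keeps a no-argument constructor; the `init(...)` method shown here is illustrative and not part of the patch:

```java
// Sketch only. Reflection now only has to locate the default constructor,
// and a hypothetical init(...) hook on StoreFlushContext carries the state
// that the three-argument constructor takes in the current diff.
public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker)
    throws IOException {
  Class<? extends StoreFlushContext> flushContextClass = conf.getClass(
      STORE_FLUSH_CONTEXT_CLASS_NAME, DefaultStoreFlushContext.class, StoreFlushContext.class);
  try {
    StoreFlushContext flushContext = flushContextClass.getDeclaredConstructor().newInstance();
    flushContext.init(this, cacheFlushId, tracker);
    return flushContext;
  } catch (ReflectiveOperationException e) {
    throw new IOException("Failed to instantiate " + flushContextClass.getName(), e);
  }
}
```

   Using the three-argument `Configuration.getClass(name, default, xface)` overload would also drop the unchecked cast to `Class<StoreFlushContext>` in the current version of the patch.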




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

