[ 
https://issues.apache.org/jira/browse/HBASE-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361029#comment-15361029
 ] 

Anastasia Braginsky commented on HBASE-16162:
---------------------------------------------

Hi [~anoop.hbase]!

Thank you for all your effort and the important input!
I agree with you that checking the boolean created for testing is heavy to be 
done in the common *add* path. 
Although the check happens only when (getActive().getSize() > 
inmemoryFlushSize), this check is not needed (it was there for future use).
We can remove the allowCompaction.get() from shouldFlushInMemory().

I am already confused with all the patches. May be you can put in the review 
board the final version?
For example, I see in master that 
inMemoryFlushInProgress.compareAndSet(false,true) is used in flushInMemory() 
and it should not be there... Is it just old code? Or one of the fixes?
Generally, it is important to do the inMemoryFlushInProgress CAS *only once* 
and in shouldFlushInMemory().

Anyway, I am putting here the code as I think it should be. The code is taken 
from HBASE-14921 new patch.

{code}
  // internally used method, externally visible only for tests
  // when invoked directly from tests it must be verified that the caller 
doesn't hold updatesLock,
  // otherwise there is a deadlock
  @VisibleForTesting
  void flushInMemory() throws IOException {
    // Phase I: Update the pipeline
    getRegionServices().blockUpdates();
    try {
      MutableSegment active = getActive();
      if (LOG.isDebugEnabled()) {
        LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction 
pipeline, "
            + "and initiating compaction.");
      }
      pushActiveToPipeline(active);
    } finally {
      getRegionServices().unblockUpdates();
    }
    // Phase II: Compact the pipeline
    try {
      if (allowCompaction.get()) {
        // setting the inMemoryFlushInProgress flag again for the case this 
method is invoked
        // directly (only in tests) in the common path setting from true to 
true is idempotent
        inMemoryFlushInProgress.set(true);
        // Speculative compaction execution, may be interrupted if flush is 
forced while
        // compaction is in progress
        compactor.start();
      }
    } catch (IOException e) {
      LOG.warn("Unable to run memstore compaction. region "
          + getRegionServices().getRegionInfo().getRegionNameAsString()
          + "store: "+ getFamilyName(), e);
    } finally {
      stopCompaction();
    }
  }

  private byte[] getFamilyNameInByte() {
    return store.getFamily().getName();
  }

  private ThreadPoolExecutor getPool() {
    return getRegionServices().getInMemoryCompactionPool();
  }

  private boolean shouldFlushInMemory() {
    if(getActive().getSize() > inmemoryFlushSize) { // size above flush 
threshold
        // the inMemoryFlushInProgress is CASed to be true here in order to 
mutual exclude
        // the insert of the active into the compaction pipeline
        return (inMemoryFlushInProgress.compareAndSet(false,true));
    }
    return false;
  }  
{code}

Thank you very much once again! :)
Very much sorry for probably being unclear and slow :) :)

> Compacting Memstore : unnecessary push of active segments to pipeline
> ---------------------------------------------------------------------
>
>                 Key: HBASE-16162
>                 URL: https://issues.apache.org/jira/browse/HBASE-16162
>             Project: HBase
>          Issue Type: Sub-task
>            Reporter: Anoop Sam John
>            Assignee: Anoop Sam John
>            Priority: Critical
>         Attachments: HBASE-16162.patch, HBASE-16162_V2.patch, 
> HBASE-16162_V3.patch, HBASE-16162_V4.patch
>
>
> We have flow like this
> {code}
> protected void checkActiveSize() {
>     if (shouldFlushInMemory()) {
>          InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
>       }
>       getPool().execute(runnable);
>     }
>   }
> private boolean shouldFlushInMemory() {
>     if(getActive().getSize() > inmemoryFlushSize) {
>       // size above flush threshold
>       return (allowCompaction.get() && !inMemoryFlushInProgress.get());
>     }
>     return false;
>   }
> void flushInMemory() throws IOException {
>     // Phase I: Update the pipeline
>     getRegionServices().blockUpdates();
>     try {
>       MutableSegment active = getActive();
>       pushActiveToPipeline(active);
>     } finally {
>       getRegionServices().unblockUpdates();
>     }
>     // Phase II: Compact the pipeline
>     try {
>       if (allowCompaction.get() && 
> inMemoryFlushInProgress.compareAndSet(false, true)) {
>         // setting the inMemoryFlushInProgress flag again for the case this 
> method is invoked
>         // directly (only in tests) in the common path setting from true to 
> true is idempotent
>         // Speculative compaction execution, may be interrupted if flush is 
> forced while
>         // compaction is in progress
>         compactor.startCompaction();
>       }
> {code}
> So every write of cell will produce the check checkActiveSize().   When we 
> are at border of in mem flush,  many threads doing writes to this memstore 
> can get this checkActiveSize () to pass.  Yes the AtomicBoolean is still 
> false only. It is turned ON after some time once the new thread is started 
> run and it push the active to pipeline etc.
> In the new thread code of inMemFlush, we dont have any size check. It just 
> takes the active segment and pushes that to pipeline. Yes we dont allow any 
> new writes to memstore at this time.     But before that write lock on 
> region, other handler thread also might have added entry to this thread pool. 
>  When the 1st one finishes, it releases the lock on region and handler 
> threads trying for write to memstore, might get lock and add some data. Now 
> this 2nd in mem flush thread may get a chance and get the lock and so it just 
> takes current active segment and flush that in memory !    This will produce 
> very small sized segments to pipeline.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to