[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13447193#comment-13447193
 ] 

Ivan Kelly commented on BOOKKEEPER-325:
---------------------------------------

Sorry for taking so long with this. My weekend ended up getting very busy in 
the end, so I only got to it this morning.

ClientUtil is a test class and shouldn't be used in production code. It breaks 
the encapsulation of the client package for testing, which is not something we 
want exposed to API users. It isn't needed anyhow: with BOOKKEEPER-386, the 
information needed for the decision is available in LedgerFragment. 

I think the call from doReplicateFragments could be flattened. Currently 
there are a lot of booleans being passed around, which looks to me as if flow 
control is being shared across two methods when it should live in only one. I 
think a little refactoring would make things clearer here. 

I suggest the following:

ReplicationWorker#run() should only deal with running the worker thread, so it 
should be quite simple.
{code}
    @Override
    public void run() {
        workerRunning = true;
        while (workerRunning) {
            try {
                rereplicate();
            } catch (InterruptedException e) {
                shutdown();
                Thread.currentThread().interrupt();
                LOG.info("InterruptedException "
                        + "while replicating fragments", e);
                return;
            } catch (BKException e) {
                shutdown();
                LOG.error("BKException while replicating fragments", e);
                return;
            } catch (UnavailableException e) {
                shutdown();
                LOG.error("UnavailableException "
                        + "while replicating fragments", e);
                return;
            }
        }
    }
{code}

Then #rereplicate() should contain what was in the try block and 
doReplicateFragment().
{code}
void rereplicate()
        throws InterruptedException, BKException, UnavailableException {
    long ledgerIdToReplicate = underreplicationManager.getLedgerToRereplicate();
    LOG.info("Going to replicate the fragments of the ledger: "
             + ledgerIdToReplicate);
    LedgerHandle lh;
    try {
        lh = admin.openLedgerNoRecovery(ledgerIdToReplicate);
    } catch (BKNoSuchLedgerExistsException e) {
        // Ledger might have been deleted by user
        LOG.info("BKNoSuchLedgerExistsException while opening "
                 + "ledger for replication. Other clients "
                 + "might have deleted the ledger. "
                 + "So, no harm to continue");
        underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
        return;
    } catch (BKReadException e) {
        LOG.info("BKReadException while"
                 + " opening ledger for replication."
                 + " Enough bookies might not be available."
                 + " So, no harm to continue");
        underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
        return;
    } catch (BKBookieHandleNotAvailableException e) {
        LOG.info("BKBookieHandleNotAvailableException while"
                 + " opening ledger for replication."
                 + " Enough bookies might not be available."
                 + " So, no harm to continue");
        underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
        return;
    }
    CheckerCallback checkerCb = new CheckerCallback();
    ledgerChecker.checkLedger(lh, checkerCb);
    Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
    LOG.info("Found fragments " + fragments
             + " for replication from ledger: "
             + ledgerIdToReplicate);

    boolean foundOpenFragments = false;
    for (LedgerFragment ledgerFragment : fragments) {
        if (!ledgerFragment.isClosed()) {
            // Open fragments are deferred, not replicated now.
            foundOpenFragments = true;
            continue;
        } else if (isTargetBookieExistsInFragmentEnsemble(ledgerFragment)) {
            LOG.info("Target Bookie[" + targetBookie
                     + "] found in the fragment ensemble:"
                     + ledgerFragment.getEnsemble());
            continue;
        }
        admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
    }

    if (foundOpenFragments) {
        deferLedgerLockRelease(ledgerIdToReplicate);
        return;
    }
    
    // Check again to see whether any fragments are still underreplicated.
    checkerCb = new CheckerCallback();
    ledgerChecker.checkLedger(lh, checkerCb);
    fragments = checkerCb.waitAndGetResult();

    if (fragments.size() == 0) {
        LOG.info("Ledger replicated successfully. ledger id is: "
                 + ledgerIdToReplicate);
        underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
    } else {
        // Release the underreplication ledger lock and compete
        // for the replication again for the pending fragments
        underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
    }
}

{code}

We'd also need a try/finally to close the lh, but I left it out for clarity.
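
To show what I mean by the try/finally, here is a rough, self-contained sketch. LedgerHandleLike, HandleOpener, HandleWork and HandleUtil are hypothetical stand-ins invented for this example so that it compiles without the BookKeeper jars; they are not the real client types.

```java
// Hypothetical stand-in for a ledger handle; only close() matters here.
interface LedgerHandleLike {
    void close() throws Exception;
}

// Hypothetical stand-in for something like admin.openLedgerNoRecovery(id).
interface HandleOpener {
    LedgerHandleLike open(long ledgerId) throws Exception;
}

// The work to do while the handle is open.
interface HandleWork {
    void apply(LedgerHandleLike lh) throws Exception;
}

class HandleUtil {
    // Opens the handle, runs the work, and always closes the handle,
    // whether the work returns normally or throws.
    static void withHandle(HandleOpener opener, long ledgerId, HandleWork work)
            throws Exception {
        LedgerHandleLike lh = opener.open(ledgerId);
        try {
            work.apply(lh);
        } finally {
            lh.close();
        }
    }
}
```

In rereplicate() itself, the same shape would wrap everything after the successful open, so each early return still closes lh.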

The call to checkLedger could be encapsulated in a utility method to avoid 
having to construct the CheckerCallback every time.
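
Something along these lines is what I have in mind for the utility method. It is only a sketch: FragmentCallback, Checker, WaitingCallback and CheckerUtil are hypothetical stand-ins for the real checker and callback types, written generically so the example compiles on its own.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the asynchronous callback the checker invokes.
interface FragmentCallback<T> {
    void operationComplete(int rc, Set<T> fragments);
}

// Hypothetical stand-in for the ledger checker.
interface Checker<L, T> {
    void checkLedger(L ledger, FragmentCallback<T> cb);
}

// Blocks the caller until the asynchronous check has reported its result.
class WaitingCallback<T> implements FragmentCallback<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Set<T> result = Collections.emptySet();

    @Override
    public void operationComplete(int rc, Set<T> fragments) {
        result = fragments;
        done.countDown();
    }

    Set<T> waitAndGetResult() throws InterruptedException {
        done.await();
        return result;
    }
}

class CheckerUtil {
    // One call replaces the construct-callback / check / wait sequence.
    static <L, T> Set<T> getUnderreplicatedFragments(Checker<L, T> checker,
                                                     L ledger)
            throws InterruptedException {
        WaitingCallback<T> cb = new WaitingCallback<T>();
        checker.checkLedger(ledger, cb);
        return cb.waitAndGetResult();
    }
}
```

Both checks in rereplicate() would then shrink to a single call each.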

For PendingReplications, I still think this would be easier to do with a 
TimerTask. A Timer will only use one thread, but it already has all the logic 
for dealing with timeouts, rather than rewriting it all again as you have in 
#pendingReplicationCheck.

With a timer task, PendingReplications could be replaced by one method.
{code}
void deferLedgerLockRelease(final long ledgerId) {
    long gracePeriod = etc etc;
    TimerTask t = new TimerTask() {
        @Override
        public void run() {
            LedgerHandle lh = null;
            try {
                lh = admin.openLedgerNoRecovery(ledgerId);
                Set<LedgerFragment> fragments = getUnreplicatedFragments(lh);
                for (LedgerFragment f : fragments) {
                    if (!f.isClosed()) {
                        // Still open after the grace period: opening with
                        // recovery fences the ledger.
                        lh = admin.openLedger(ledgerId);
                        break;
                    }
                }
            } catch (etc etc) {
                // take appropriate action
            } finally {
                if (lh != null) {
                    lh.close();
                }
                underreplicationManager.releaseUnderreplicatedLedger(ledgerId);
            }
        }
    };
    timer.schedule(t, gracePeriod);
}
{code}
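
For reference, here is a minimal runnable version of just the Timer part, stripped of the ledger logic. LedgerLockReleaser and DeferredRelease are hypothetical names made up for this sketch, and the grace period is passed in by the caller since I haven't decided where it should come from.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical stand-in for the lock-release side of the
// underreplication manager.
interface LedgerLockReleaser {
    void releaseUnderreplicatedLedger(long ledgerId);
}

class DeferredRelease {
    // A single daemon thread serves every deferred ledger.
    private final Timer timer = new Timer("deferred-ledger-release", true);
    private final LedgerLockReleaser releaser;
    private final long gracePeriodMs;

    DeferredRelease(LedgerLockReleaser releaser, long gracePeriodMs) {
        this.releaser = releaser;
        this.gracePeriodMs = gracePeriodMs;
    }

    // Schedules the lock release to run once the grace period has elapsed.
    void deferLedgerLockRelease(final long ledgerId) {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // A real task would re-check (and possibly fence) the
                // ledger here before releasing the lock.
                releaser.releaseUnderreplicatedLedger(ledgerId);
            }
        }, gracePeriodMs);
    }

    // Cancels the timer and discards any tasks still pending.
    void shutdown() {
        timer.cancel();
    }
}
```

One thing to watch: an exception escaping TimerTask#run() terminates the Timer's thread, so the real task should catch everything it can throw.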

                
> Delay the replication of a ledger if RW found that its last fragment is in 
> underReplication.
> --------------------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-325
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-325
>             Project: Bookkeeper
>          Issue Type: Sub-task
>          Components: bookkeeper-auto-recovery
>    Affects Versions: 4.2.0
>            Reporter: Uma Maheswara Rao G
>            Assignee: Uma Maheswara Rao G
>         Attachments: BOOKKEEPER-325.patch
>
>
> When RW finds that a ledger's last fragment is in underReplication state, 
> we should delay that ledger's replication for some grace period. Optimally, 
> we can replicate the other fragments.
> The idea is that whenever it finds the last fragment is under-replicated, it 
> can add the ledger to the PendingReplication list.
> There will be a small daemon, which will check for the timeouts of these 
> ledgers. 
> Once a ledger has timed out, it will trigger the normal replication process 
> if it is not in the last fragment. Otherwise, it will fence the ledger and 
> trigger the replication normally.
> See the discussion for more info:
> http://markmail.org/message/ruhhxxgvuqnjlu2s#query:+page:1+mid:f6ifo4sizulwiaem+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
