Repository: bookkeeper Updated Branches: refs/heads/master ab707d2c6 -> 48aa69dd0
BOOKKEEPER-1031: close the ledger handle in ReplicationWorker.rereplicate. Otherwise, we build up an unbounded set of Listeners in the AbstractZkLedgerManager listenerSet structure which never go away. Signed-off-by: Samuel Just <sjustsalesforce.com> Author: Samuel Just <[email protected]> Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]> Closes #130 from athanatos/forupstream/BOOKKEEPER-1031 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/48aa69dd Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/48aa69dd Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/48aa69dd Branch: refs/heads/master Commit: 48aa69dd0ba41f5ba7bb2b04f31172c919be4391 Parents: ab707d2 Author: Samuel Just <[email protected]> Authored: Tue Apr 11 11:14:19 2017 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Apr 11 11:14:19 2017 -0700 ---------------------------------------------------------------------- .../replication/ReplicationWorker.java | 110 +++++++++---------- 1 file changed, 54 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/48aa69dd/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 3f2261f..e6e986f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -225,9 +225,60 @@ public class ReplicationWorker implements Runnable { private boolean rereplicate(long ledgerIdToReplicate) throws 
InterruptedException, BKException, UnavailableException { LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate); - LedgerHandle lh; - try { - lh = admin.openLedgerNoRecovery(ledgerIdToReplicate); + try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) { + Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh); + LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate); + + boolean foundOpenFragments = false; + long numFragsReplicated = 0; + for (LedgerFragment ledgerFragment : fragments) { + if (!ledgerFragment.isClosed()) { + foundOpenFragments = true; + continue; + } else if (isTargetBookieExistsInFragmentEnsemble(lh, + ledgerFragment)) { + LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie, + ledgerFragment.getEnsemble()); + continue; + } + try { + admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie); + numFragsReplicated++; + } catch (BKException.BKBookieHandleNotAvailableException e) { + LOG.warn("BKBookieHandleNotAvailableException " + + "while replicating the fragment", e); + } catch (BKException.BKLedgerRecoveryException e) { + LOG.warn("BKLedgerRecoveryException " + + "while replicating the fragment", e); + if (admin.getReadOnlyBookies().contains(targetBookie)) { + underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate); + throw new BKException.BKWriteOnReadOnlyBookieException(); + } + } + } + + if (numFragsReplicated > 0) { + numLedgersReplicated.inc(); + } + + if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) { + deferLedgerLockRelease(ledgerIdToReplicate); + return false; + } + + fragments = getUnderreplicatedFragments(lh); + if (fragments.size() == 0) { + LOG.info("Ledger replicated successfully. 
ledger id is: " + + ledgerIdToReplicate); + underreplicationManager.markLedgerReplicated(ledgerIdToReplicate); + return true; + } else { + // Releasing the underReplication ledger lock and compete + // for the replication again for the pending fragments + underreplicationManager + .releaseUnderreplicatedLedger(ledgerIdToReplicate); + return false; + } } catch (BKNoSuchLedgerExistsException e) { // Ledger might have been deleted by user LOG.info("BKNoSuchLedgerExistsException while opening " @@ -253,59 +304,6 @@ public class ReplicationWorker implements Runnable { .releaseUnderreplicatedLedger(ledgerIdToReplicate); return false; } - Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh); - LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate); - - boolean foundOpenFragments = false; - long numFragsReplicated = 0; - for (LedgerFragment ledgerFragment : fragments) { - if (!ledgerFragment.isClosed()) { - foundOpenFragments = true; - continue; - } else if (isTargetBookieExistsInFragmentEnsemble(lh, - ledgerFragment)) { - LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie, - ledgerFragment.getEnsemble()); - continue; - } - try { - admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie); - numFragsReplicated++; - } catch (BKException.BKBookieHandleNotAvailableException e) { - LOG.warn("BKBookieHandleNotAvailableException " - + "while replicating the fragment", e); - } catch (BKException.BKLedgerRecoveryException e) { - LOG.warn("BKLedgerRecoveryException " - + "while replicating the fragment", e); - if (admin.getReadOnlyBookies().contains(targetBookie)) { - underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate); - throw new BKException.BKWriteOnReadOnlyBookieException(); - } - } - } - - if (numFragsReplicated > 0) { - numLedgersReplicated.inc(); - } - - if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) { - deferLedgerLockRelease(ledgerIdToReplicate); 
- return false; - } - - fragments = getUnderreplicatedFragments(lh); - if (fragments.size() == 0) { - LOG.info("Ledger replicated successfully. ledger id is: " - + ledgerIdToReplicate); - underreplicationManager.markLedgerReplicated(ledgerIdToReplicate); - return true; - } else { - // Releasing the underReplication ledger lock and compete - // for the replication again for the pending fragments - underreplicationManager - .releaseUnderreplicatedLedger(ledgerIdToReplicate); - return false; - } } /**
