[ https://issues.apache.org/jira/browse/BOOKKEEPER-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13401168#comment-13401168 ]

Uma Maheswara Rao G commented on BOOKKEEPER-246:
------------------------------------------------

Thanks a lot Ivan for the update on the patch!

One question:

{code}
heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
+                        return ledgerId;
{code}

As I remember, we decided that one bookie will replicate one ledger at a time,
right?

So when will this map contain more than one element?
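If that rule holds, the lock bookkeeping could shrink to a single slot that
refuses a second lock outright instead of silently growing a map. A minimal
sketch, using a hypothetical SingleLedgerLockHolder class (not part of the
patch) that keeps the same (lockPath, version) pair the patch stores in its
Lock object:

```java
// Hypothetical helper, not part of the BOOKKEEPER-246 patch: with one ledger
// replicated at a time, the worker holds at most one lock, so a single field
// can stand in for a Map<Long, Lock>.
final class SingleLedgerLockHolder {

    // Same (lockPath, version) pair the patch's Lock carries, plus the ledger id.
    static final class HeldLock {
        final long ledgerId;
        final String lockPath;
        final int version;

        HeldLock(long ledgerId, String lockPath, int version) {
            this.ledgerId = ledgerId;
            this.lockPath = lockPath;
            this.version = version;
        }
    }

    private HeldLock held; // null while no ledger is being replicated

    // Record a newly acquired lock; a second one is a programming error.
    synchronized void acquired(long ledgerId, String lockPath, int version) {
        if (held != null) {
            throw new IllegalStateException("Already replicating ledger " + held.ledgerId);
        }
        held = new HeldLock(ledgerId, lockPath, version);
    }

    // Forget the lock for ledgerId and return it so the caller can release it.
    synchronized HeldLock release(long ledgerId) {
        if (held == null || held.ledgerId != ledgerId) {
            throw new IllegalStateException("No lock held for ledger " + ledgerId);
        }
        HeldLock released = held;
        held = null;
        return released;
    }

    synchronized boolean isHolding() {
        return held != null;
    }
}
```

If a second element ever does show up, the IllegalStateException makes that
case visible instead of leaving a stale entry behind.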


One point I wanted to mention: if one of the ledger fragments contains the
current bookie in its ensemble, then we should not take on the replication work
for that fragment. The worker will continue with the other ledger fragments
and, at the end, it will not delete the ledger node but will release the lock.
That means the current bookie is no longer eligible for replicating that
ledger. That is the reason I wanted to keep the collected children in a queue.
In this situation I will enqueue the ledger into the queue again, so that I am
sure to visit it only after completing the remaining ledgers in the queue. (In
future we could build a priority queue; for example, a ledger that has only one
replica left should have high priority, etc.)
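A minimal sketch of that future priority queue, assuming hypothetical LedgerTask
and UnderReplicatedQueue classes and assuming the live replica count is known
when a ledger is enqueued. A java.util.concurrent.PriorityBlockingQueue orders
ledgers with fewer live replicas first, and an increasing sequence number breaks
ties, so a re-enqueued ledger lands behind every waiting ledger of the same
priority. With a plain FIFO queue such as LinkedBlockingQueue, re-enqueuing
already gives the "visit it after the remaining ledgers" behaviour.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical queue entry: the ledger name plus the fields we order on.
final class LedgerTask {
    final String ledger;     // child name under the underreplicated znode
    final int liveReplicas;  // assumed to be known when the ledger is enqueued
    final long seq;          // enqueue order, used as the FIFO tie-breaker

    LedgerTask(String ledger, int liveReplicas, long seq) {
        this.ledger = ledger;
        this.liveReplicas = liveReplicas;
        this.seq = seq;
    }
}

final class UnderReplicatedQueue {
    private final AtomicLong nextSeq = new AtomicLong();

    // Fewest live replicas first; among equals, whichever was enqueued first.
    private final PriorityBlockingQueue<LedgerTask> queue = new PriorityBlockingQueue<>(
            16,
            Comparator.comparingInt((LedgerTask t) -> t.liveReplicas)
                    .thenComparingLong(t -> t.seq));

    // Enqueue or re-enqueue: a fresh sequence number places the ledger behind
    // every waiting ledger of the same priority.
    void enqueue(String ledger, int liveReplicas) {
        queue.put(new LedgerTask(ledger, liveReplicas, nextSeq.getAndIncrement()));
    }

    // Blocking take, as the ReplicationWorker loop would use it.
    String take() throws InterruptedException {
        return queue.take().ledger;
    }

    // Non-blocking variant; returns null when the queue is empty.
    String poll() {
        LedgerTask t = queue.poll();
        return t == null ? null : t.ledger;
    }
}
```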

The basic ReplicationWorker loop I currently have:
{code}
while (true) {
            DistributedLock lock = null;
            try {
                String ledger = underReplicatedLedgersQ.take();
                String ledgerToReplicate = underReplicatedPath + "/" + ledger;
                lock = new DistributedLock(zkc, ledgerToReplicate,
                        new RetryLockListenerImpl());
                if (false == lock.tryLock()) {
                    continue;
                }
                LOG.info("Acquired the lock for replicating the fragments of ledger : "
                        + ledger);
                Stat uReplicatedZNodeStat = zkc
                        .exists(ledgerToReplicate, false);
                if (uReplicatedZNodeStat == null) {
                    continue; // Is this correct, or should we throw an exception?
                }
                int uReplicatedZNodeVersionNumber = uReplicatedZNodeStat
                        .getVersion();
                LedgerHandle ledgerHandle = getLedgerHandle(Long
                        .parseLong(ledger));

                CheckerCallback cb = new CheckerCallback();
                if (ledgerHandle != null) {
                    checker.checkLedger(ledgerHandle, cb);
                    Set<LedgerFragmentReplica> fragments = cb
                            .waitAndGetResult();

                    LOG.info("Found " + fragments.size()
                            + " fragments for replication from ledger : "
                            + ledger + ", fragments are = " + fragments);
                    boolean isAllFragmentsReplicated = true;
                    boolean isFargmentsArePartOfEnsemble = false;
                    for (LedgerFragmentReplica lf : fragments) {

                        // The target bookie is already part of this fragment's
                        // ensemble, so let another bookie pick it up for
                        // replication and continue with the other fragments.
                        if (lf.getEnsemble().contains(targetBookieAddress)) {
                            LOG.info("Current Bookie " + targetBookieAddress
                                    + " is part of the ensemble, so let other"
                                    + " bookies take care of replicating ledger : "
                                    + ledger);
                            isFargmentsArePartOfEnsemble = true;
                            isAllFragmentsReplicated = false;
                            continue;
                        }

                        try {
                            // Only ever clear the flag, so that a later successful
                            // fragment cannot hide an earlier failed one.
                            if (!fragmentReplicator.replicate(lf, ledgerHandle,
                                    targetBookieAddress)) {
                                isAllFragmentsReplicated = false;
                            }
                        } catch (Exception e) {
                            LOG.warn("Exception while replicating fragment " + lf
                                    + " of ledger : " + ledger, e);
                            isAllFragmentsReplicated = false;
                        }
                    }

                    if (false == isAllFragmentsReplicated) {
                        if (false == isFargmentsArePartOfEnsemble) {
                            // Replication failed. Let's retry.
                            enqueue(ledger);
                        }
                        continue;
                    }
                }

                // Successfully replicated: release the lock, then delete the
                // underreplicated znode.
                releaseLock(lock);
                LOG.info("Released the lock for ledger : " + ledger);

                // Clean up
                boolean success = deleteUnderreplicatedZNode(ledgerToReplicate,
                        uReplicatedZNodeVersionNumber);
                if (false == success) {
                    // Deletion can fail on a version mismatch (the znode changed
                    // while we were replicating), so retry replication. Is simply
                    // re-adding the ledger to the queue enough?
                    LOG.info("Underreplicated ledger node deletion failed."
                            + " Will retry replication for ledger : " + ledger);
                    enqueue(ledger);
                    continue;
                }
                LOG.info("Completed replication of ledger : " + ledger);

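            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while replicating ledger fragments,"
                        + " stopping ReplicationWorker", e);
                // Actually stop, as the log says; the finally block still runs.
                // Without this, take() would throw again immediately and spin.
                break;
            } catch (Throwable e) {
                LOG.error("Fatal exception while replicating ledgers,"
                        + " stopping ReplicationWorker", e);
                break;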
            } finally {
                if (null != lock && lock.hasLock()) {
                    try {
                        releaseLock(lock);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LOG.error("Interrupted while releasing lock : " + lock, e);
                    } catch (KeeperException e) {
                        LOG.error("Failed to release lock : " + lock, e);
                    }
                    LOG.info("Released the lock for ledger : " + lock);
                }
            }
        }
{code}
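On the version check in deleteUnderreplicatedZNode: ZooKeeper's
delete(path, version) fails with KeeperException.BadVersionException when the
znode's current version differs from the expected one. Here that would mean
something modified the underreplicated node while we were replicating (for
example, the detection side recording another failure, which is an assumption
about how Stage A updates the node). The in-memory FakeUnderReplicatedStore
below is hypothetical and is not the ZooKeeper API; it only illustrates the
compare-and-delete behaviour, and unlike ZooKeeper it returns false rather than
throwing NoNodeException for a missing node.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the underreplicated znodes; NOT the ZooKeeper API.
final class FakeUnderReplicatedStore {
    private final Map<String, Integer> versions = new HashMap<>();

    // Assumption: marking an already-marked ledger bumps the node version,
    // the way setData() on an existing znode would.
    synchronized void markUnderReplicated(String path) {
        versions.merge(path, 0, (oldVersion, ignored) -> oldVersion + 1);
    }

    // Current version, or null if the node does not exist.
    synchronized Integer version(String path) {
        return versions.get(path);
    }

    // Compare-and-delete: succeeds only if the node is unchanged since
    // expectedVersion was read (ZooKeeper would throw BadVersionException instead).
    synchronized boolean deleteIfVersion(String path, int expectedVersion) {
        Integer current = versions.get(path);
        if (current == null || current != expectedVersion) {
            return false;
        }
        versions.remove(path);
        return true;
    }
}
```

In the loop above, a false result corresponds to the branch that re-enqueues
the ledger for another replication pass.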
                
> Recording of underreplication of ledger entries
> -----------------------------------------------
>
>                 Key: BOOKKEEPER-246
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-246
>             Project: Bookkeeper
>          Issue Type: Sub-task
>          Components: bookkeeper-client, bookkeeper-server
>            Reporter: Ivan Kelly
>            Assignee: Ivan Kelly
>             Fix For: 4.2.0
>
>         Attachments: BOOKKEEPER-246.diff, BOOKKEEPER-246.diff
>
>
> This JIRA is to decide how to record that entries in a ledger are 
> underreplicated. 
> I think there is a common understanding (correct me if I'm wrong) that 
> rereplication can be broken into two logically distinct phases. A) Detection 
> of entry underreplication & B) Rereplication. 
> This subtask is to handle the interaction between these two stages. Stage B 
> needs to know what to rereplicate; how should Stage A inform it?

--
This message is automatically generated by JIRA.