[ 
https://issues.apache.org/jira/browse/HDFS-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15717913#comment-15717913
 ] 

Manjunath edited comment on HDFS-11182 at 12/3/16 11:03 AM:
------------------------------------------------------------

[~arpitagarwal] , sorry for the late code review as this comment holds good 
more for the JIRA HDFS-11149. Thanks for the effort for this change. In 
DatasetVolumeChecker the checkAllVolumes and checkAllVolumesAsync does the same 
tasks except for few additional things in checkAllVolumes. The code can be 
modified such that checkAllVolumes inturn calls the checkAllVolumesAsync and 
save lines of code and also make sure that both does the same intended work 
without code duplication in future as well.

{code}
 public ResultHandler checkAllVolumesAsync(
            final FsDatasetSpi<? extends FsVolumeSpi> dataset,
            Callback callback) {

        if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
            numSkippedChecks.incrementAndGet();
            return null;
        }

        lastAllVolumesCheck = timer.monotonicNow();
        final Set<StorageLocation> healthyVolumes = new HashSet<>();
        final Set<StorageLocation> failedVolumes = new HashSet<>();
        final Set<StorageLocation> allVolumes = new HashSet<>();

        final FsDatasetSpi.FsVolumeReferences references =
                dataset.getFsVolumeReferences();
        final CountDownLatch latch = new CountDownLatch(references.size());

        LOG.info("Checking {} volumes", references.size());
        for (int i = 0; i < references.size(); ++i) {
            final FsVolumeReference reference = references.getReference(i);
            allVolumes.add(reference.getVolume().getStorageLocation());
            // The context parameter is currently ignored.
            ListenableFuture<VolumeCheckResult> future =
                    delegateChecker.schedule(reference.getVolume(), 
IGNORED_CONTEXT);
            LOG.info("Scheduled health check for volume {}", 
reference.getVolume());
            Futures.addCallback(future, new ResultHandler(
                    reference, healthyVolumes, failedVolumes, latch, callback));
        }
        numAsyncDatasetChecks.incrementAndGet();
        return new ResultHandler(
                null, healthyVolumes, failedVolumes, latch, callback);
    }

public Set<StorageLocation> checkAllVolumes(
            final FsDatasetSpi<? extends FsVolumeSpi> dataset)
            throws InterruptedException {

        ResultHandler resultHandler = checkAllVolumesAsync(dataset, null);

        if (resultHandler == null) {
            return Collections.emptySet();
        }

        // Wait until our timeout elapses, after which we give up on
        // the remaining volumes.
        if (!resultHandler.getResultsLatch().await(maxAllowedTimeForCheckMs, 
TimeUnit.MILLISECONDS)) {
            LOG.warn("checkAllVolumes timed out after {} ms" +
                    maxAllowedTimeForCheckMs);
        }

        numSyncDatasetChecks.incrementAndGet();
        synchronized (this) {
            // All volumes that have not been detected as healthy should be
            // considered failed. This is a superset of 'failedVolumes'.
            //
            // Make a copy under the mutex as Sets.difference() returns a view
            // of a potentially changing set.
            return new HashSet<>(Sets.difference(resultHandler.getAllVolumes(), 
resultHandler.getHealthyVolumes()));
        }
    }

{code}
Although this is not a great thing but it will save duplicate code and 
differences of core behaviour between the 2 methods which is currently present 
as well in terms of some loggers eg:- 
{code} LOG.info("Scheduled health check for volume {}", reference.getVolume()); 
{code} this is there only in checkAllVolumes where as {code} LOG.info("Checking 
{} volumes", references.size());{code} is present only in  
checkAllVolumesAsync. Please let me know your thoughts on this.


was (Author: manju_hadoop):
[~arpitagarwal] , sorry for the late code review as this comment holds good 
more for the JIRA HDFS-11149. Thanks for the effort for this change. In 
DatasetVolumeChecker the checkAllVolumes and checkAllVolumesAsync does the same 
tasks except for few additional things in checkAllVolumes. The code can be 
modified such that checkAllVolumes inturn calls the checkAllVolumesAsync and 
save lines of code and also make sure that both does the same intended work 
without code duplication in future as well.

{code}
 public ResultHandler checkAllVolumesAsync(
            final FsDatasetSpi<? extends FsVolumeSpi> dataset,
            Callback callback) {

        if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
            numSkippedChecks.incrementAndGet();
            return null;
        }

        lastAllVolumesCheck = timer.monotonicNow();
        final Set<StorageLocation> healthyVolumes = new HashSet<>();
        final Set<StorageLocation> failedVolumes = new HashSet<>();
        final Set<StorageLocation> allVolumes = new HashSet<>();

        final FsDatasetSpi.FsVolumeReferences references =
                dataset.getFsVolumeReferences();
        final CountDownLatch latch = new CountDownLatch(references.size());

        LOG.info("Checking {} volumes", references.size());
        for (int i = 0; i < references.size(); ++i) {
            final FsVolumeReference reference = references.getReference(i);
            allVolumes.add(reference.getVolume().getStorageLocation());
            // The context parameter is currently ignored.
            ListenableFuture<VolumeCheckResult> future =
                    delegateChecker.schedule(reference.getVolume(), 
IGNORED_CONTEXT);
            LOG.info("Scheduled health check for volume {}", 
reference.getVolume());
            Futures.addCallback(future, new ResultHandler(
                    reference, healthyVolumes, failedVolumes, latch, callback));
        }
        numAsyncDatasetChecks.incrementAndGet();
        return new ResultHandler(
                null, healthyVolumes, failedVolumes, latch, callback);
    }

public Set<StorageLocation> checkAllVolumes(
            final FsDatasetSpi<? extends FsVolumeSpi> dataset)
            throws InterruptedException {

        ResultHandler resultHandler = checkAllVolumesAsync(dataset, null);

        if (resultHandler == null) {
            return Collections.emptySet();
        }

        // Wait until our timeout elapses, after which we give up on
        // the remaining volumes.
        if (!resultHandler.getResultsLatch().await(maxAllowedTimeForCheckMs, 
TimeUnit.MILLISECONDS)) {
            LOG.warn("checkAllVolumes timed out after {} ms" +
                    maxAllowedTimeForCheckMs);
        }

        numSyncDatasetChecks.incrementAndGet();
        synchronized (this) {
            // All volumes that have not been detected as healthy should be
            // considered failed. This is a superset of 'failedVolumes'.
            //
            // Make a copy under the mutex as Sets.difference() returns a view
            // of a potentially changing set.
            return new HashSet<>(Sets.difference(resultHandler.getAllVolumes(), 
resultHandler.getHealthyVolumes()));
        }
    }

{code}
Although this is not a great thing but it will save duplicate code and 
differences of core behaviour between the 2 methods which is currently present 
as well in terms of some loggers eg:- LOG.info("Scheduled health check for 
volume {}", reference.getVolume()); this is there only in checkAllVolumes where 
as LOG.info("Checking {} volumes", references.size()); is present only in  
checkAllVolumesAsync. Please let me know your thoughts on this.

> Update DataNode to use DatasetVolumeChecker
> -------------------------------------------
>
>                 Key: HDFS-11182
>                 URL: https://issues.apache.org/jira/browse/HDFS-11182
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: datanode
>            Reporter: Arpit Agarwal
>            Assignee: Arpit Agarwal
>
> Update DataNode to use the DatasetVolumeChecker class introduced in 
> HDFS-11149 to parallelize disk checks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to