[ https://issues.apache.org/jira/browse/HDFS-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15717913#comment-15717913 ]
Manjunath edited comment on HDFS-11182 at 12/3/16 11:02 AM:
------------------------------------------------------------

[~arpitagarwal], apologies for the late review; this comment really belongs on HDFS-11149. Thanks for the effort on this change. In DatasetVolumeChecker, checkAllVolumes and checkAllVolumesAsync perform the same task apart from a few extra steps in checkAllVolumes. The code could be restructured so that checkAllVolumes calls checkAllVolumesAsync internally; that removes the duplicated lines and guarantees the two methods cannot drift apart in the future:

{code}
public ResultHandler checkAllVolumesAsync(
    final FsDatasetSpi<? extends FsVolumeSpi> dataset, Callback callback) {
  if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
    numSkippedChecks.incrementAndGet();
    return null;
  }
  lastAllVolumesCheck = timer.monotonicNow();
  final Set<StorageLocation> healthyVolumes = new HashSet<>();
  final Set<StorageLocation> failedVolumes = new HashSet<>();
  final Set<StorageLocation> allVolumes = new HashSet<>();

  final FsDatasetSpi.FsVolumeReferences references =
      dataset.getFsVolumeReferences();
  final CountDownLatch latch = new CountDownLatch(references.size());

  LOG.info("Checking {} volumes", references.size());
  for (int i = 0; i < references.size(); ++i) {
    final FsVolumeReference reference = references.getReference(i);
    allVolumes.add(reference.getVolume().getStorageLocation());
    // The context parameter is currently ignored.
    ListenableFuture<VolumeCheckResult> future =
        delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
    LOG.info("Scheduled health check for volume {}", reference.getVolume());
    Futures.addCallback(future, new ResultHandler(
        reference, healthyVolumes, failedVolumes, latch, callback));
  }
  numAsyncDatasetChecks.incrementAndGet();
  return new ResultHandler(
      null, healthyVolumes, failedVolumes, latch, callback);
}

public Set<StorageLocation> checkAllVolumes(
    final FsDatasetSpi<? extends FsVolumeSpi> dataset)
    throws InterruptedException {
  ResultHandler resultHandler = checkAllVolumesAsync(dataset, null);
  if (resultHandler == null) {
    return Collections.emptySet();
  }

  // Wait until our timeout elapses, after which we give up on
  // the remaining volumes.
  if (!resultHandler.getResultsLatch().await(
      maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
    LOG.warn("checkAllVolumes timed out after {} ms",
        maxAllowedTimeForCheckMs);
  }

  numSyncDatasetChecks.incrementAndGet();
  synchronized (this) {
    // All volumes that have not been detected as healthy should be
    // considered failed. This is a superset of 'failedVolumes'.
    //
    // Make a copy under the mutex as Sets.difference() returns a view
    // of a potentially changing set.
    return new HashSet<>(Sets.difference(resultHandler.getAllVolumes(),
        resultHandler.getHealthyVolumes()));
  }
}
{code}

This is not an elegant refactoring, but it eliminates the duplicated code and the behavioural differences that already exist between the two methods, for example in their logging: LOG.info("Scheduled health check for volume {}", reference.getVolume()) appears only in checkAllVolumes, whereas LOG.info("Checking {} volumes", references.size()) appears only in checkAllVolumesAsync. Please let me know your thoughts on this.

> Update DataNode to use DatasetVolumeChecker
> -------------------------------------------
>
>                 Key: HDFS-11182
>                 URL: https://issues.apache.org/jira/browse/HDFS-11182
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: datanode
>            Reporter: Arpit Agarwal
>            Assignee: Arpit Agarwal
>
> Update DataNode to use the DatasetVolumeChecker class introduced in
> HDFS-11149 to parallelize disk checks.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org
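
For readers less familiar with the Guava idiom in the comment above, the "make a copy under the mutex" point can be illustrated with plain java.util collections. This is a standalone sketch, not part of the patch; the class and method names here are hypothetical. Guava's Sets.difference() returns a live view over its arguments, so checkAllVolumes wraps it in new HashSet<>() to detach the result before releasing the lock; the equivalent Guava-free computation is a copy followed by removeAll:

```java
import java.util.HashSet;
import java.util.Set;

public class VolumeDiff {
  // Compute "all volumes minus healthy volumes" as a detached copy,
  // mirroring new HashSet<>(Sets.difference(all, healthy)) from the
  // snippet above, but using only java.util (no Guava dependency).
  static Set<String> failedVolumes(Set<String> all, Set<String> healthy) {
    Set<String> failed = new HashSet<>(all); // copy, not a live view
    failed.removeAll(healthy);
    return failed;
  }

  public static void main(String[] args) {
    Set<String> all = new HashSet<>(Set.of("/data1", "/data2", "/data3"));
    Set<String> healthy = new HashSet<>(Set.of("/data1", "/data3"));

    Set<String> failed = failedVolumes(all, healthy);
    System.out.println(failed); // contains only /data2

    // Unlike an undetached Sets.difference() view, later mutation of
    // 'healthy' does not retroactively change the computed 'failed' set.
    healthy.add("/data2");
    System.out.println(failed.contains("/data2")); // still true
  }
}
```

The same reasoning is why the original code takes the copy inside the synchronized block: the healthy-volumes set may still be mutated by in-flight callbacks, so the difference must be materialized while the lock is held.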