Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 d35255d43 -> c6a1e5ab4 (forced update)
HDFS-11334: [SPS]: NN switch and rescheduling movements can lead to have more than one coordinator for same file blocks. Contributed by Rakesh R.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d37ae23e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d37ae23e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d37ae23e

Branch: refs/heads/HDFS-10285
Commit: d37ae23effad2c4d98f926e8da4a36eb05f2d4cc
Parents: 3bdcb4c
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Tue Apr 18 15:23:58 2017 -0700
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Thu Jul 12 17:00:17 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |   6 +
 .../server/blockmanagement/DatanodeManager.java |  12 ++
 .../hdfs/server/datanode/BPServiceActor.java    |   4 +-
 .../datanode/BlockStorageMovementTracker.java   |  37 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  12 +-
 .../datanode/StoragePolicySatisfyWorker.java    |  95 +++++++++--
 .../BlockStorageMovementAttemptedItems.java     |  80 ++++++---
 .../server/namenode/StoragePolicySatisfier.java |  15 +-
 .../protocol/BlocksStorageMovementResult.java   |   6 +-
 .../src/main/proto/DatanodeProtocol.proto       |   1 +
 .../TestStoragePolicySatisfyWorker.java         |  68 ++++----
 .../TestStoragePolicySatisfierWithHA.java       | 170 +++++++++++++++++--
 13 files changed, 413 insertions(+), 95 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 00152cc..b5341a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -619,7 +619,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY = "dfs.storage.policy.satisfier.self.retry.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT = - 30 * 60 * 1000; + 20 * 60 * 1000; public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address"; public static final int DFS_DATANODE_DEFAULT_PORT = 9866; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 0c03608..996b986 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -985,6 +985,9 @@ public class PBHelper { case FAILURE: status = Status.FAILURE; break; + case IN_PROGRESS: + status = Status.IN_PROGRESS; + break; default: throw
new AssertionError("Unknown status: " + resultProto.getStatus()); } @@ -1011,6 +1014,9 @@ public class PBHelper { case FAILURE: status = BlocksStorageMovementResultProto.Status.FAILURE; break; + case IN_PROGRESS: + status = BlocksStorageMovementResultProto.Status.IN_PROGRESS; + break; default: throw new AssertionError("Unknown status: " + report.getStatus()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index da340a8..2d7c80e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1091,6 +1091,18 @@ public class DatanodeManager { nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion()); nodeS.setDisallowed(false); // Node is in the include list + // Sets dropSPSWork flag to true, to ensure that + // DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat + // response immediately after the node registration. This is + // to avoid a situation, where multiple trackId responses coming from + // different co-odinator datanodes. After SPS monitor time out, it + // will retry the files which were scheduled to the disconnected(for + // long time more than heartbeat expiry) DN, by finding new + // co-ordinator datanode. Now, if the expired datanode reconnects back + // after SPS reschedules, it leads to get different movement results + // from reconnected and new DN co-ordinators. + nodeS.setDropSPSWork(true); + // resolve network location if(this.rejectUnresolvedTopologyDN) { nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 0f93fb0..f537f49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -536,7 +536,7 @@ class BPServiceActor implements Runnable { // Remove the blocks movement results after successfully transferring // to namenode. 
- dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler() + dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler() .remove(blksMovementResults); return response; @@ -544,7 +544,7 @@ class BPServiceActor implements Runnable { private BlocksStorageMovementResult[] getBlocksMovementResults() { List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn - .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler() + .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler() .getBlksMovementResults(); BlocksStorageMovementResult[] blksMovementResult = new BlocksStorageMovementResult[trackIdVsMovementStatus.size()]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java index e623cef..99858bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -28,7 +29,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult; -import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler; +import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,32 +42,34 @@ public class BlockStorageMovementTracker implements Runnable { private static final Logger LOG = LoggerFactory .getLogger(BlockStorageMovementTracker.class); private final CompletionService<BlockMovementResult> moverCompletionService; - private final BlocksMovementsCompletionHandler blksMovementscompletionHandler; + private final BlocksMovementsStatusHandler blksMovementsStatusHandler; // Keeps the information - trackID vs its list of blocks private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures; private final Map<Long, List<BlockMovementResult>> movementResults; + private volatile boolean running = true; + /** * BlockStorageMovementTracker constructor. * * @param moverCompletionService * completion service. 
* @param handler - * blocks movements completion handler + * blocks movements status handler */ public BlockStorageMovementTracker( CompletionService<BlockMovementResult> moverCompletionService, - BlocksMovementsCompletionHandler handler) { + BlocksMovementsStatusHandler handler) { this.moverCompletionService = moverCompletionService; this.moverTaskFutures = new HashMap<>(); - this.blksMovementscompletionHandler = handler; + this.blksMovementsStatusHandler = handler; this.movementResults = new HashMap<>(); } @Override public void run() { - while (true) { + while (running) { if (moverTaskFutures.size() <= 0) { try { synchronized (moverTaskFutures) { @@ -95,8 +98,8 @@ public class BlockStorageMovementTracker implements Runnable { synchronized (moverTaskFutures) { moverTaskFutures.remove(trackId); } - // handle completed blocks movements per trackId. - blksMovementscompletionHandler.handle(resultPerTrackIdList); + // handle completed or in-progress block movements per trackId. + blksMovementsStatusHandler.handle(resultPerTrackIdList); movementResults.remove(trackId); } } @@ -158,4 +161,22 @@ public class BlockStorageMovementTracker implements Runnable { movementResults.clear(); } } + + /** + * @return the list of trackIds which are still waiting to complete all the + * scheduled blocks movements. + */ + Set<Long> getInProgressTrackIds() { + synchronized (moverTaskFutures) { + return moverTaskFutures.keySet(); + } + } + + /** + * Sets the running flag to false and clears the pending movement result + * queues. + */ + public void stopTracking() { + running = false; + removeAll(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index abc1f1e..196d4c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1425,6 +1425,7 @@ public class DataNode extends ReconfigurableBase blockRecoveryWorker = new BlockRecoveryWorker(this); storagePolicySatisfyWorker = new StoragePolicySatisfyWorker(getConf(), this); + storagePolicySatisfyWorker.start(); blockPoolManager = new BlockPoolManager(this); blockPoolManager.refreshNamenodes(getConf()); @@ -1976,7 +1977,11 @@ public class DataNode extends ReconfigurableBase } } } - + + // stop storagePolicySatisfyWorker + if (storagePolicySatisfyWorker != null) { + storagePolicySatisfyWorker.stop(); + } List<BPOfferService> bposArray = (this.blockPoolManager == null) ? new ArrayList<BPOfferService>() : this.blockPoolManager.getAllNamenodeThreads(); @@ -2129,6 +2134,11 @@ public class DataNode extends ReconfigurableBase notifyAll(); } tracer.close(); + + // Wait for the SPS worker thread to finish.
+ if (storagePolicySatisfyWorker != null) { + storagePolicySatisfyWorker.waitToFinishWorkerThread(); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index a96ac98..f4f97dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; +import static org.apache.hadoop.util.Time.monotonicNow; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -31,7 +32,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; @@ -87,10 +90,13 @@ public class StoragePolicySatisfyWorker { private final int moverThreads; private final ExecutorService moveExecutor; private final CompletionService<BlockMovementResult> moverCompletionService; - private final BlocksMovementsCompletionHandler handler; + private final BlocksMovementsStatusHandler handler; private final BlockStorageMovementTracker movementTracker; private Daemon movementTrackerThread; + private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30 seconds. + private long nextInprogressRecheckTime; + public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); @@ -99,15 +105,52 @@ public class StoragePolicySatisfyWorker { DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); moverCompletionService = new ExecutorCompletionService<>(moveExecutor); - handler = new BlocksMovementsCompletionHandler(); + handler = new BlocksMovementsStatusHandler(); movementTracker = new BlockStorageMovementTracker(moverCompletionService, handler); movementTrackerThread = new Daemon(movementTracker); movementTrackerThread.setName("BlockStorageMovementTracker"); - movementTrackerThread.start(); + + // Interval at which to check the in-progress trackIds. The time interval + // is proportional to the heartbeat interval time period. + final long heartbeatIntervalSeconds = conf.getTimeDuration( + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS); + inprogressTrackIdsCheckInterval = TimeUnit.SECONDS + .toMillis(5 * heartbeatIntervalSeconds); + // set the first in-progress recheck time to a future time stamp. + nextInprogressRecheckTime = monotonicNow() + + inprogressTrackIdsCheckInterval; + // TODO: Needs to manage the number of concurrent moves per DataNode. } + /** + * Start StoragePolicySatisfyWorker, which will start the block movement + * tracker thread to track the completion of block movements.
+ */ + void start() { + movementTrackerThread.start(); + } + + /** + * Stop StoragePolicySatisfyWorker, which will stop the block movement + * tracker thread. + */ + void stop() { + movementTrackerThread.interrupt(); + movementTracker.stopTracking(); + } + + /** + * Timed wait for the BlockStorageMovementTracker daemon thread to stop. + */ + void waitToFinishWorkerThread() { + try { + movementTrackerThread.join(3000); + } catch (InterruptedException ie) { + // ignore; the datanode is shutting down + } + } + private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) { LOG.debug("Block mover to satisfy storage policy; pool threads={}", num); @@ -352,11 +395,11 @@ } /** - * Blocks movements completion handler, which is used to collect details of - * the completed list of block movements and this status(success or failure) - * will be send to the namenode via heartbeat. + * Blocks movements status handler, which is used to collect details of the + * completed or in-progress list of block movements, and this status (success, + * failure or in-progress) will be sent to the namenode via heartbeat. */ - static class BlocksMovementsCompletionHandler { + class BlocksMovementsStatusHandler { private final List<BlocksStorageMovementResult> trackIdVsMovementStatus = new ArrayList<>(); @@ -395,14 +438,21 @@ * @return unmodifiable list of blocks storage movement results. */ List<BlocksStorageMovementResult> getBlksMovementResults() { + List<BlocksStorageMovementResult> movementResults = new ArrayList<>(); + // 1. Add all the completed trackIds. synchronized (trackIdVsMovementStatus) { - if (trackIdVsMovementStatus.size() <= 0) { - return new ArrayList<>(); + if (trackIdVsMovementStatus.size() > 0) { + movementResults.addAll(trackIdVsMovementStatus); } - List<BlocksStorageMovementResult> results = Collections - .unmodifiableList(trackIdVsMovementStatus); - return results; } + // 2. Add the in-progress track ids after the completed ones. + Set<Long> inProgressTrackIds = getInProgressTrackIds(); + for (Long trackId : inProgressTrackIds) { + movementResults.add(new BlocksStorageMovementResult(trackId, + BlocksStorageMovementResult.Status.IN_PROGRESS)); + } + return movementResults; } /** @@ -433,7 +483,7 @@ } @VisibleForTesting - BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() { + BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() { return handler; } @@ -447,4 +497,23 @@ movementTracker.removeAll(); handler.removeAll(); } + + /** + * Gets the list of trackIds which are in progress. Collection is done + * periodically at the + * 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time. + * millis' interval.
+ * + * @return collection of trackIds which are in progress + */ + private Set<Long> getInProgressTrackIds() { + Set<Long> trackIds = new HashSet<>(); + long now = monotonicNow(); + if (now >= nextInprogressRecheckTime) { + trackIds = movementTracker.getInProgressTrackIds(); + + // schedule next re-check interval + nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval; + } + return trackIds; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index 26b98d8..f2406da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +40,11 @@ import com.google.common.annotations.VisibleForTesting; * A monitor class for checking whether block storage movements finished or not. * If block storage movement results from datanode indicates about the movement * success, then it will just remove the entries from tracking. If it reports - * failure, then it will add back to needed block storage movements list. If no - * DN reports about movement for longer time, then such items will be retries - * automatically after timeout. The default timeout would be 30mins. + * failure, then it will be added back to the needed block storage movements + * list. If it reports in_progress, that means the block movement is in + * progress and the coordinator is still tracking the movement. If no DN + * reports about movement for a longer time, then such items will be retried + * automatically after a timeout. The default timeout is 20 minutes. */ public class BlockStorageMovementAttemptedItems { private static final Logger LOG = @@ -57,10 +60,10 @@ public class BlockStorageMovementAttemptedItems { private Daemon timerThread = null; private final StoragePolicySatisfier sps; // - // It might take anywhere between 30 to 60 minutes before + // It might take anywhere between 20 to 60 minutes before // a request is timed out. // - private long selfRetryTimeout = 30 * 60 * 1000; + private long selfRetryTimeout = 20 * 60 * 1000; // // It might take anywhere between 5 to 10 minutes before @@ -159,35 +162,35 @@ /** * This class contains information of an attempted trackID. Information such - * as, (a)last attempted time stamp, (b)whether all the blocks in the trackID - * were attempted and blocks movement has been scheduled to satisfy storage - * policy. This is used by + * as, (a) last attempted or reported time stamp, (b) whether all the blocks + * in the trackID were attempted and blocks movement has been scheduled to + * satisfy storage policy.
+ * This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. */ private final static class ItemInfo { - private final long lastAttemptedTimeStamp; + private long lastAttemptedOrReportedTime; private final boolean allBlockLocsAttemptedToSatisfy; /** * ItemInfo constructor. * - * @param lastAttemptedTimeStamp - * last attempted time stamp + * @param lastAttemptedOrReportedTime + * last attempted or reported time * @param allBlockLocsAttemptedToSatisfy * whether all the blocks in the trackID were attempted and blocks * movement has been scheduled to satisfy storage policy */ - private ItemInfo(long lastAttemptedTimeStamp, + private ItemInfo(long lastAttemptedOrReportedTime, boolean allBlockLocsAttemptedToSatisfy) { - this.lastAttemptedTimeStamp = lastAttemptedTimeStamp; + this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime; this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy; } /** - * @return last attempted time stamp. + * @return last attempted or reported time stamp. */ - private long getLastAttemptedTimeStamp() { - return lastAttemptedTimeStamp; + private long getLastAttemptedOrReportedTime() { + return lastAttemptedOrReportedTime; } /** @@ -200,6 +203,14 @@ public class BlockStorageMovementAttemptedItems { private boolean isAllBlockLocsAttemptedToSatisfy() { return allBlockLocsAttemptedToSatisfy; } + + /** + * Update lastAttemptedOrReportedTime, so that the expiration time will be + * postponed into the future. + */ + private void touchLastReportedTimeStamp() { + this.lastAttemptedOrReportedTime = monotonicNow(); + } } /** @@ -234,7 +245,8 @@ public class BlockStorageMovementAttemptedItems { while (iter.hasNext()) { Entry<Long, ItemInfo> entry = iter.next(); ItemInfo itemInfo = entry.getValue(); - if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) { + if (now > itemInfo.getLastAttemptedOrReportedTime() + + selfRetryTimeout) { Long blockCollectionID = entry.getKey(); synchronized (storageMovementAttemptedResults) { if (!isExistInResult(blockCollectionID)) { @@ -273,6 +285,7 @@ public class BlockStorageMovementAttemptedItems { Iterator<BlocksStorageMovementResult> resultsIter = storageMovementAttemptedResults.iterator(); while (resultsIter.hasNext()) { + boolean isInprogress = false; // TrackID need to be retried in the following cases: // 1) All or few scheduled block(s) movement has been failed. // 2) All the scheduled block(s) movement has been succeeded but there // BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter .next(); synchronized (storageMovementAttemptedItems) { - if (storageMovementAttemptedResult - .getStatus() == BlocksStorageMovementResult.Status.FAILURE) { + Status status = storageMovementAttemptedResult.getStatus(); + ItemInfo itemInfo; + switch (status) { + case FAILURE: blockStorageMovementNeeded .add(storageMovementAttemptedResult.getTrackId()); LOG.warn("Blocks storage movement results for the tracking id: {}" + " is reported from co-ordinating datanode, but result" + " status is FAILURE. So, added for retry", storageMovementAttemptedResult.getTrackId()); - } else { - ItemInfo itemInfo = storageMovementAttemptedItems + break; + case SUCCESS: + itemInfo = storageMovementAttemptedItems .get(storageMovementAttemptedResult.getTrackId()); // ItemInfo could be null.
One case is, before the blocks movements @@ -320,10 +336,26 @@ this.sps.notifyBlkStorageMovementFinished( storageMovementAttemptedResult.getTrackId()); } + break; + case IN_PROGRESS: + isInprogress = true; + itemInfo = storageMovementAttemptedItems + .get(storageMovementAttemptedResult.getTrackId()); + if (itemInfo != null) { + // update the attempted expiration time to the next cycle. + itemInfo.touchLastReportedTimeStamp(); + } + break; + default: + LOG.error("Unknown status: {}", status); + break; + } + // Remove the trackID from the attempted list if the attempt has + // completed (success or failure). + if (!isInprogress) { + storageMovementAttemptedItems + .remove(storageMovementAttemptedResult.getTrackId()); } - // Remove trackID from the attempted list, if any. - storageMovementAttemptedItems - .remove(storageMovementAttemptedResult.getTrackId()); } // Remove trackID from results as processed above. resultsIter.remove(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 8cf9920..8be0a2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -108,6 +108,11 @@ public class StoragePolicySatisfier implements Runnable { } else { LOG.info("Starting StoragePolicySatisfier."); } + + // Ensure that any previously submitted block movements are stopped + // in all datanodes. + addDropSPSWorkCommandsToAllDNs(); + storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); @@ -133,7 +138,7 @@ LOG.info("Stopping StoragePolicySatisfier, as admin requested to " + "deactivate it."); this.clearQueuesWithNotification(); - this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); + addDropSPSWorkCommandsToAllDNs(); } else { LOG.info("Stopping StoragePolicySatisfier."); } @@ -170,6 +175,14 @@ return namesystem.isFileOpenedForWrite(moverId); } + /** + * Adds drop commands to all datanodes to stop performing the satisfier + * block movements, if any.
+ */ + private void addDropSPSWorkCommandsToAllDNs() { + this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); + } + @Override public void run() { boolean isMoverRunning = !checkIfMoverRunning(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java index 713b83b..b484eb1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java @@ -35,9 +35,13 @@ public class BlocksStorageMovementResult { * retry these failed blocks movements. Example selected target node is no * more running or no space. So, retrying by selecting new target node might * work. + * + * <p> + * IN_PROGRESS - All or some of the blocks associated with the track id are + * still moving. */ public static enum Status { - SUCCESS, FAILURE; + SUCCESS, FAILURE, IN_PROGRESS; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 899dc7e..080f7fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -192,6 +192,7 @@ message BlocksStorageMovementResultProto { enum Status { SUCCESS = 1; // block movement succeeded FAILURE = 2; // block movement failed and needs to retry + IN_PROGRESS = 3; // block movement is still in progress } required uint64 trackID = 1; required Status status = 2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java index 86b8b50..8fbbf33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java @@ -176,16 +176,21 @@ public class TestStoragePolicySatisfyWorker { StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, src); - List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); - BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo( - lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo, - lb.getStorageTypes()[0], StorageType.ARCHIVE); - blockMovingInfos.add(blockMovingInfo); - INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); - 
worker.processBlockMovingTasks(inode.getId(), - cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); - - waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000); + try { + worker.start(); + List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); + BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo( + lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo, + lb.getStorageTypes()[0], StorageType.ARCHIVE); + blockMovingInfos.add(blockMovingInfo); + INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); + worker.processBlockMovingTasks(inode.getId(), + cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); + + waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000); + } finally { + worker.stop(); + } } /** @@ -212,24 +217,29 @@ public class TestStoragePolicySatisfyWorker { StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf, src); - List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); - List<LocatedBlock> locatedBlocks = - dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks(); - for (LocatedBlock locatedBlock : locatedBlocks) { - BlockMovingInfo blockMovingInfo = - prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(), - locatedBlock.getLocations()[0], targetDnInfo, - locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE); - blockMovingInfos.add(blockMovingInfo); + worker.start(); + try { + List<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); + List<LocatedBlock> locatedBlocks = + dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks(); + for (LocatedBlock locatedBlock : locatedBlocks) { + BlockMovingInfo blockMovingInfo = + prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(), + locatedBlock.getLocations()[0], targetDnInfo, + locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE); + blockMovingInfos.add(blockMovingInfo); + } + INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); + worker.processBlockMovingTasks(inode.getId(), + cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); + // Wait till the results queue builds up + waitForBlockMovementResult(worker, inode.getId(), 30000); + worker.dropSPSWork(); + assertTrue(worker.getBlocksMovementsStatusHandler() + .getBlksMovementResults().size() == 0); + } finally { + worker.stop(); } - INode inode = cluster.getNamesystem().getFSDirectory().getINode(file); - worker.processBlockMovingTasks(inode.getId(), - cluster.getNamesystem().getBlockPoolId(), blockMovingInfos); - // Wait till results queue build up - waitForBlockMovementResult(worker, inode.getId(), 30000); - worker.dropSPSWork(); - assertTrue(worker.getBlocksMovementsCompletionHandler() - .getBlksMovementResults().size() == 0); } private void waitForBlockMovementResult( @Override public Boolean get() { List<BlocksStorageMovementResult> completedBlocks = worker - .getBlocksMovementsCompletionHandler().getBlksMovementResults(); + .getBlocksMovementsStatusHandler().getBlksMovementResults(); return completedBlocks.size() > 0; } }, 100, timeout); @Override public Boolean get() { List<BlocksStorageMovementResult> completedBlocks = worker - .getBlocksMovementsCompletionHandler().getBlksMovementResults(); + .getBlocksMovementsStatusHandler().getBlksMovementResults(); int failedCount = 0; for (BlocksStorageMovementResult blkMovementResult : completedBlocks) { if (blkMovementResult.getStatus() ==
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37ae23e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java index 4d226ff..c88d5be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java @@ -17,51 +17,90 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; - -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tests that StoragePolicySatisfier is able to work with HA enabled. 
*/ public class TestStoragePolicySatisfierWithHA { private MiniDFSCluster cluster = null; + private static final Logger LOG = + LoggerFactory.getLogger(TestStoragePolicySatisfierWithHA.class); - @Before - public void setUp() throws IOException { - Configuration conf = new Configuration(); + private final Configuration config = new HdfsConfiguration(); + private static final int DEFAULT_BLOCK_SIZE = 1024; + private DistributedFileSystem dfs = null; + + private StorageType[][] allDiskTypes = + new StorageType[][]{{StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}; + private int numOfDatanodes = 3; + private int storagesPerDatanode = 2; + private long capacity = 2 * 256 * 1024 * 1024; + private int nnIndex = 0; + + private void createCluster() throws IOException { + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + startCluster(config, allDiskTypes, numOfDatanodes, storagesPerDatanode, + capacity); + dfs = cluster.getFileSystem(nnIndex); + } + + private void startCluster(final Configuration conf, + StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, + long nodeCapacity) throws IOException { + long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; + for (int i = 0; i < numberOfDatanodes; i++) { + for (int j = 0; j < storagesPerDn; j++) { + capacities[i][j] = nodeCapacity; + } + } cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(1) - .build(); + .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) + .storageTypes(storageTypes).storageCapacities(capacities).build(); + cluster.waitActive(); + cluster.transitionToActive(0); } /** * Tests to verify that SPS should run/stop automatically when NN state * changes between Standby and Active. */ - @Test(timeout = 100000) + @Test(timeout = 90000) public void testWhenNNHAStateChanges() throws IOException { try { - DistributedFileSystem fs; + createCluster(); boolean running; - cluster.waitActive(); - fs = cluster.getFileSystem(0); + dfs = cluster.getFileSystem(1); try { - fs.getClient().isStoragePolicySatisfierRunning(); + dfs.getClient().isStoragePolicySatisfierRunning(); Assert.fail("Call this function to Standby NN should " + "raise an exception."); } catch (RemoteException e) { @@ -72,14 +111,15 @@ public class TestStoragePolicySatisfierWithHA { } cluster.transitionToActive(0); - running = fs.getClient().isStoragePolicySatisfierRunning(); + dfs = cluster.getFileSystem(0); + running = dfs.getClient().isStoragePolicySatisfierRunning(); Assert.assertTrue("StoragePolicySatisfier should be active " + "when NN transits from Standby to Active mode.", running); // NN transits from Active to Standby cluster.transitionToStandby(0); try { - fs.getClient().isStoragePolicySatisfierRunning(); + dfs.getClient().isStoragePolicySatisfierRunning(); Assert.fail("NN in Standby again, call this function should " + "raise an exception."); } catch (RemoteException e) { @@ -106,4 +146,104 @@ public class TestStoragePolicySatisfierWithHA { cluster.shutdown(); } } + + /** + * Test to verify that a namenode switchover will add + * DNA_DROP_SPS_WORK_COMMAND to all the datanodes. This later ensures that + * all the SPS queues at the datanodes are dropped.
+ */ + @Test(timeout = 90000) + public void testNamenodeSwitchoverShouldDropSPSWork() throws Exception { + try { + createCluster(); + + FSNamesystem fsn = cluster.getNamesystem(0); + ArrayList<DataNode> dataNodes = cluster.getDataNodes(); + List<DatanodeDescriptor> listOfDns = new ArrayList<>(); + for (DataNode dn : dataNodes) { + DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, + dn.getDatanodeId()); + listOfDns.add(dnd); + } + cluster.shutdownDataNodes(); + + cluster.transitionToStandby(0); + LOG.info("**Transition to Active**"); + cluster.transitionToActive(1); + + // Verify that the Standby-to-Active transition sets the drop-SPS flag + // to true. This ensures that DNA_DROP_SPS_WORK_COMMAND will be + // propagated to the datanode in a heartbeat response. + int retries = 20; + boolean dropSPSWork = false; + while (retries > 0) { + for (DatanodeDescriptor dnd : listOfDns) { + dropSPSWork = dnd.shouldDropSPSWork(); + if (!dropSPSWork) { + retries--; + Thread.sleep(250); + break; + } + } + if (dropSPSWork) { + break; + } + } + Assert.assertTrue("Didn't drop SPS work", dropSPSWork); + } finally { + cluster.shutdown(); + } + } + + /** + * Test to verify that SPS work will be dropped once the datanode is marked + * as expired. Internally the 'dropSPSWork' flag is set to true on + * expiration, and DNA_DROP_SPS_WORK_COMMAND is sent to that datanode when it + * reconnects. + */ + @Test(timeout = 90000) + public void testDeadDatanode() throws Exception { + int heartbeatExpireInterval = 2 * 2000; + config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 3000); + config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000L); + createCluster(); + + DataNode dn = cluster.getDataNodes().get(0); + // Disable datanode heartbeats, so that the datanode will get expired + // after the recheck interval and become dead. + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + + FSNamesystem fsn = cluster.getNamesystem(0); + DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, + dn.getDatanodeId()); + boolean isDead = false; + int retries = 20; + while (retries > 0) { + isDead = dnd.getLastUpdateMonotonic() < (monotonicNow() + - heartbeatExpireInterval); + if (isDead) { + break; + } + retries--; + Thread.sleep(250); + } + Assert.assertTrue("Datanode is alive", isDead); + // Re-enable datanode heartbeats, so that the expired datanode reconnects + // and is sent DNA_DROP_SPS_WORK_COMMAND on re-registration. + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + + // Verify that datanode expiration will set drop SPS flag to + // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be + // propagated to datanode during reconnection. + boolean dropSPSWork = false; + retries = 50; + while (retries > 0) { + dropSPSWork = dnd.shouldDropSPSWork(); + if (dropSPSWork) { + break; + } + retries--; + Thread.sleep(100); + } + Assert.assertTrue("Didn't drop SPS work", dropSPSWork); + } }
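----------------------------------------------------------------------

The core of this fix is the new IN_PROGRESS result status: a coordinator datanode that is still moving blocks for a trackId now says so instead of staying silent, and the NameNode-side monitor postpones its self-retry timeout rather than electing a second coordinator. The following is a minimal, self-contained sketch of that three-way handling; all class and method names in it are hypothetical simplifications, not the actual Hadoop classes.

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hypothetical NameNode-side monitor illustrating the
    // SUCCESS / FAILURE / IN_PROGRESS handling introduced by this patch.
    public class AttemptedItemsSketch {
      enum Status { SUCCESS, FAILURE, IN_PROGRESS }

      static final class ItemInfo {
        volatile long lastAttemptedOrReportedTime;
        ItemInfo(long now) { this.lastAttemptedOrReportedTime = now; }
        void touch() { lastAttemptedOrReportedTime = monotonicNow(); }
      }

      static long monotonicNow() { return System.nanoTime() / 1_000_000; }

      private final Map<Long, ItemInfo> attemptedItems = new ConcurrentHashMap<>();
      private final Queue<Long> retryQueue = new ConcurrentLinkedQueue<>();

      // Process one per-trackId result reported via heartbeat.
      void handleResult(long trackId, Status status) {
        switch (status) {
        case FAILURE:
          // Coordinator gave up: queue the file for a fresh schedule.
          retryQueue.add(trackId);
          attemptedItems.remove(trackId);
          break;
        case SUCCESS:
          // All scheduled moves completed: stop tracking this trackId.
          attemptedItems.remove(trackId);
          break;
        case IN_PROGRESS:
          // Still moving: push the self-retry timeout out instead of
          // rescheduling, so no second coordinator is chosen while the
          // first one is alive and reporting.
          ItemInfo info = attemptedItems.get(trackId);
          if (info != null) {
            info.touch();
          }
          break;
        }
      }
    }

Without the IN_PROGRESS branch, a long-running movement looks identical to a dead coordinator, which is exactly the double-coordinator race this JIRA describes.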
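Both on datanode re-registration (the DatanodeManager hunk above) and on datanode expiry, the signal travels through a per-datanode dropSPSWork flag that is turned into DNA_DROP_SPS_WORK_COMMAND on the next heartbeat response. The pattern is essentially a one-shot flag. Below is a sketch under the assumption of a simplified descriptor type; the real DatanodeDescriptor may reset the flag in a separate step rather than on read.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical one-shot flag mirroring the dropSPSWork idea: set on
    // (re)registration or NN failover, consumed when building the next
    // heartbeat response.
    class DatanodeDescriptorSketch {
      private final AtomicBoolean dropSPSWork = new AtomicBoolean(false);

      // Called from registerDatanode() or on failover: request a queue drop.
      void setDropSPSWork(boolean drop) {
        dropSPSWork.set(drop);
      }

      // Called while building the heartbeat response; clears the flag so the
      // drop command is issued exactly once per request.
      boolean shouldDropSPSWork() {
        return dropSPSWork.getAndSet(false);
      }
    }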
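On the NameNode side, the satisfier's start path now broadcasts the drop command before launching its own thread, so movements scheduled by the previous active NN are abandoned before any new ones are queued; the ordering is the point. A sketch with illustrative, hypothetical types:

    // Illustrative start sequence: drop stale DN-side work first, then start
    // the satisfier thread. Types and names here are hypothetical.
    class SatisfierStartSketch {
      interface DatanodeManagerOps {
        void addDropSPSWorkCommandsToAllDNs();
      }

      private final DatanodeManagerOps dnManager;
      private Thread satisfierThread;

      SatisfierStartSketch(DatanodeManagerOps dnManager) {
        this.dnManager = dnManager;
      }

      synchronized void start(Runnable satisfierLoop) {
        // 1. Tell every DN to drop movements queued by a previous active NN.
        dnManager.addDropSPSWorkCommandsToAllDNs();
        // 2. Only then begin scheduling new movements.
        satisfierThread = new Thread(satisfierLoop, "StoragePolicySatisfier");
        satisfierThread.setDaemon(true);
        satisfierThread.start();
      }
    }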
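On the datanode side, IN_PROGRESS entries are not appended to every heartbeat report; the worker folds the in-flight trackIds in only once per recheck interval (five heartbeat periods in this patch). Here is a compact sketch of such a monotonic-clock throttle, with hypothetical names; it reports once the interval has elapsed and stays silent in between.

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical throttle: surface the in-flight trackIds only once per
    // recheck interval, otherwise return an empty set so heartbeats stay small.
    class InProgressReportThrottle {
      private final long recheckIntervalMillis;
      private long nextRecheckTime;

      InProgressReportThrottle(long heartbeatIntervalMillis) {
        // Interval proportional to the heartbeat period, as in the patch.
        this.recheckIntervalMillis = 5 * heartbeatIntervalMillis;
        this.nextRecheckTime = monotonicNowMillis() + recheckIntervalMillis;
      }

      Set<Long> inProgressIdsForReport(Set<Long> inFlight) {
        long now = monotonicNowMillis();
        if (now >= nextRecheckTime) {
          nextRecheckTime = now + recheckIntervalMillis;
          return new HashSet<>(inFlight); // defensive copy for the report
        }
        return Collections.emptySet();
      }

      private static long monotonicNowMillis() {
        return System.nanoTime() / 1_000_000;
      }
    }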
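Finally, the tracker daemon gains an explicit lifecycle: a volatile running flag ends the polling loop, interrupt() unblocks any wait, and the owner performs a bounded join on shutdown, mirroring stop() and waitToFinishWorkerThread() in the diff above. A generic skeleton of that pattern, using illustrative names rather than the Hadoop class:

    // Generic start/interrupt/timed-join lifecycle for a polling daemon.
    class PollingDaemonSketch implements Runnable {
      private volatile boolean running = true;
      private final Thread thread = new Thread(this, "PollingDaemonSketch");

      void start() {
        thread.setDaemon(true);
        thread.start();
      }

      void stop() {
        running = false;    // let the loop observe shutdown
        thread.interrupt(); // unblock any wait()/poll() promptly
      }

      void waitToFinish() throws InterruptedException {
        thread.join(3000);  // bounded wait, as in waitToFinishWorkerThread()
      }

      @Override
      public void run() {
        while (running) {
          try {
            // ... poll a CompletionService for finished block moves ...
            Thread.sleep(100); // stand-in for the real blocking poll
          } catch (InterruptedException e) {
            // interrupted by stop(); fall through and re-check 'running'
          }
        }
      }
    }

Setting the flag before interrupting matters: a thread woken by the interrupt immediately re-reads 'running' and exits cleanly instead of looping again.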