http://git-wip-us.apache.org/repos/asf/hadoop/blob/db22affd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b692a33,ad3c172..243dbd2 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@@ -390,7 -389,10 +395,10 @@@ public class DataNode extends Reconfigu private static final int NUM_CORES = Runtime.getRuntime() .availableProcessors(); private static final double CONGESTION_RATIO = 1.5; + private DiskBalancer diskBalancer; + - + private final SocketFactory socketFactory; + private OzoneContainer ozoneServer; private static Tracer createTracer(Configuration conf) { return new Tracer.Builder("DataNode"). @@@ -421,9 -421,9 +429,10 @@@ this.connectToDnViaHostname = false; this.blockScanner = new BlockScanner(this, conf); this.pipelineSupportECN = false; + this.ozoneEnabled = false; this.checkDiskErrorInterval = ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25)); + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); initOOBTimeout(); } @@@ -1145,9 -1183,26 +1194,26 @@@ * Report a bad block which is hosted on the local DN. */ public void reportBadBlocks(ExtendedBlock block) throws IOException{ - BPOfferService bpos = getBPOSForBlock(block); FsVolumeSpi volume = getFSDataset().getVolume(block); - bpos.reportBadBlocks( + if (volume == null) { + LOG.warn("Cannot find FsVolumeSpi to report bad block: " + block); + return; + } + reportBadBlocks(block, volume); + } + + /** + * Report a bad block which is hosted on the local DN. + * + * @param block the bad block which is hosted on the local DN + * @param volume the volume that block is stored in and the volume + * must not be null + * @throws IOException + */ + public void reportBadBlocks(ExtendedBlock block, FsVolumeSpi volume) + throws IOException { + BPOfferService bpos = getBPOSForBlock(block); - bpos.reportBadBlocks( ++ bpos.reportBadBlocks( block, volume.getStorageID(), volume.getStorageType()); } @@@ -1554,15 -1595,7 +1620,16 @@@ data.addBlockPool(nsInfo.getBlockPoolID(), conf); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); initDirectoryScanner(conf); + if(this.ozoneEnabled) { + try { + ozoneServer = new OzoneContainer(conf, this.getFSDataset()); + ozoneServer.start(); + LOG.info("Ozone container server started."); + } catch (Exception ex) { + LOG.error("Unable to start Ozone. ex: {}", ex.toString()); + } + } + initDiskBalancer(data, conf); } List<BPOfferService> getAllBpOs() { @@@ -3338,4 -3402,75 +3455,74 @@@ public Tracer getTracer() { return tracer; } - + /** + * Allows submission of a disk balancer Job. + * @param planID - Hash value of the plan. + * @param planVersion - Plan version, reserved for future use. We have only + * version 1 now. + * @param planFile - Plan file name + * @param planData - Actual plan data in json format + * @throws IOException + */ + @Override + public void submitDiskBalancerPlan(String planID, long planVersion, + String planFile, String planData, boolean skipDateCheck) + throws IOException { + checkSuperuserPrivilege(); + // TODO : Support force option + this.diskBalancer.submitPlan(planID, planVersion, planFile, planData, + skipDateCheck); + } + + /** + * Cancels a running plan. + * @param planID - Hash string that identifies a plan + */ + @Override + public void cancelDiskBalancePlan(String planID) throws + IOException { + checkSuperuserPrivilege(); + this.diskBalancer.cancelPlan(planID); + } + + /** + * Returns the status of current or last executed work plan. + * @return DiskBalancerWorkStatus. + * @throws IOException + */ + @Override + public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException { + checkSuperuserPrivilege(); + return this.diskBalancer.queryWorkStatus(); + } + + /** + * Gets a runtime configuration value from diskbalancer instance. For + * example : DiskBalancer bandwidth. + * + * @param key - String that represents the run time key value. + * @return value of the key as a string. + * @throws IOException - Throws if there is no such key + */ + @Override + public String getDiskBalancerSetting(String key) throws IOException { + checkSuperuserPrivilege(); + Preconditions.checkNotNull(key); + switch (key) { + case DiskBalancerConstants.DISKBALANCER_VOLUME_NAME: + return this.diskBalancer.getVolumeNames(); + case DiskBalancerConstants.DISKBALANCER_BANDWIDTH : + return Long.toString(this.diskBalancer.getBandwidth()); + default: + LOG.error("Disk Balancer - Unknown key in get balancer setting. Key: " + + key); + throw new DiskBalancerException("Unknown key", + DiskBalancerException.Result.UNKNOWN_KEY); + } + } + + @VisibleForTesting + void setBlockScanner(BlockScanner blockScanner) { + this.blockScanner = blockScanner; + } -} +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db22affd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/db22affd/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/db22affd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java ---------------------------------------------------------------------- --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org