http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index adec209..15e71f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; +import org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; @@ -241,10 +242,11 @@ public interface FsVolumeSpi private final FsVolumeSpi volume; + private final FileRegion fileRegion; /** * Get the file's length in async block scan */ - private final long blockFileLength; + private final long blockLength; private final static Pattern CONDENSED_PATH_REGEX = Pattern.compile("(?<!^)(\\\\|/){2,}"); @@ -294,13 +296,30 @@ public interface FsVolumeSpi */ public ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) { + this(blockId, blockFile, metaFile, vol, null, + (blockFile != null) ? blockFile.length() : 0); + } + + /** + * Create a ScanInfo object for a block. This constructor will examine + * the block data and meta-data files. + * + * @param blockId the block ID + * @param blockFile the path to the block data file + * @param metaFile the path to the block meta-data file + * @param vol the volume that contains the block + * @param fileRegion the file region (for provided blocks) + * @param length the length of the block data + */ + public ScanInfo(long blockId, File blockFile, File metaFile, + FsVolumeSpi vol, FileRegion fileRegion, long length) { this.blockId = blockId; String condensedVolPath = (vol == null || vol.getBaseURI() == null) ? null : getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath()); this.blockSuffix = blockFile == null ? null : getSuffix(blockFile, condensedVolPath); - this.blockFileLength = (blockFile != null) ? blockFile.length() : 0; + this.blockLength = length; if (metaFile == null) { this.metaSuffix = null; } else if (blockFile == null) { @@ -310,6 +329,7 @@ public interface FsVolumeSpi condensedVolPath + blockSuffix); } this.volume = vol; + this.fileRegion = fileRegion; } /** @@ -328,8 +348,8 @@ public interface FsVolumeSpi * * @return the length of the data block */ - public long getBlockFileLength() { - return blockFileLength; + public long getBlockLength() { + return blockLength; } /** @@ -399,6 +419,10 @@ public interface FsVolumeSpi getMetaFile().getName()) : HdfsConstants.GRANDFATHER_GENERATION_STAMP; } + + public FileRegion getFileRegion() { + return fileRegion; + } } /**
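As a rough illustration of how the new six-argument ScanInfo constructor above can be used, the sketch below builds a scan record for a block backed by a FileRegion instead of local files. This is not part of the patch: the class and method names are illustrative, and the report list, volume, and region are assumed to be supplied by the caller (as the DirectoryScanner does).

import java.util.LinkedList;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;

class ProvidedScanInfoSketch {
  // A provided block has no local block or meta file, so both File arguments
  // are null and the block length is taken from the FileRegion.
  static void addProvidedBlock(LinkedList<ScanInfo> report, FsVolumeSpi vol,
      FileRegion region) {
    report.add(new ScanInfo(region.getBlock().getBlockId(), null, null,
        vol, region, region.getBlock().getNumBytes()));
  }
}

ProvidedVolumeImpl.compileReport further down in this patch does essentially this for every FileRegion returned by the configured provider.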
http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java new file mode 100644 index 0000000..24921c4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * The default usage statistics for a provided volume. 
+ */ +public class DefaultProvidedVolumeDF + implements ProvidedVolumeDF, Configurable { + + @Override + public void setConf(Configuration conf) { + } + + @Override + public Configuration getConf() { + return null; + } + + @Override + public long getCapacity() { + return Long.MAX_VALUE; + } + + @Override + public long getSpaceUsed() { + return 0; + } + + @Override + public long getBlockPoolUsed(String bpid) { + return 0; + } + + @Override + public long getAvailable() { + return Long.MAX_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index d4375cd..81056db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -1742,6 +1743,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { Set<String> missingVolumesReported = new HashSet<>(); for (ReplicaInfo b : volumeMap.replicas(bpid)) { + //skip blocks in PROVIDED storage + if (b.getVolume().getStorageType() == StorageType.PROVIDED) { + continue; + } String volStorageID = b.getVolume().getStorageID(); if (!builders.containsKey(volStorageID)) { if (!missingVolumesReported.contains(volStorageID)) { @@ -1877,7 +1882,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { try (AutoCloseableLock lock = datasetLock.acquire()) { r = volumeMap.get(bpid, blockId); } - if (r != null) { if (r.blockDataExists()) { return r; @@ -2230,13 +2234,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { * @param vol Volume of the block file */ @Override - public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol) throws IOException { + public void checkAndUpdate(String bpid, ScanInfo scanInfo) + throws IOException { + + long blockId = scanInfo.getBlockId(); + File diskFile = scanInfo.getBlockFile(); + File diskMetaFile = scanInfo.getMetaFile(); + FsVolumeSpi vol = scanInfo.getVolume(); + Block corruptBlock = null; ReplicaInfo memBlockInfo; try (AutoCloseableLock lock = datasetLock.acquire()) { memBlockInfo = volumeMap.get(bpid, blockId); - if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { + if (memBlockInfo != null && + memBlockInfo.getState() != ReplicaState.FINALIZED) { // Block is not finalized - ignore the difference return; } @@ -2251,6 +2262,26 @@ class FsDatasetImpl implements 
FsDatasetSpi<FsVolumeImpl> { Block.getGenerationStamp(diskMetaFile.getName()) : HdfsConstants.GRANDFATHER_GENERATION_STAMP; + if (vol.getStorageType() == StorageType.PROVIDED) { + if (memBlockInfo == null) { + //replica exists on provided store but not in memory + ReplicaInfo diskBlockInfo = + new ReplicaBuilder(ReplicaState.FINALIZED) + .setFileRegion(scanInfo.getFileRegion()) + .setFsVolume(vol) + .setConf(conf) + .build(); + + volumeMap.add(bpid, diskBlockInfo); + LOG.warn("Added missing block to memory " + diskBlockInfo); + } else { + //replica exists in memory but not in the provided store + volumeMap.remove(bpid, blockId); + LOG.warn("Deleting missing provided block " + memBlockInfo); + } + return; + } + if (!diskFileExists) { if (memBlockInfo == null) { // Block file does not exist and block does not exist in memory @@ -3026,7 +3057,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { newReplicaInfo = replicaState.getLazyPersistVolume().activateSavedReplica(bpid, replicaInfo, replicaState); - // Update the volumeMap entry. volumeMap.add(bpid, newReplicaInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java index 32759c4..9f115a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileDescriptor; import java.io.FileInputStream; @@ -32,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DataChecksum; /** Utility methods. 
*/ @InterfaceAudience.Private @@ -44,6 +48,22 @@ public class FsDatasetUtil { return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX); } + public static byte[] createNullChecksumByteArray() { + DataChecksum csum = + DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(out); + try { + BlockMetadataHeader.writeHeader(dataOut, csum); + dataOut.close(); + } catch (IOException e) { + FsVolumeImpl.LOG.error( + "Exception in creating null checksum stream: " + e); + return null; + } + return out.toByteArray(); + } + static File getOrigFile(File unlinkTmpFile) { final String name = unlinkTmpFile.getName(); if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) { @@ -135,8 +155,9 @@ public class FsDatasetUtil { * Compute the checksum for a block file that does not already have * its checksum computed, and save it to dstMeta file. */ - public static void computeChecksum(File srcMeta, File dstMeta, File blockFile, - int smallBufferSize, Configuration conf) throws IOException { + public static void computeChecksum(File srcMeta, File dstMeta, + File blockFile, int smallBufferSize, Configuration conf) + throws IOException { Preconditions.checkNotNull(srcMeta); Preconditions.checkNotNull(dstMeta); Preconditions.checkNotNull(blockFile); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 7224e69..319bc0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -154,18 +154,24 @@ public class FsVolumeImpl implements FsVolumeSpi { this.reservedForReplicas = new AtomicLong(0L); this.storageLocation = sd.getStorageLocation(); this.currentDir = sd.getCurrentDir(); - File parent = currentDir.getParentFile(); - this.usage = new DF(parent, conf); this.storageType = storageLocation.getStorageType(); this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY + "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT)); this.configuredCapacity = -1; + if (currentDir != null) { + File parent = currentDir.getParentFile(); + this.usage = new DF(parent, conf); + cacheExecutor = initializeCacheExecutor(parent); + this.metrics = DataNodeVolumeMetrics.create(conf, parent.getPath()); + } else { + this.usage = null; + cacheExecutor = null; + this.metrics = null; + } this.conf = conf; this.fileIoProvider = fileIoProvider; - cacheExecutor = initializeCacheExecutor(parent); - this.metrics = DataNodeVolumeMetrics.create(conf, getBaseURI().getPath()); } protected ThreadPoolExecutor initializeCacheExecutor(File parent) { @@ -440,7 +446,8 @@ public class FsVolumeImpl implements FsVolumeSpi { /** * Unplanned Non-DFS usage, i.e. Extra usage beyond reserved. 
* - * @return + * @return Disk usage excluding space used by HDFS and excluding space + * reserved for blocks open for write. * @throws IOException */ public long getNonDfsUsed() throws IOException { @@ -518,7 +525,7 @@ public class FsVolumeImpl implements FsVolumeSpi { public String[] getBlockPoolList() { return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); } - + /** * Temporary files. They get moved to the finalized block directory when * the block is finalized. http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java index 427f81b..2da9170 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; @@ -67,6 +68,11 @@ public class FsVolumeImplBuilder { } FsVolumeImpl build() throws IOException { + if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) { + return new ProvidedVolumeImpl(dataset, storageID, sd, + fileIoProvider != null ? fileIoProvider : + new FileIoProvider(null, null), conf); + } return new FsVolumeImpl( dataset, storageID, sd, fileIoProvider != null ? fileIoProvider : http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java new file mode 100644 index 0000000..4d28883 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +/** + * This interface is used to define the usage statistics + * of the provided storage. + */ +public interface ProvidedVolumeDF { + + long getCapacity(); + + long getSpaceUsed(); + + long getBlockPoolUsed(String bpid); + + long getAvailable(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java new file mode 100644 index 0000000..a48e117 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java @@ -0,0 +1,526 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.hdfs.server.common.FileRegionProvider; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; +import org.apache.hadoop.util.Timer; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.AutoCloseableLock; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.map.ObjectWriter; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; + +/** + * This class is used to create provided volumes. + */ +public class ProvidedVolumeImpl extends FsVolumeImpl { + + static class ProvidedBlockPoolSlice { + private FsVolumeImpl providedVolume; + + private FileRegionProvider provider; + private Configuration conf; + private String bpid; + private ReplicaMap bpVolumeMap; + + ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume, + Configuration conf) { + this.providedVolume = volume; + bpVolumeMap = new ReplicaMap(new AutoCloseableLock()); + Class<? 
extends FileRegionProvider> fmt = + conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS, + TextFileRegionProvider.class, FileRegionProvider.class); + provider = ReflectionUtils.newInstance(fmt, conf); + this.conf = conf; + this.bpid = bpid; + bpVolumeMap.initBlockPool(bpid); + LOG.info("Created provider: " + provider.getClass()); + } + + FileRegionProvider getFileRegionProvider() { + return provider; + } + + public void getVolumeMap(ReplicaMap volumeMap, + RamDiskReplicaTracker ramDiskReplicaMap) throws IOException { + Iterator<FileRegion> iter = provider.iterator(); + while(iter.hasNext()) { + FileRegion region = iter.next(); + if (region.getBlockPoolId() != null && + region.getBlockPoolId().equals(bpid)) { + ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED) + .setBlockId(region.getBlock().getBlockId()) + .setURI(region.getPath().toUri()) + .setOffset(region.getOffset()) + .setLength(region.getBlock().getNumBytes()) + .setGenerationStamp(region.getBlock().getGenerationStamp()) + .setFsVolume(providedVolume) + .setConf(conf).build(); + + ReplicaInfo oldReplica = + volumeMap.get(bpid, newReplica.getBlockId()); + if (oldReplica == null) { + volumeMap.add(bpid, newReplica); + bpVolumeMap.add(bpid, newReplica); + } else { + throw new IOException( + "A block with id " + newReplica.getBlockId() + + " already exists in the volumeMap"); + } + } + } + } + + public boolean isEmpty() { + return bpVolumeMap.replicas(bpid).size() == 0; + } + + public void shutdown(BlockListAsLongs blocksListsAsLongs) { + //nothing to do! + } + + public void compileReport(LinkedList<ScanInfo> report, + ReportCompiler reportCompiler) + throws IOException, InterruptedException { + /* refresh the provider and return the list of blocks found. + * the assumption here is that the block ids in the external + * block map, after the refresh, are consistent with those + * from before the refresh, i.e., for blocks which did not change, + * the ids remain the same. + */ + provider.refresh(); + Iterator<FileRegion> iter = provider.iterator(); + while(iter.hasNext()) { + reportCompiler.throttle(); + FileRegion region = iter.next(); + if (region.getBlockPoolId().equals(bpid)) { + LOG.info("Adding ScanInfo for blkid " + + region.getBlock().getBlockId()); + report.add(new ScanInfo(region.getBlock().getBlockId(), null, null, + providedVolume, region, region.getLength())); + } + } + } + } + + private URI baseURI; + private final Map<String, ProvidedBlockPoolSlice> bpSlices = + new ConcurrentHashMap<String, ProvidedBlockPoolSlice>(); + + private ProvidedVolumeDF df; + + ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID, + StorageDirectory sd, FileIoProvider fileIoProvider, + Configuration conf) throws IOException { + super(dataset, storageID, sd, fileIoProvider, conf); + assert getStorageLocation().getStorageType() == StorageType.PROVIDED: + "Only provided storages must use ProvidedVolume"; + + baseURI = getStorageLocation().getUri(); + Class<? 
extends ProvidedVolumeDF> dfClass = + conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS, + DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class); + df = ReflectionUtils.newInstance(dfClass, conf); + } + + @Override + public String[] getBlockPoolList() { + return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); + } + + @Override + public long getCapacity() { + if (configuredCapacity < 0) { + return df.getCapacity(); + } + return configuredCapacity; + } + + @Override + public long getDfsUsed() throws IOException { + return df.getSpaceUsed(); + } + + @Override + long getBlockPoolUsed(String bpid) throws IOException { + return df.getBlockPoolUsed(bpid); + } + + @Override + public long getAvailable() throws IOException { + return df.getAvailable(); + } + + @Override + long getActualNonDfsUsed() throws IOException { + return df.getSpaceUsed(); + } + + @Override + public long getNonDfsUsed() throws IOException { + return 0L; + } + + @Override + public URI getBaseURI() { + return baseURI; + } + + @Override + public File getFinalizedDir(String bpid) throws IOException { + return null; + } + + @Override + public void reserveSpaceForReplica(long bytesToReserve) { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public void releaseReservedSpace(long bytesToRelease) { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + private static final ObjectWriter WRITER = + new ObjectMapper().writerWithDefaultPrettyPrinter(); + private static final ObjectReader READER = + new ObjectMapper().reader(ProvidedBlockIteratorState.class); + + private static class ProvidedBlockIteratorState { + ProvidedBlockIteratorState() { + iterStartMs = Time.now(); + lastSavedMs = iterStartMs; + atEnd = false; + lastBlockId = -1; + } + + // The wall-clock ms since the epoch at which this iterator was last saved. + @JsonProperty + private long lastSavedMs; + + // The wall-clock ms since the epoch at which this iterator was created. + @JsonProperty + private long iterStartMs; + + @JsonProperty + private boolean atEnd; + + //The id of the last block read when the state of the iterator is saved. + //This implementation assumes that provided blocks are returned + //in sorted order of the block ids. + @JsonProperty + private long lastBlockId; + } + + private class ProviderBlockIteratorImpl + implements FsVolumeSpi.BlockIterator { + + private String bpid; + private String name; + private FileRegionProvider provider; + private Iterator<FileRegion> blockIterator; + private ProvidedBlockIteratorState state; + + ProviderBlockIteratorImpl(String bpid, String name, + FileRegionProvider provider) { + this.bpid = bpid; + this.name = name; + this.provider = provider; + rewind(); + } + + @Override + public void close() throws IOException { + //No action needed + } + + @Override + public ExtendedBlock nextBlock() throws IOException { + if (null == blockIterator || !blockIterator.hasNext()) { + return null; + } + FileRegion nextRegion = null; + while (null == nextRegion && blockIterator.hasNext()) { + FileRegion temp = blockIterator.next(); + if (temp.getBlock().getBlockId() < state.lastBlockId) { + continue; + } + if (temp.getBlockPoolId().equals(bpid)) { + nextRegion = temp; + } + } + if (null == nextRegion) { + return null; + } + state.lastBlockId = nextRegion.getBlock().getBlockId(); + return new ExtendedBlock(bpid, nextRegion.getBlock()); + } + + @Override + public boolean atEnd() { + return blockIterator != null ? 
!blockIterator.hasNext(): true; + } + + @Override + public void rewind() { + blockIterator = provider.iterator(); + state = new ProvidedBlockIteratorState(); + } + + @Override + public void save() throws IOException { + //We do not persist the state of this iterator anywhere, locally. + //We just re-scan provided volumes as necessary. + state.lastSavedMs = Time.now(); + } + + @Override + public void setMaxStalenessMs(long maxStalenessMs) { + //do not use max staleness + } + + @Override + public long getIterStartMs() { + return state.iterStartMs; + } + + @Override + public long getLastSavedMs() { + return state.lastSavedMs; + } + + @Override + public String getBlockPoolId() { + return bpid; + } + + public void load() throws IOException { + //on load, we just rewind the iterator for provided volumes. + rewind(); + LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(), + bpid, name, WRITER.writeValueAsString(state)); + } + } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + return new ProviderBlockIteratorImpl(bpid, name, + bpSlices.get(bpid).getFileRegionProvider()); + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name, + bpSlices.get(bpid).getFileRegionProvider()); + iter.load(); + return iter; + } + + @Override + ReplicaInfo addFinalizedBlock(String bpid, Block b, + ReplicaInfo replicaInfo, long bytesReserved) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public VolumeCheckResult check(VolumeCheckContext ignored) + throws DiskErrorException { + return VolumeCheckResult.HEALTHY; + } + + @Override + void getVolumeMap(ReplicaMap volumeMap, + final RamDiskReplicaTracker ramDiskReplicaMap) + throws IOException { + LOG.info("Creating volumemap for provided volume " + this); + for(ProvidedBlockPoolSlice s : bpSlices.values()) { + s.getVolumeMap(volumeMap, ramDiskReplicaMap); + } + } + + private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid) + throws IOException { + ProvidedBlockPoolSlice bp = bpSlices.get(bpid); + if (bp == null) { + throw new IOException("block pool " + bpid + " is not found"); + } + return bp; + } + + @Override + void getVolumeMap(String bpid, ReplicaMap volumeMap, + final RamDiskReplicaTracker ramDiskReplicaMap) + throws IOException { + getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap); + } + + @VisibleForTesting + FileRegionProvider getFileRegionProvider(String bpid) throws IOException { + return getProvidedBlockPoolSlice(bpid).getFileRegionProvider(); + } + + @Override + public String toString() { + return this.baseURI.toString(); + } + + @Override + void addBlockPool(String bpid, Configuration conf) throws IOException { + addBlockPool(bpid, conf, null); + } + + @Override + void addBlockPool(String bpid, Configuration conf, Timer timer) + throws IOException { + LOG.info("Adding block pool " + bpid + + " to volume with id " + getStorageID()); + ProvidedBlockPoolSlice bp; + bp = new ProvidedBlockPoolSlice(bpid, this, conf); + bpSlices.put(bpid, bp); + } + + void shutdown() { + if (cacheExecutor != null) { + cacheExecutor.shutdown(); + } + Set<Entry<String, ProvidedBlockPoolSlice>> set = bpSlices.entrySet(); + for (Entry<String, ProvidedBlockPoolSlice> entry : set) { + entry.getValue().shutdown(null); + } + } + + @Override + void shutdownBlockPool(String bpid, BlockListAsLongs 
blocksListsAsLongs) { + ProvidedBlockPoolSlice bp = bpSlices.get(bpid); + if (bp != null) { + bp.shutdown(blocksListsAsLongs); + } + bpSlices.remove(bpid); + } + + @Override + boolean isBPDirEmpty(String bpid) throws IOException { + return getProvidedBlockPoolSlice(bpid).isEmpty(); + } + + @Override + void deleteBPDirectories(String bpid, boolean force) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public LinkedList<ScanInfo> compileReport(String bpid, + LinkedList<ScanInfo> report, ReportCompiler reportCompiler) + throws InterruptedException, IOException { + LOG.info("Compiling report for volume: " + this + " bpid " + bpid); + //get the report from the appropriate block pool. + if(bpSlices.containsKey(bpid)) { + bpSlices.get(bpid).compileReport(report, reportCompiler); + } + return report; + } + + @Override + public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, + long newGS, long estimateBlockLen) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b, + ReplicaInfo temp) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public ReplicaInPipeline createTemporary(ExtendedBlock b) + throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur, + String bpid, long newBlockId, long recoveryId, long newlength) + throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block, + ReplicaInfo replicaInfo, int smallBufferSize, + Configuration conf) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } + + @Override + public File[] copyBlockToLazyPersistLocation(String bpId, long blockId, + long genStamp, ReplicaInfo replicaInfo, int smallBufferSize, + Configuration conf) throws IOException { + throw new UnsupportedOperationException( + "ProvidedVolume does not yet support writes"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 8b89378..c5d14d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -686,7 +686,7 @@ public class Mover { } } - static class Cli extends Configured implements Tool { + public static class Cli extends Configured implements Tool { private static final String USAGE = "Usage: hdfs mover " + "[-p <files/dirs> | -f <local file>]" + "\n\t-p <files/dirs>\ta space separated list of HDFS files/dirs to migrate." 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java index 872ee74..45e001d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java @@ -39,7 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; */ @InterfaceAudience.Private @InterfaceStability.Evolving -class FSImageCompression { +public class FSImageCompression { /** Codec to use to save or load image, or null if the image is not compressed */ private CompressionCodec imageCodec; http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index 63d1a28..4aae7d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -658,6 +658,10 @@ public class NNStorage extends Storage implements Closeable, void readProperties(StorageDirectory sd, StartupOption startupOption) throws IOException { Properties props = readPropertiesFile(sd.getVersionFile()); + if (props == null) { + throw new IOException( + "Properties not found for storage directory " + sd); + } if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK .matches(startupOption)) { int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion")); @@ -975,7 +979,11 @@ public class NNStorage extends Storage implements Closeable, StorageDirectory sd = sdit.next(); try { Properties props = readPropertiesFile(sd.getVersionFile()); - cid = props.getProperty("clusterID"); + if (props == null) { + cid = null; + } else { + cid = props.getProperty("clusterID"); + } LOG.info("current cluster id for sd="+sd.getCurrentDir() + ";lv=" + layoutVersion + ";cid=" + cid); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index dedf987..169dfc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4622,6 +4622,84 @@ </property> <property> + <name>dfs.provider.class</name> + <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value> + <description> + The class that is used to load information about blocks stored in + provided storages. 
+ org.apache.hadoop.hdfs.server.common.TextFileRegionProvider + is used as the default, which expects the blocks to be specified + using a delimited text file. + </description> + </property> + + <property> + <name>dfs.provided.df.class</name> + <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value> + <description> + The class that is used to measure usage statistics of provided stores. + </description> + </property> + + <property> + <name>dfs.provided.storage.id</name> + <value>DS-PROVIDED</value> + <description> + The storage ID used for provided stores. + </description> + </property> + + <property> + <name>dfs.provided.blockformat.class</name> + <value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value> + <description> + The class that is used to specify the input format of the blocks on + provided storages. The default is + org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses + file regions to describe blocks. The file regions are specified as a + delimited text file. Each file region is a 6-tuple containing the + block id, remote file path, offset into file, length of block, the + block pool id containing the block, and the generation stamp of the + block. + </description> + </property> + + <property> + <name>dfs.provided.textprovider.delimiter</name> + <value>,</value> + <description> + The delimiter used when the provided block map is specified as + a text file. + </description> + </property> + + <property> + <name>dfs.provided.textprovider.read.path</name> + <value></value> + <description> + The path specifying the provided block map as a text file, specified as + a URI. + </description> + </property> + + <property> + <name>dfs.provided.textprovider.read.codec</name> + <value></value> + <description> + The codec used to de-compress the provided block map. + </description> + </property> + + <property> + <name>dfs.provided.textprovider.write.path</name> + <value></value> + <description> + The path to which the provided block map should be written as a text + file, specified as a URI.
+ </description> + </property> + + <property> <name>dfs.lock.suppress.warning.interval</name> <value>10s</value> <description>Instrumentation reporting long critical sections will suppress http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java index 25eb5b6..8bc8b0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java @@ -208,7 +208,7 @@ public class TestDFSRollback { UpgradeUtilities.createDataNodeVersionFile( dataCurrentDirs, storageInfo, - UpgradeUtilities.getCurrentBlockPoolID(cluster)); + UpgradeUtilities.getCurrentBlockPoolID(cluster), conf); cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null); assertTrue(cluster.isDataNodeUp()); @@ -256,7 +256,7 @@ public class TestDFSRollback { NodeType.DATA_NODE); UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, - UpgradeUtilities.getCurrentBlockPoolID(cluster)); + UpgradeUtilities.getCurrentBlockPoolID(cluster), conf); startBlockPoolShouldFail(StartupOption.ROLLBACK, cluster.getNamesystem().getBlockPoolId()); @@ -283,7 +283,7 @@ public class TestDFSRollback { NodeType.DATA_NODE); UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, - UpgradeUtilities.getCurrentBlockPoolID(cluster)); + UpgradeUtilities.getCurrentBlockPoolID(cluster), conf); startBlockPoolShouldFail(StartupOption.ROLLBACK, cluster.getNamesystem().getBlockPoolId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java index d202223..0c09eda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java @@ -265,7 +265,7 @@ public class TestDFSStartupVersions { conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current"); log("DataNode version info", DATA_NODE, i, versions[i]); UpgradeUtilities.createDataNodeVersionFile(storage, - versions[i].storageInfo, bpid, versions[i].blockPoolId); + versions[i].storageInfo, bpid, versions[i].blockPoolId, conf); try { cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null); } catch (Exception ignore) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java index fe1ede0..0d9f502 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java @@ -290,7 +290,7 @@ public class TestDFSUpgrade { UpgradeUtilities.getCurrentFsscTime(cluster), NodeType.DATA_NODE); UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, - UpgradeUtilities.getCurrentBlockPoolID(cluster)); + UpgradeUtilities.getCurrentBlockPoolID(cluster), conf); startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities .getCurrentBlockPoolID(null)); @@ -308,7 +308,7 @@ public class TestDFSUpgrade { NodeType.DATA_NODE); UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, - UpgradeUtilities.getCurrentBlockPoolID(cluster)); + UpgradeUtilities.getCurrentBlockPoolID(cluster), conf); // Ensure corresponding block pool failed to initialized startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities .getCurrentBlockPoolID(null)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java index 9f4df70..621bd51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java @@ -384,8 +384,10 @@ public class UpgradeUtilities { new File(datanodeStorage.toString())); sd.setStorageUuid(DatanodeStorage.generateUuid()); Properties properties = Storage.readPropertiesFile(sd.getVersionFile()); - properties.setProperty("storageID", sd.getStorageUuid()); - Storage.writeProperties(sd.getVersionFile(), properties); + if (properties != null) { + properties.setProperty("storageID", sd.getStorageUuid()); + Storage.writeProperties(sd.getVersionFile(), properties); + } retVal[i] = newDir; } @@ -461,8 +463,9 @@ public class UpgradeUtilities { * @param bpid Block pool Id */ public static void createDataNodeVersionFile(File[] parent, - StorageInfo version, String bpid) throws IOException { - createDataNodeVersionFile(parent, version, bpid, bpid); + StorageInfo version, String bpid, Configuration conf) + throws IOException { + createDataNodeVersionFile(parent, version, bpid, bpid, conf); } /** @@ -477,7 +480,8 @@ public class UpgradeUtilities { * @param bpidToWrite Block pool Id to write into the version file */ public static void createDataNodeVersionFile(File[] parent, - StorageInfo version, String bpid, String bpidToWrite) throws IOException { + StorageInfo version, String bpid, String bpidToWrite, Configuration conf) + throws IOException { DataStorage storage = new DataStorage(version); storage.setDatanodeUuid("FixedDatanodeUuid"); @@ -485,7 +489,7 @@ public class UpgradeUtilities { for (int i = 0; i < parent.length; i++) { File versionFile = new File(parent[i], "VERSION"); StorageDirectory sd = new StorageDirectory(parent[i].getParentFile()); - DataStorage.createStorageID(sd, false); + DataStorage.createStorageID(sd, false, conf); storage.writeProperties(versionFile, sd); versionFiles[i] = versionFile; File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]); http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java new file mode 100644 index 0000000..eaaac22 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.compress.CompressionCodec; + +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test for the text based block format for provided block maps. + */ +public class TestTextBlockFormat { + + static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt"); + + void check(TextWriter.Options opts, final Path vp, + final Class<? 
extends CompressionCodec> vc) throws IOException { + TextFileRegionFormat mFmt = new TextFileRegionFormat() { + @Override + public TextWriter createWriter(Path file, CompressionCodec codec, + String delim, Configuration conf) throws IOException { + assertEquals(vp, file); + if (null == vc) { + assertNull(codec); + } else { + assertEquals(vc, codec.getClass()); + } + return null; // ignored + } + }; + mFmt.getWriter(opts); + } + + @Test + public void testWriterOptions() throws Exception { + TextWriter.Options opts = TextWriter.defaults(); + assertTrue(opts instanceof WriterOptions); + WriterOptions wopts = (WriterOptions) opts; + Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT); + assertEquals(def, wopts.getFile()); + assertNull(wopts.getCodec()); + + opts.filename(OUTFILE); + check(opts, OUTFILE, null); + + opts.filename(OUTFILE); + opts.codec("gzip"); + Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz"); + check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class); + + } + + @Test + public void testCSVReadWrite() throws Exception { + final DataOutputBuffer out = new DataOutputBuffer(); + FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024); + FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024); + FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512); + try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) { + csv.store(r1); + csv.store(r2); + csv.store(r3); + } + Iterator<FileRegion> i3; + try (TextReader csv = new TextReader(null, null, null, ",") { + @Override + public InputStream createStream() { + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), 0, out.getLength()); + return in; + }}) { + Iterator<FileRegion> i1 = csv.iterator(); + assertEquals(r1, i1.next()); + Iterator<FileRegion> i2 = csv.iterator(); + assertEquals(r1, i2.next()); + assertEquals(r2, i2.next()); + assertEquals(r3, i2.next()); + assertEquals(r2, i1.next()); + assertEquals(r3, i1.next()); + + assertFalse(i1.hasNext()); + assertFalse(i2.hasNext()); + i3 = csv.iterator(); + } + try { + i3.next(); + } catch (IllegalStateException e) { + return; + } + fail("Invalid iterator"); + } + + @Test + public void testCSVReadWriteTsv() throws Exception { + final DataOutputBuffer out = new DataOutputBuffer(); + FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024); + FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024); + FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512); + try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) { + csv.store(r1); + csv.store(r2); + csv.store(r3); + } + Iterator<FileRegion> i3; + try (TextReader csv = new TextReader(null, null, null, "\t") { + @Override + public InputStream createStream() { + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), 0, out.getLength()); + return in; + }}) { + Iterator<FileRegion> i1 = csv.iterator(); + assertEquals(r1, i1.next()); + Iterator<FileRegion> i2 = csv.iterator(); + assertEquals(r1, i2.next()); + assertEquals(r2, i2.next()); + assertEquals(r3, i2.next()); + assertEquals(r2, i1.next()); + assertEquals(r3, i1.next()); + + assertFalse(i1.hasNext()); + assertFalse(i2.hasNext()); + i3 = csv.iterator(); + } + try { + i3.next(); + } catch (IllegalStateException e) { + return; + } + fail("Invalid iterator"); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 212f953..c31df4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -616,7 +617,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { this.datanode = datanode; if (storage != null) { for (int i = 0; i < storage.getNumStorageDirs(); ++i) { - DataStorage.createStorageID(storage.getStorageDir(i), false); + DataStorage.createStorageID(storage.getStorageDir(i), false, conf); } this.datanodeUuid = storage.getDatanodeUuid(); } else { @@ -1352,8 +1353,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override - public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol) throws IOException { + public void checkAndUpdate(String bpid, ScanInfo info) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 13502d9..bfdaad9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.*; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -94,8 +95,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { } @Override - public void checkAndUpdate(String bpid, long blockId, File diskFile, - File diskMetaFile, FsVolumeSpi vol) { + public void 
checkAndUpdate(String bpid, ScanInfo info) { + return; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index a30329c..cfae1e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -119,11 +119,12 @@ public class TestFsDatasetImpl { private final static String BLOCKPOOL = "BP-TEST"; - private static Storage.StorageDirectory createStorageDirectory(File root) + private static Storage.StorageDirectory createStorageDirectory(File root, + Configuration conf) throws SecurityException, IOException { Storage.StorageDirectory sd = new Storage.StorageDirectory( StorageLocation.parse(root.toURI().toString())); - DataStorage.createStorageID(sd, false); + DataStorage.createStorageID(sd, false, conf); return sd; } @@ -137,7 +138,7 @@ public class TestFsDatasetImpl { File loc = new File(BASE_DIR + "/data" + i); dirStrings.add(new Path(loc.toString()).toUri().toString()); loc.mkdirs(); - dirs.add(createStorageDirectory(loc)); + dirs.add(createStorageDirectory(loc, conf)); when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); } @@ -197,7 +198,8 @@ public class TestFsDatasetImpl { String pathUri = new Path(path).toUri().toString(); expectedVolumes.add(new File(pathUri).getAbsolutePath()); StorageLocation loc = StorageLocation.parse(pathUri); - Storage.StorageDirectory sd = createStorageDirectory(new File(path)); + Storage.StorageDirectory sd = createStorageDirectory( + new File(path), conf); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc), @@ -315,7 +317,8 @@ public class TestFsDatasetImpl { String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater"; StorageLocation loc = StorageLocation.parse(newVolumePath); - Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath)); + Storage.StorageDirectory sd = createStorageDirectory( + new File(newVolumePath), conf); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), eq(loc), @@ -348,7 +351,7 @@ public class TestFsDatasetImpl { any(ReplicaMap.class), any(RamDiskReplicaLruTracker.class)); - Storage.StorageDirectory sd = createStorageDirectory(badDir); + Storage.StorageDirectory sd = createStorageDirectory(badDir, conf); sd.lock(); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when(storage.prepareVolume(eq(datanode), @@ -492,7 +495,7 @@ public class TestFsDatasetImpl { String path = BASE_DIR + "/newData0"; String pathUri = new Path(path).toUri().toString(); StorageLocation loc = StorageLocation.parse(pathUri); - Storage.StorageDirectory sd = createStorageDirectory(new File(path)); + Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when( 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java new file mode 100644 index 0000000..2c119fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.hdfs.server.common.FileRegionProvider; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.datanode.BlockScanner; +import org.apache.hadoop.hdfs.server.datanode.DNConf; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; +import org.apache.hadoop.util.AutoCloseableLock; +import org.apache.hadoop.util.StringUtils; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Basic test cases for provided implementation. + */ +public class TestProvidedImpl { + private static final Logger LOG = + LoggerFactory.getLogger(TestFsDatasetImpl.class); + private static final String BASE_DIR = + new FileSystemTestHelper().getTestRootDir(); + private static final int NUM_LOCAL_INIT_VOLUMES = 1; + private static final int NUM_PROVIDED_INIT_VOLUMES = 1; + private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"}; + private static final int NUM_PROVIDED_BLKS = 10; + private static final long BLK_LEN = 128 * 1024; + private static final int MIN_BLK_ID = 0; + private static final int CHOSEN_BP_ID = 0; + + private static String providedBasePath = BASE_DIR; + + private Configuration conf; + private DataNode datanode; + private DataStorage storage; + private FsDatasetImpl dataset; + private static Map<Long, String> blkToPathMap; + private static List<FsVolumeImpl> providedVolumes; + + /** + * A simple FileRegion iterator for tests. + */ + public static class TestFileRegionIterator implements Iterator<FileRegion> { + + private int numBlocks; + private int currentCount; + private String basePath; + + public TestFileRegionIterator(String basePath, int minID, int numBlocks) { + this.currentCount = minID; + this.numBlocks = numBlocks; + this.basePath = basePath; + } + + @Override + public boolean hasNext() { + return currentCount < numBlocks; + } + + @Override + public FileRegion next() { + FileRegion region = null; + if (hasNext()) { + File newFile = new File(basePath, "file" + currentCount); + if(!newFile.exists()) { + try { + LOG.info("Creating file for blkid " + currentCount); + blkToPathMap.put((long) currentCount, newFile.getAbsolutePath()); + LOG.info("Block id " + currentCount + " corresponds to file " + + newFile.getAbsolutePath()); + newFile.createNewFile(); + Writer writer = new OutputStreamWriter( + new FileOutputStream(newFile.getAbsolutePath()), "utf-8"); + for(int i=0; i< BLK_LEN/(Integer.SIZE/8); i++) { + writer.write(currentCount); + } + writer.flush(); + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + region = new FileRegion(currentCount, new Path(newFile.toString()), + 0, BLK_LEN, BLOCK_POOL_IDS[CHOSEN_BP_ID]); + currentCount++; + } + return region; + } + + @Override + public void remove() { + //do nothing. + } + + public void resetMinBlockId(int minId) { + currentCount = minId; + } + + public void resetBlockCount(int numBlocks) { + this.numBlocks = numBlocks; + } + + } + + /** + * A simple FileRegion provider for tests. 
+ */ + public static class TestFileRegionProvider + extends FileRegionProvider implements Configurable { + + private Configuration conf; + private int minId; + private int numBlocks; + + TestFileRegionProvider() { + minId = MIN_BLK_ID; + numBlocks = NUM_PROVIDED_BLKS; + } + + @Override + public Iterator<FileRegion> iterator() { + return new TestFileRegionIterator(providedBasePath, minId, numBlocks); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void refresh() { + //do nothing! + } + + public void setMinBlkId(int minId) { + this.minId = minId; + } + + public void setBlockCount(int numBlocks) { + this.numBlocks = numBlocks; + } + } + + private static Storage.StorageDirectory createLocalStorageDirectory( + File root, Configuration conf) + throws SecurityException, IOException { + Storage.StorageDirectory sd = + new Storage.StorageDirectory( + StorageLocation.parse(root.toURI().toString())); + DataStorage.createStorageID(sd, false, conf); + return sd; + } + + private static Storage.StorageDirectory createProvidedStorageDirectory( + String confString, Configuration conf) + throws SecurityException, IOException { + Storage.StorageDirectory sd = + new Storage.StorageDirectory(StorageLocation.parse(confString)); + DataStorage.createStorageID(sd, false, conf); + return sd; + } + + private static void createStorageDirs(DataStorage storage, + Configuration conf, int numDirs, int numProvidedDirs) + throws IOException { + List<Storage.StorageDirectory> dirs = + new ArrayList<Storage.StorageDirectory>(); + List<String> dirStrings = new ArrayList<String>(); + FileUtils.deleteDirectory(new File(BASE_DIR)); + for (int i = 0; i < numDirs; i++) { + File loc = new File(BASE_DIR, "data" + i); + dirStrings.add(new Path(loc.toString()).toUri().toString()); + loc.mkdirs(); + dirs.add(createLocalStorageDirectory(loc, conf)); + when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); + } + + for (int i = numDirs; i < numDirs + numProvidedDirs; i++) { + File loc = new File(BASE_DIR, "data" + i); + providedBasePath = loc.getAbsolutePath(); + loc.mkdirs(); + String dirString = "[PROVIDED]" + + new Path(loc.toString()).toUri().toString(); + dirStrings.add(dirString); + dirs.add(createProvidedStorageDirectory(dirString, conf)); + when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); + } + + String dataDir = StringUtils.join(",", dirStrings); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir); + when(storage.dirIterator()).thenReturn(dirs.iterator()); + when(storage.getNumStorageDirs()).thenReturn(numDirs + numProvidedDirs); + } + + private int getNumVolumes() { + try (FsDatasetSpi.FsVolumeReferences volumes = + dataset.getFsVolumeReferences()) { + return volumes.size(); + } catch (IOException e) { + return 0; + } + } + + private void compareBlkFile(InputStream ins, String filepath) + throws FileNotFoundException, IOException { + try (ReadableByteChannel i = Channels.newChannel( + new FileInputStream(new File(filepath)))) { + try (ReadableByteChannel j = Channels.newChannel(ins)) { + ByteBuffer ib = ByteBuffer.allocate(4096); + ByteBuffer jb = ByteBuffer.allocate(4096); + while (true) { + int il = i.read(ib); + int jl = j.read(jb); + if (il < 0 || jl < 0) { + assertEquals(il, jl); + break; + } + ib.flip(); + jb.flip(); + int cmp = Math.min(ib.remaining(), jb.remaining()); + for (int k = 0; k < cmp; ++k) { + assertEquals(ib.get(), jb.get()); + } + ib.compact(); + jb.compact(); + } + 
} + } + } + + @Before + public void setUp() throws IOException { + datanode = mock(DataNode.class); + storage = mock(DataStorage.class); + this.conf = new Configuration(); + this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); + + when(datanode.getConf()).thenReturn(conf); + final DNConf dnConf = new DNConf(datanode); + when(datanode.getDnConf()).thenReturn(dnConf); + + final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf); + when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner); + final ShortCircuitRegistry shortCircuitRegistry = + new ShortCircuitRegistry(conf); + when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry); + + this.conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS, + TestFileRegionProvider.class, FileRegionProvider.class); + + blkToPathMap = new HashMap<Long, String>(); + providedVolumes = new LinkedList<FsVolumeImpl>(); + + createStorageDirs( + storage, conf, NUM_LOCAL_INIT_VOLUMES, NUM_PROVIDED_INIT_VOLUMES); + + dataset = new FsDatasetImpl(datanode, storage, conf); + FsVolumeReferences volumes = dataset.getFsVolumeReferences(); + for (int i = 0; i < volumes.size(); i++) { + FsVolumeSpi vol = volumes.get(i); + if (vol.getStorageType() == StorageType.PROVIDED) { + providedVolumes.add((FsVolumeImpl) vol); + } + } + + for (String bpid : BLOCK_POOL_IDS) { + dataset.addBlockPool(bpid, conf); + } + + assertEquals(NUM_LOCAL_INIT_VOLUMES + NUM_PROVIDED_INIT_VOLUMES, + getNumVolumes()); + assertEquals(0, dataset.getNumFailedVolumes()); + } + + @Test + public void testProvidedStorageID() throws IOException { + for (int i = 0; i < providedVolumes.size(); i++) { + assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT, + providedVolumes.get(i).getStorageID()); + } + } + + @Test + public void testBlockLoad() throws IOException { + for (int i = 0; i < providedVolumes.size(); i++) { + FsVolumeImpl vol = providedVolumes.get(i); + ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); + vol.getVolumeMap(volumeMap, null); + + assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length); + for (int j = 0; j < BLOCK_POOL_IDS.length; j++) { + if (j != CHOSEN_BP_ID) { + //this block pool should not have any blocks + assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j])); + } + } + assertEquals(NUM_PROVIDED_BLKS, + volumeMap.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size()); + } + } + + @Test + public void testProvidedBlockRead() throws IOException { + for (int id = 0; id < NUM_PROVIDED_BLKS; id++) { + ExtendedBlock eb = new ExtendedBlock( + BLOCK_POOL_IDS[CHOSEN_BP_ID], id, BLK_LEN, + HdfsConstants.GRANDFATHER_GENERATION_STAMP); + InputStream ins = dataset.getBlockInputStream(eb, 0); + String filepath = blkToPathMap.get((long) id); + compareBlkFile(ins, filepath); + } + } + + @Test + public void testProvidedBlockIterator() throws IOException { + for (int i = 0; i < providedVolumes.size(); i++) { + FsVolumeImpl vol = providedVolumes.get(i); + BlockIterator iter = + vol.newBlockIterator(BLOCK_POOL_IDS[CHOSEN_BP_ID], "temp"); + Set<Long> blockIdsUsed = new HashSet<Long>(); + while(!iter.atEnd()) { + ExtendedBlock eb = iter.nextBlock(); + long blkId = eb.getBlockId(); + assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS); + //all block ids must be unique! 
+ assertTrue(!blockIdsUsed.contains(blkId)); + blockIdsUsed.add(blkId); + } + assertEquals(NUM_PROVIDED_BLKS, blockIdsUsed.size()); + } + } + + + @Test + public void testRefresh() throws IOException { + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1); + for (int i = 0; i < providedVolumes.size(); i++) { + ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i); + TestFileRegionProvider provider = (TestFileRegionProvider) + vol.getFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID]); + //equivalent to two new blocks appearing + provider.setBlockCount(NUM_PROVIDED_BLKS + 2); + //equivalent to deleting the first block + provider.setMinBlkId(MIN_BLK_ID + 1); + + DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf); + scanner.reconcile(); + ReplicaInfo info = dataset.getBlockReplica( + BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1); + //new replica should be added to the dataset + assertTrue(info != null); + try { + info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0); + } catch(Exception ex) { + LOG.info("Exception expected: " + ex); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java index d5a3948..db8c029 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java @@ -68,7 +68,10 @@ public class TestClusterId { fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE); StorageDirectory sd = sdit.next(); Properties props = Storage.readPropertiesFile(sd.getVersionFile()); - String cid = props.getProperty("clusterID"); + String cid = null; + if (props != null) { + cid = props.getProperty("clusterID"); + } LOG.info("successfully formated : sd="+sd.getCurrentDir() + ";cid="+cid); return cid; }
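
The TestProvidedImpl code in this patch exercises the FileRegionProvider plug-in point: a provider enumerates FileRegion entries (block id, file, offset, length) for a PROVIDED volume, is selected through DFSConfigKeys.DFS_PROVIDER_CLASS, and DirectoryScanner#reconcile() (see testRefresh) is what folds provider changes back into the dataset. The following minimal sketch, modeled on the patch's TestFileRegionProvider, illustrates that wiring outside the test harness. It is not part of the patch: the class name StaticFileRegionProvider, its package, and the /remote/store/data.bin layout are hypothetical, and it assumes iterator() and refresh() are the only methods a provider must supply, as the test class suggests.

package org.example.hdfs; // hypothetical package, not part of this patch

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.FileRegionProvider;

/**
 * Enumerates a fixed set of provided blocks laid out back to back in a
 * single external file (hypothetical example layout).
 */
public class StaticFileRegionProvider
    extends FileRegionProvider implements Configurable {

  private static final long BLK_LEN = 1024 * 1024;
  private static final int NUM_BLOCKS = 10;

  private Configuration conf;
  private final List<FileRegion> regions = new ArrayList<>();

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    regions.clear();
    // Hypothetical block map: block ids 0..9, BLK_LEN bytes each, at
    // consecutive offsets of one remote file, mirroring the fixed-offset
    // regions written by the TextWriter tests above.
    Path data = new Path("/remote/store/data.bin"); // hypothetical path
    for (long id = 0; id < NUM_BLOCKS; id++) {
      regions.add(new FileRegion(id, data, id * BLK_LEN, BLK_LEN));
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public Iterator<FileRegion> iterator() {
    return regions.iterator();
  }

  @Override
  public void refresh() {
    // A real provider would re-read its block map here; after a refresh,
    // DirectoryScanner#reconcile() adds or removes the corresponding
    // replicas, as testRefresh in the patch demonstrates.
  }

  /** Select this provider, as TestProvidedImpl#setUp does for its test provider. */
  public static void install(Configuration conf) {
    conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
        StaticFileRegionProvider.class, FileRegionProvider.class);
  }
}

As in the test's createStorageDirs, the provider only comes into play for storage directories marked with the [PROVIDED] prefix in dfs.datanode.data.dir; a deployment would presumably need the same configuration alongside the provider class setting above.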