Author: cmccabe Date: Mon Oct 14 22:19:10 2013 New Revision: 1532116 URL: http://svn.apache.org/r1532116 Log: HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only (cmccabe)
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Mon Oct 14 22:19:10 2013 @@ -61,6 +61,7 @@ HDFS-4949 (Unreleased) String. (cnauroth) OPTIMIZATIONS + HDFS-5349. 
DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) BUG FIXES HDFS-5169. hdfs.c: translateZCRException: null pointer deref when Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Oct 14 22:19:10 2013 @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; @@ -119,6 +120,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; @@ -696,6 +698,8 @@ public class PBHelper { return PBHelper.convert(proto.getKeyUpdateCmd()); case RegisterCommand: return REG_CMD; + case BlockIdCommand: + return PBHelper.convert(proto.getBlkIdCmd()); } return null; } @@ -738,12 +742,6 @@ public class PBHelper { case DatanodeProtocol.DNA_SHUTDOWN: builder.setAction(BlockCommandProto.Action.SHUTDOWN); break; - case DatanodeProtocol.DNA_CACHE: - builder.setAction(BlockCommandProto.Action.CACHE); - break; - case DatanodeProtocol.DNA_UNCACHE: - builder.setAction(BlockCommandProto.Action.UNCACHE); - break; default: throw new AssertionError("Invalid action"); } @@ -754,6 +752,26 @@ public class PBHelper { builder.addAllTargets(PBHelper.convert(cmd.getTargets())); return builder.build(); } + + public static BlockIdCommandProto convert(BlockIdCommand cmd) { + BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder() + .setBlockPoolId(cmd.getBlockPoolId()); + switch (cmd.getAction()) { + case DatanodeProtocol.DNA_CACHE: + builder.setAction(BlockIdCommandProto.Action.CACHE); + break; + case DatanodeProtocol.DNA_UNCACHE: + builder.setAction(BlockIdCommandProto.Action.UNCACHE); + break; + default: + throw new AssertionError("Invalid action"); + } + long[] blockIds = cmd.getBlockIds(); + for (int i = 0; i < blockIds.length; i++) { + builder.addBlockIds(blockIds[i]); + } + return builder.build(); + } private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) { DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length]; @@ -796,11 +814,14 @@ public class PBHelper { break; case DatanodeProtocol.DNA_TRANSFER: case DatanodeProtocol.DNA_INVALIDATE: + case DatanodeProtocol.DNA_SHUTDOWN: + builder.setCmdType(DatanodeCommandProto.Type.BlockCommand). 
+ setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_CACHE: case DatanodeProtocol.DNA_UNCACHE: - case DatanodeProtocol.DNA_SHUTDOWN: - builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd( - PBHelper.convert((BlockCommand) datanodeCommand)); + builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand). + setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand)); break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: @@ -851,6 +872,20 @@ public class PBHelper { case SHUTDOWN: action = DatanodeProtocol.DNA_SHUTDOWN; break; + default: + throw new AssertionError("Unknown action type: " + blkCmd.getAction()); + } + return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets); + } + + public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) { + int numBlockIds = blkIdCmd.getBlockIdsCount(); + long blockIds[] = new long[numBlockIds]; + for (int i = 0; i < numBlockIds; i++) { + blockIds[i] = blkIdCmd.getBlockIds(i); + } + int action = DatanodeProtocol.DNA_UNKNOWN; + switch (blkIdCmd.getAction()) { case CACHE: action = DatanodeProtocol.DNA_CACHE; break; @@ -858,9 +893,9 @@ public class PBHelper { action = DatanodeProtocol.DNA_UNCACHE; break; default: - throw new AssertionError("Unknown action type: " + blkCmd.getAction()); + throw new AssertionError("Unknown action type: " + blkIdCmd.getAction()); } - return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets); + return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds); } public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) { Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Oct 14 22:19:10 2013 @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -1308,14 +1309,22 @@ public class DatanodeManager { // Check pending caching List<Block> pendingCacheList = nodeinfo.getCacheBlocks(); if (pendingCacheList != null) { - cmds.add(new BlockCommand(DatanodeProtocol.DNA_CACHE, blockPoolId, - pendingCacheList.toArray(new Block[] {}))); + long blockIds[] = new long[pendingCacheList.size()]; + for (int i = 0; i < pendingCacheList.size(); i++) { + blockIds[i] = pendingCacheList.get(i).getBlockId(); + } + cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_CACHE, blockPoolId, + blockIds)); } // Check cached block invalidation blks = nodeinfo.getInvalidateCacheBlocks(); if (blks != null) { - cmds.add(new BlockCommand(DatanodeProtocol.DNA_UNCACHE, - blockPoolId, blks)); + long blockIds[] = new long[blks.length]; + for (int i = 0; i < blks.length; i++) { + blockIds[i] = 
blks[i].getBlockId(); + } + cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_UNCACHE, + blockPoolId, blockIds)); } blockManager.addKeyUpdateCommand(cmds, nodeinfo); Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Oct 14 22:19:10 2013 @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.E import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -518,6 +519,8 @@ class BPOfferService { return true; final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null; + final BlockIdCommand blockIdCmd = + cmd instanceof BlockIdCommand ? 
(BlockIdCommand)cmd: null; switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: @@ -545,13 +548,13 @@ class BPOfferService { break; case DatanodeProtocol.DNA_CACHE: LOG.info("DatanodeCommand action: DNA_CACHE"); - dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks()); - dn.metrics.incrBlocksCached(bcmd.getBlocks().length); + dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds()); + dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length); break; case DatanodeProtocol.DNA_UNCACHE: LOG.info("DatanodeCommand action: DNA_UNCACHE"); - dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks()); - dn.metrics.incrBlocksUncached(bcmd.getBlocks().length); + dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds()); + dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length); break; case DatanodeProtocol.DNA_SHUTDOWN: // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Mon Oct 14 22:19:10 2013 @@ -305,16 +305,16 @@ public interface FsDatasetSpi<V extends /** * Caches the specified blocks * @param bpid Block pool id - * @param cacheBlks - block to cache + * @param blockIds - block ids to cache */ - public void 
cache(String bpid, Block[] cacheBlks); + public void cache(String bpid, long[] blockIds); /** * Uncaches the specified blocks * @param bpid Block pool id - * @param uncacheBlks - blocks to uncache + * @param blockIds - block ids to uncache */ - public void uncache(String bpid, Block[] uncacheBlks); + public void uncache(String bpid, long[] blockIds); /** * Check if all the data directories are healthy Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Oct 14 22:19:10 2013 @@ -91,8 +91,8 @@ public class FsDatasetCache { /** * @return if the block is cached */ - boolean isCached(String bpid, Block block) { - MappableBlock mapBlock = cachedBlocks.get(block.getBlockId()); + boolean isCached(String bpid, long blockId) { + MappableBlock mapBlock = cachedBlocks.get(blockId); if (mapBlock != null) { return mapBlock.getBlockPoolId().equals(bpid); } @@ -127,7 +127,7 @@ public class FsDatasetCache { */ void cacheBlock(String bpid, Block block, FsVolumeImpl volume, FileInputStream blockIn, FileInputStream metaIn) { - if (isCached(bpid, block)) { + if (isCached(bpid, block.getBlockId())) { return; } MappableBlock mapBlock = null; @@ -166,23 +166,23 @@ public class FsDatasetCache { /** * Uncaches a block if it is cached. 
- * @param block to uncache + * @param blockId id to uncache */ - void uncacheBlock(String bpid, Block block) { - MappableBlock mapBlock = cachedBlocks.get(block.getBlockId()); + void uncacheBlock(String bpid, long blockId) { + MappableBlock mapBlock = cachedBlocks.get(blockId); if (mapBlock != null && mapBlock.getBlockPoolId().equals(bpid) && - mapBlock.getBlock().equals(block)) { + mapBlock.getBlock().getBlockId() == blockId) { mapBlock.close(); - cachedBlocks.remove(block.getBlockId()); + cachedBlocks.remove(blockId); long bytes = mapBlock.getNumBytes(); long used = usedBytes.get(); while (!usedBytes.compareAndSet(used, used - bytes)) { used = usedBytes.get(); } - LOG.info("Successfully uncached block " + block); + LOG.info("Successfully uncached block " + blockId); } else { - LOG.info("Could not uncache block " + block + ": unknown block."); + LOG.info("Could not uncache block " + blockId + ": unknown block."); } } @@ -215,7 +215,8 @@ public class FsDatasetCache { // If we failed or the block became uncacheable in the meantime, // clean up and return the reserved cache allocation if (!success || - !dataset.validToCache(block.getBlockPoolId(), block.getBlock())) { + !dataset.validToCache(block.getBlockPoolId(), + block.getBlock().getBlockId())) { block.close(); long used = usedBytes.get(); while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) { Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Oct 14 22:19:10 2013 @@ -562,7 +562,7 @@ class FsDatasetImpl implements FsDataset FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) throws IOException { // uncache the block - cacheManager.uncacheBlock(bpid, replicaInfo); + cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); // unlink the finalized replica replicaInfo.unlinkBlock(1); @@ -1178,7 +1178,7 @@ class FsDatasetImpl implements FsDataset } // Uncache the block synchronously - cacheManager.uncacheBlock(bpid, invalidBlks[i]); + cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); // Delete the block asynchronously to make sure we can do it fast enough asyncDiskService.deleteAsync(v, f, FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), @@ -1189,20 +1189,22 @@ class FsDatasetImpl implements FsDataset } } - synchronized boolean validToCache(String bpid, Block blk) { - ReplicaInfo info = volumeMap.get(bpid, blk); + synchronized boolean validToCache(String bpid, long blockId) { + ReplicaInfo info = volumeMap.get(bpid, blockId); if (info == null) { - LOG.warn("Failed to cache replica " + blk + ": ReplicaInfo not found."); + LOG.warn("Failed to cache replica in block pool " + bpid + + " with block id " + blockId + ": ReplicaInfo not found."); return false; } FsVolumeImpl volume = (FsVolumeImpl)info.getVolume(); if (volume == null) { - LOG.warn("Failed to cache replica " + blk + ": Volume not found."); + LOG.warn("Failed to cache block with id " + blockId + + ": Volume not found."); return false; } if (info.getState() != ReplicaState.FINALIZED) { - LOG.warn("Failed to cache replica " + blk + ": Replica is not" - + " finalized."); + LOG.warn("Failed to 
block with id " + blockId + + ": Replica is not finalized."); return false; } return true; @@ -1211,31 +1213,33 @@ class FsDatasetImpl implements FsDataset /** * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. */ - private void cacheBlock(String bpid, Block blk) { + private void cacheBlock(String bpid, long blockId) { ReplicaInfo info; FsVolumeImpl volume; synchronized (this) { - if (!validToCache(bpid, blk)) { + if (!validToCache(bpid, blockId)) { return; } - info = volumeMap.get(bpid, blk); + info = volumeMap.get(bpid, blockId); volume = (FsVolumeImpl)info.getVolume(); } // Try to open block and meta streams FileInputStream blockIn = null; FileInputStream metaIn = null; boolean success = false; + ExtendedBlock extBlk = + new ExtendedBlock(bpid, blockId, + info.getBytesOnDisk(), info.getGenerationStamp()); try { - ExtendedBlock extBlk = new ExtendedBlock(bpid, blk); blockIn = (FileInputStream)getBlockInputStream(extBlk, 0); metaIn = (FileInputStream)getMetaDataInputStream(extBlk) .getWrappedStream(); success = true; } catch (ClassCastException e) { - LOG.warn("Failed to cache replica " + blk + ": Underlying blocks" + LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks" + " are not backed by files.", e); } catch (IOException e) { - LOG.warn("Failed to cache replica " + blk + ": IOException while" + LOG.warn("Failed to cache replica " + extBlk + ": IOException while" + " trying to open block or meta files.", e); } if (!success) { @@ -1243,21 +1247,21 @@ class FsDatasetImpl implements FsDataset IOUtils.closeQuietly(metaIn); return; } - cacheManager.cacheBlock(bpid, blk, volume, blockIn, metaIn); + cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(), + volume, blockIn, metaIn); } @Override // FsDatasetSpi - public void cache(String bpid, Block[] cacheBlks) { - for (int i=0; i<cacheBlks.length; i++) { - cacheBlock(bpid, cacheBlks[i]); + public void cache(String bpid, long[] blockIds) { + for (int i=0; i < blockIds.length; 
i++) { + cacheBlock(bpid, blockIds[i]); } } @Override // FsDatasetSpi - public void uncache(String bpid, Block[] uncacheBlks) { - for (int i=0; i<uncacheBlks.length; i++) { - Block blk = uncacheBlks[i]; - cacheManager.uncacheBlock(bpid, blk); + public void uncache(String bpid, long[] blockIds) { + for (int i=0; i < blockIds.length; i++) { + cacheManager.uncacheBlock(bpid, blockIds[i]); } } Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java?rev=1532116&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java Mon Oct 14 22:19:10 2013 @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/**************************************************** + * A BlockIdCommand is an instruction to a datanode + * regarding some blocks under its control. + ****************************************************/ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockIdCommand extends DatanodeCommand { + final String poolId; + final long blockIds[]; + + /** + * Create BlockIdCommand for the given action + * @param blockIds ids of the blocks related to the action + */ + public BlockIdCommand(int action, String poolId, long[] blockIds) { + super(action); + this.poolId = poolId; + this.blockIds= blockIds; + } + + public String getBlockPoolId() { + return poolId; + } + + public long[] getBlockIds() { + return blockIds; + } +} Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Mon Oct 14 22:19:10 2013 @@ -70,6 +70,7 @@ message DatanodeCommandProto { RegisterCommand = 5; UnusedUpgradeCommand = 6; NullDatanodeCommand = 7; + BlockIdCommand = 8; } required Type cmdType = 1; // Type of the command @@ -82,6 +83,7 @@ message DatanodeCommandProto { optional FinalizeCommandProto finalizeCmd = 5; optional KeyUpdateCommandProto keyUpdateCmd = 6; optional RegisterCommandProto registerCmd = 7; + optional BlockIdCommandProto blkIdCmd = 8; } /** @@ -103,8 +105,6 @@ 
message BlockCommandProto { TRANSFER = 1; // Transfer blocks to another datanode INVALIDATE = 2; // Invalidate blocks SHUTDOWN = 3; // Shutdown the datanode - CACHE = 4; // Cache blocks on the datanode - UNCACHE = 5; // Uncache blocks on the datanode } required Action action = 1; required string blockPoolId = 2; @@ -113,6 +113,20 @@ message BlockCommandProto { } /** + * Command to instruct datanodes to perform certain action + * on the given set of block IDs. + */ +message BlockIdCommandProto { + enum Action { + CACHE = 1; + UNCACHE = 2; + } + required Action action = 1; + required string blockPoolId = 2; + repeated uint64 blockIds = 3 [packed=true]; +} + +/** * List of blocks to be recovered by the datanode */ message BlockRecoveryCommandProto { Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1532116&r1=1532115&r2=1532116&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Mon Oct 14 22:19:10 2013 @@ -580,13 +580,13 @@ public class SimulatedFSDataset implemen } @Override // FSDatasetSpi - public void cache(String bpid, Block[] cacheBlks) { + public void cache(String bpid, long[] cacheBlks) { throw new UnsupportedOperationException( "SimulatedFSDataset does not support cache operation!"); } @Override // FSDatasetSpi - public void uncache(String bpid, Block[] uncacheBlks) { + public void uncache(String bpid, long[] uncacheBlks) { throw new 
UnsupportedOperationException( "SimulatedFSDataset does not support uncache operation!"); }