HDFS-13421. [PROVIDED Phase 2] Implement DNA_BACKUP command in Datanode. Contributed by Ewan Higgs.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06477abc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06477abc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06477abc Branch: refs/heads/HDFS-12090 Commit: 06477abcd93eb988b4afd0a2dff549e67e0dbd85 Parents: 8cdd033 Author: Virajith Jalaparti <viraj...@apache.org> Authored: Wed Aug 1 12:13:31 2018 -0700 Committer: Ewan Higgs <ewan.hi...@wdc.com> Committed: Mon Sep 3 14:42:33 2018 +0200 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/BlockInputStream.java | 52 ++++++++ .../hdfs/server/datanode/BPOfferService.java | 6 + .../hadoop/hdfs/server/datanode/DataNode.java | 18 +++ .../SyncServiceSatisfierDatanodeWorker.java | 97 +++++++++++++++ .../SyncTaskExecutionFeedbackCollector.java | 54 ++++++++ .../executor/BlockSyncOperationExecutor.java | 122 +++++++++++++++++++ .../executor/BlockSyncReaderFactory.java | 92 ++++++++++++++ .../executor/BlockSyncTaskRunner.java | 69 +++++++++++ .../hadoop/hdfs/TestBlockInputStream.java | 84 +++++++++++++ .../TestBlockSyncOperationExecutor.java | 94 ++++++++++++++ 10 files changed, 688 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java new file mode 100644 index 0000000..152f83e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Facade around BlockReader that indeed implements the InputStream interface. + */ +public class BlockInputStream extends InputStream { + private final BlockReader blockReader; + + public BlockInputStream(BlockReader blockReader) { + this.blockReader = blockReader; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int c = blockReader.read(b, 0, b.length); + if (c > 0) { + return b[0]; + } else { + return -1; + } + } + + @Override + public int read(byte b[], int off, int len) throws IOException { + return blockReader.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return blockReader.skip(n); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index a25f6a9..b8eef5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -795,6 +795,12 @@ class BPOfferService { ((BlockECReconstructionCommand) cmd).getECTasks(); dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks); break; + case DatanodeProtocol.DNA_BACKUP: + LOG.info("DatanodeCommand action: DNA_BACKUP"); + Collection<BlockSyncTask> backupTasks = + ((SyncCommand) cmd).getSyncTasks(); + dn.getSyncServiceSatisfierDatanodeWorker().processSyncTasks(backupTasks); + break; default: LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c980395..44a548c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -387,6 +387,7 @@ public class DataNode extends ReconfigurableBase private String dnUserName = null; private BlockRecoveryWorker blockRecoveryWorker; private ErasureCodingWorker ecWorker; + private SyncServiceSatisfierDatanodeWorker syncServiceSatisfierDatanodeWorker; private final Tracer tracer; private final TracerConfigurationManager tracerConfigurationManager; private static final int NUM_CORES = Runtime.getRuntime() @@ -1435,6 +1436,9 @@ public class DataNode extends ReconfigurableBase ecWorker = new ErasureCodingWorker(getConf(), this); blockRecoveryWorker = new BlockRecoveryWorker(this); + syncServiceSatisfierDatanodeWorker = + new SyncServiceSatisfierDatanodeWorker(getConf(), this); + syncServiceSatisfierDatanodeWorker.start(); blockPoolManager = new BlockPoolManager(this); blockPoolManager.refreshNamenodes(getConf()); @@ -1987,6 +1991,11 @@ public class DataNode extends ReconfigurableBase } } + // stop syncServiceSatisfierDatanodeWorker + if (syncServiceSatisfierDatanodeWorker != null) { + syncServiceSatisfierDatanodeWorker.stop(); + } + List<BPOfferService> bposArray = (this.blockPoolManager == null) ? new ArrayList<BPOfferService>() : this.blockPoolManager.getAllNamenodeThreads(); @@ -2139,6 +2148,11 @@ public class DataNode extends ReconfigurableBase notifyAll(); } tracer.close(); + + // Waiting to finish backup SPS worker thread. + if (syncServiceSatisfierDatanodeWorker != null) { + syncServiceSatisfierDatanodeWorker.waitToFinishWorkerThread(); + } } /** @@ -3631,4 +3645,8 @@ public class DataNode extends ReconfigurableBase } return this.diskBalancer; } + + public SyncServiceSatisfierDatanodeWorker getSyncServiceSatisfierDatanodeWorker() { + return syncServiceSatisfierDatanodeWorker; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java new file mode 100644 index 0000000..7216e8f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.syncservice.SyncTaskExecutionFeedbackCollector; +import org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncOperationExecutor; +import org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncReaderFactory; +import org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncTaskRunner; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +public class SyncServiceSatisfierDatanodeWorker { + private static final Logger LOG = LoggerFactory + .getLogger(SyncServiceSatisfierDatanodeWorker.class); + + private ExecutorService executorService; + private BlockSyncOperationExecutor syncOperationExecutor; + private SyncTaskExecutionFeedbackCollector syncTaskExecutionFeedbackCollector; + + public SyncServiceSatisfierDatanodeWorker(Configuration conf, DataNode dataNode) throws IOException { + this.executorService = HadoopExecutors.newFixedThreadPool(4); + this.syncOperationExecutor = + BlockSyncOperationExecutor.createOnDataNode(conf, + (locatedBlock, config) -> { + try { + return BlockSyncReaderFactory.createBlockReader(dataNode, locatedBlock, config); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + ); + this.syncTaskExecutionFeedbackCollector = new SyncTaskExecutionFeedbackCollector(); + } + + + public void start() { + this.executorService = HadoopExecutors.newFixedThreadPool(4); + } + + public void stop() { + this.executorService.shutdown(); + } + + public void waitToFinishWorkerThread() { + try { + this.executorService.awaitTermination(3, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOG.warn("SyncServiceSatisfierDatanodeWorker interrupted during waiting for finalization."); + Thread.currentThread().interrupt(); + } + } + + public void processSyncTasks(Collection<BlockSyncTask> blockSyncTasks) { + + LOG.debug("Received SyncTasks: {}", blockSyncTasks); + for (BlockSyncTask blockSyncTask : blockSyncTasks) { + try { + executorService.submit(new BlockSyncTaskRunner(blockSyncTask, + syncOperationExecutor, + syncTaskExecutionFeedback -> syncTaskExecutionFeedbackCollector + .addFeedback(syncTaskExecutionFeedback))); + } catch (RejectedExecutionException e) { + LOG.warn("BlockSyncTask {} for {} was rejected: {}", + blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI(), + e.getCause()); + } + } + } + + public SyncTaskExecutionFeedbackCollector getSyncTaskExecutionFeedbackCollector() { + return syncTaskExecutionFeedbackCollector; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java new file mode 100644 index 0000000..41cd441 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.syncservice; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; + +import java.util.List; + +/** + * DatanodeSyncTaskExecutionFeedbackCollector collects feedback for the + * sync service tracker to determine what has happened and report statistics. + */ +public class SyncTaskExecutionFeedbackCollector { + + private List<BlockSyncTaskExecutionFeedback> collectedFeedback; + + public SyncTaskExecutionFeedbackCollector() { + this.collectedFeedback = Lists.newArrayList(); + } + + public void addFeedback(BlockSyncTaskExecutionFeedback feedback) { + synchronized (this) { + collectedFeedback.add(feedback); + } + } + + public BulkSyncTaskExecutionFeedback packageFeedbackForHeartbeat() { + + List<BlockSyncTaskExecutionFeedback> feedbackForHeartbeat; + + synchronized (this) { + feedbackForHeartbeat = collectedFeedback; + collectedFeedback = Lists.newArrayList(); + } + return new BulkSyncTaskExecutionFeedback(feedbackForHeartbeat); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java new file mode 100644 index 0000000..7fde230 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.syncservice.executor; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BBUploadHandle; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.MultipartUploader; +import org.apache.hadoop.fs.MultipartUploaderFactory; +import org.apache.hadoop.fs.PartHandle; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockInputStream; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask; +import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.Enumeration; +import java.util.List; +import java.util.Vector; + +/** + * BlockSyncOperationExecutor writes the blocks to the sync service remote + * endpoint. + */ +public class BlockSyncOperationExecutor { + + public static final Logger LOG = + LoggerFactory.getLogger(BlockSyncOperationExecutor.class); + + private Configuration conf; + private BiFunction<LocatedBlock, Configuration, BlockReader> + createBlockReader; + private Function<FileSystem, MultipartUploader> multipartUploaderSupplier; + + @VisibleForTesting + BlockSyncOperationExecutor(Configuration conf, + BiFunction<LocatedBlock, Configuration, BlockReader> createBlockReader, + Function<FileSystem, MultipartUploader> multipartUploaderSupplier) { + this.conf = conf; + this.createBlockReader = createBlockReader; + this.multipartUploaderSupplier = multipartUploaderSupplier; + } + + public static BlockSyncOperationExecutor createOnDataNode(Configuration conf, + BiFunction<LocatedBlock, Configuration, BlockReader> createBlockReader) { + return new BlockSyncOperationExecutor(conf, + createBlockReader, + fs -> { + try { + return MultipartUploaderFactory.get(fs, conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + public SyncTaskExecutionResult execute(BlockSyncTask blockSyncTask) + throws Exception { + LOG.info("Executing MetadataSyncTask {} (on {})", + blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI()); + + return doMultiPartPart( + blockSyncTask.getRemoteURI(), + blockSyncTask.getLocatedBlocks(), + blockSyncTask.getPartNumber(), + blockSyncTask.getUploadHandle(), + blockSyncTask.getOffset(), + blockSyncTask.getLength()); + } + + private SyncTaskExecutionResult doMultiPartPart(URI uri, + List<LocatedBlock> locatedBlocks, int partNumber, byte[] uploadHandle, + int offset, long length) throws IOException { + FileSystem fs = FileSystem.get(uri, conf); + Path filePath = new Path(uri); + Vector<InputStream> inputStreams = new Vector<>(locatedBlocks.size()); + for (int i = 0; i < locatedBlocks.size(); ++i) { + LocatedBlock locatedBlock = locatedBlocks.get(i); + BlockReader reader = createBlockReader.apply(locatedBlock, conf); + if (i == 0) { + reader.skip(offset); + } + BlockInputStream inputStream = new BlockInputStream(reader); + inputStreams.add(inputStream); + } + Enumeration<InputStream> streamEnumeration = inputStreams.elements(); + SequenceInputStream inputStream = + new SequenceInputStream(streamEnumeration); + MultipartUploader mpu = multipartUploaderSupplier.apply(fs); + PartHandle partHandle = mpu.putPart(filePath, inputStream, + partNumber, BBUploadHandle.from(ByteBuffer.wrap(uploadHandle)), length); + return new SyncTaskExecutionResult(partHandle.bytes(), length); + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java new file mode 100644 index 0000000..cc5eb5c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.syncservice.executor; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsTracer; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.EnumSet; + +/** + * BlockSyncReaderFactory constructs a block reader in the Datanode for the + * Sync Command to read blocks that will be written to the synchronization + * remote endpoint. + */ +public class BlockSyncReaderFactory { + + public static BlockReader createBlockReader(DataNode dataNode, + LocatedBlock locatedBlock, Configuration conf) throws IOException { + ClientContext clientContext = ClientContext.getFromConf(conf); + Token<BlockTokenIdentifier> accessToken = dataNode.getBlockAccessToken( + locatedBlock.getBlock(), + EnumSet.of(BlockTokenIdentifier.AccessMode.READ), + locatedBlock.getStorageTypes(), locatedBlock.getStorageIDs()); + + DatanodeInfo datanodeInfo = locatedBlock.getLocations()[0]; + + Socket socked = NetUtils.getDefaultSocketFactory(conf).createSocket(); + InetSocketAddress resolvedAddress = + datanodeInfo.getResolvedAddress(); + socked.connect(resolvedAddress); + + return new BlockReaderFactory(new DfsClientConf(conf)) + .setConfiguration(conf) + .setBlock(locatedBlock.getBlock()) + .setBlockToken(accessToken) + .setStartOffset(0) + .setLength(locatedBlock.getBlock().getNumBytes()) + .setInetSocketAddress(datanodeInfo.getResolvedAddress()) + .setVerifyChecksum(true) + .setDatanodeInfo(datanodeInfo) + .setClientName("BlockSyncOperationExecutor") + .setCachingStrategy(CachingStrategy.newDefaultStrategy()) + .setRemotePeerFactory((addr, blockToken, datanodeId) -> { + Peer peer = null; + Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + try { + sock.connect(addr, HdfsConstants.READ_TIMEOUT); + sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); + peer = DFSUtilClient.peerFromSocket(sock); + } finally { + if (peer == null) { + IOUtils.closeQuietly(sock); + } + } + return peer; + }) + .setClientCacheContext(clientContext) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java new file mode 100644 index 0000000..660e39e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.syncservice.executor; + +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback; +import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockSyncTaskRunner glues together the sync task and the feedback reporting. + */ +import java.util.function.Consumer; + +public class BlockSyncTaskRunner implements Runnable { + + public static final Logger LOG = + LoggerFactory.getLogger(BlockSyncTaskRunner.class); + + private BlockSyncTask blockSyncTask; + private BlockSyncOperationExecutor syncOperationExecutor; + + private Consumer<BlockSyncTaskExecutionFeedback> publishOutcomeCallback; + + public BlockSyncTaskRunner(BlockSyncTask blockSyncTask, + BlockSyncOperationExecutor syncOperationExecutor, + Consumer<BlockSyncTaskExecutionFeedback> publishOutcomeCallback) { + this.blockSyncTask = blockSyncTask; + this.syncOperationExecutor = syncOperationExecutor; + this.publishOutcomeCallback = publishOutcomeCallback; + } + + @Override + public void run() { + LOG.info("Executing BlockyncTask {} (on {})", + blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI()); + try { + SyncTaskExecutionResult result = + syncOperationExecutor.execute(blockSyncTask); + publishOutcomeCallback.accept(BlockSyncTaskExecutionFeedback + .finishedSuccessfully(blockSyncTask.getSyncTaskId(), + blockSyncTask.getSyncMountId(), + result)); + } catch (Exception e) { + LOG.error( + String.format("Exception executing MetadataSyncTask %s (on %s)", + blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI()), e); + publishOutcomeCallback.accept(BlockSyncTaskExecutionFeedback + .failedWithException(blockSyncTask.getSyncTaskId(), + blockSyncTask.getSyncMountId(), e)); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java new file mode 100644 index 0000000..43d4881 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test the BlockInputStream facade. + */ +@RunWith(MockitoJUnitRunner.class) +public class TestBlockInputStream { + @Mock + private BlockReader blockReaderMock; + + @Test + public void testBlockInputStreamReadChar() { + BlockInputStream is = new BlockInputStream(blockReaderMock); + + try { + when(blockReaderMock.read(any(), eq(0), eq(1))) + .thenReturn(32); + // Making the mock perform the side effect of writing to buf is nasty. + is.read(); + verify(blockReaderMock, times(1)).read(any(), eq(0), eq(1)); + } catch (IOException e) { + fail("Could not even mock out read function."); + } + } + + @Test + public void testBlockInputStreamReadBuf() { + BlockInputStream is = new BlockInputStream(blockReaderMock); + + try { + byte[] buf = new byte[1024]; + when(blockReaderMock.read(buf, 0, buf.length)).thenReturn(1024); + is.read(buf, 0, buf.length); + verify(blockReaderMock, times(1)).read(buf, 0, buf.length); + } catch (IOException e) { + fail("Could not even mock out read function."); + } + } + + @Test + public void testBlockInputStreamSkip() { + BlockInputStream is = new BlockInputStream(blockReaderMock); + + try { + when(blockReaderMock.skip(10)).thenReturn(10L); + long ret = is.skip(10); + assertEquals(10, ret); + verify(blockReaderMock, times(1)).skip(10L); + } catch (IOException e) { + fail("Could not even mock out skip function."); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/06477abc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java new file mode 100644 index 0000000..e16d086 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.syncservice.executor; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BBPartHandle; +import org.apache.hadoop.fs.MultipartUploader; +import org.apache.hadoop.fs.PartHandle; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask; +import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestBlockSyncOperationExecutor { + + @Mock + private BlockReader blockReaderMock; + + @Mock + private MultipartUploader multipartUploaderMock; + + @Test + public void executeMultipartPutFileSyncTask() throws Exception { + long blockLength = 42L; + Configuration conf = new Configuration(); + BlockSyncOperationExecutor blockSyncOperationExecutor = + new BlockSyncOperationExecutor(conf, + (locatedBlock, config) -> blockReaderMock, + fs -> multipartUploaderMock); + String uploadHandleStr = "uploadHandle"; + byte[] uploadHandle = uploadHandleStr.getBytes(); + ByteBuffer byteBuffer = ByteBuffer.wrap(uploadHandle); + PartHandle partHandle = BBPartHandle.from(byteBuffer); + when(multipartUploaderMock.putPart(any(), any(), anyInt(), any(), + anyLong())).thenReturn(partHandle); + UUID syncTaskId = UUID.randomUUID(); + URI remoteUri = new URI("remoteUri"); + String syncMountId = "syncMountId"; + Block block = new Block(42L, blockLength, 44L); + ExtendedBlock extendedBlock1 = new ExtendedBlock("poolId", block); + LocatedBlock locatedBlock = new LocatedBlock(extendedBlock1, null); + List<LocatedBlock> locatedBlocks = Lists.newArrayList(locatedBlock); + Integer partNumber = 85; + final int offset = 0; + final long length = locatedBlock.getBlockSize(); + + + BlockSyncTask blockSyncTask = new BlockSyncTask(syncTaskId, remoteUri, + locatedBlocks, partNumber, uploadHandle, offset, length, syncMountId); + + SyncTaskExecutionResult result = + blockSyncOperationExecutor.execute(blockSyncTask); + + assertThat(result).isNotNull(); + Long actualLength = result.getNumberOfBytes(); + assertThat(actualLength).isEqualTo(blockLength); + assertThat(result.getResult()).isEqualTo(partHandle.bytes()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org