This is an automated email from the ASF dual-hosted git repository. elek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new 72e3215 HDDS-4119. Improve performance of the BufferPool management of Ozone client (#1336) 72e3215 is described below commit 72e3215846bfaaa562aa9fb7a15f87f27e97867c Author: Elek, Márton <e...@users.noreply.github.com> AuthorDate: Fri Sep 11 16:25:04 2020 +0200 HDDS-4119. Improve performance of the BufferPool management of Ozone client (#1336) --- hadoop-hdds/client/pom.xml | 11 + .../hadoop/hdds/scm/storage/BlockOutputStream.java | 123 +++++++---- .../apache/hadoop/hdds/scm/storage/BufferPool.java | 47 ++--- .../hadoop/hdds/scm/storage/CommitWatcher.java | 37 ++-- .../storage/TestBlockOutputStreamCorrectness.java | 224 +++++++++++++++++++++ .../hadoop/hdds/scm/storage/TestBufferPool.java | 46 +++++ .../apache/hadoop/hdds/scm/XceiverClientSpi.java | 4 +- .../hdds/scm/storage/ContainerProtocolCalls.java | 80 +++----- .../apache/hadoop/ozone/common/ChunkBuffer.java | 14 +- .../common/ChunkBufferImplWithByteBuffer.java | 10 +- .../hadoop/hdds/scm/pipeline/MockPipeline.java | 29 ++- .../hadoop/ozone/common/TestChunkBuffer.java | 16 +- .../hadoop/ozone/client/rpc/TestCommitWatcher.java | 31 ++- 13 files changed, 508 insertions(+), 164 deletions(-) diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml index e7a8ebb..608839e 100644 --- a/hadoop-hdds/client/pom.xml +++ b/hadoop-hdds/client/pom.xml @@ -39,6 +39,17 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdds-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>io.netty</groupId> diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 32e5b37..1a16caf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -17,46 +17,44 @@ */ package org.apache.hadoop.hdds.scm.storage; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.OzoneChecksumException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; +import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.hdds.client.BlockID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .putBlockAsync; -import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .writeChunkAsync; - /** * An {@link OutputStream} used by the REST service in combination with the * SCMClient to write the value of a key to a sequence @@ -120,6 +118,13 @@ public class BlockOutputStream extends OutputStream { private final List<DatanodeDetails> failedServers; private final Checksum checksum; + //number of buffers used before doing a flush/putBlock. + private int flushPeriod; + //bytes remaining to write in the current buffer. + private int currentBufferRemaining; + //current buffer allocated to write + private ChunkBuffer currentBuffer; + /** * Creates a new BlockOutputStream. * @@ -154,6 +159,14 @@ public class BlockOutputStream extends OutputStream { this.bufferPool = bufferPool; this.bytesPerChecksum = bytesPerChecksum; + //number of buffers used before doing a flush + refreshCurrentBuffer(bufferPool); + flushPeriod = (int) (streamBufferFlushSize / streamBufferSize); + + Preconditions + .checkArgument( + (long) flushPeriod * streamBufferSize == streamBufferFlushSize); + // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); commitWatcher = new CommitWatcher(bufferPool, xceiverClient); @@ -165,6 +178,11 @@ public class BlockOutputStream extends OutputStream { checksum = new Checksum(checksumType, bytesPerChecksum); } + private void refreshCurrentBuffer(BufferPool pool) { + currentBuffer = pool.getCurrentBuffer(); + currentBufferRemaining = + currentBuffer != null ? currentBuffer.remaining() : 0; + } public BlockID getBlockID() { return blockID.get(); @@ -209,9 +227,18 @@ public class BlockOutputStream extends OutputStream { @Override public void write(int b) throws IOException { checkOpen(); - byte[] buf = new byte[1]; - buf[0] = (byte) b; - write(buf, 0, 1); + allocateNewBufferIfNeeded(); + currentBuffer.put((byte) b); + currentBufferRemaining--; + writeChunkIfNeeded(); + writtenDataLength++; + doFlushOrWatchIfNeeded(); + } + + private void writeChunkIfNeeded() throws IOException { + if (currentBufferRemaining == 0) { + writeChunk(currentBuffer); + } } @Override @@ -229,32 +256,36 @@ public class BlockOutputStream extends OutputStream { } while (len > 0) { - // Allocate a buffer if needed. The buffer will be allocated only - // once as needed and will be reused again for multiple blockOutputStream - // entries. - final ChunkBuffer currentBuffer = bufferPool.allocateBufferIfNeeded( - bytesPerChecksum); - final int writeLen = Math.min(currentBuffer.remaining(), len); + allocateNewBufferIfNeeded(); + final int writeLen = Math.min(currentBufferRemaining, len); currentBuffer.put(b, off, writeLen); - if (!currentBuffer.hasRemaining()) { - writeChunk(currentBuffer); - } + currentBufferRemaining -= writeLen; + writeChunkIfNeeded(); off += writeLen; len -= writeLen; writtenDataLength += writeLen; - if (shouldFlush()) { + doFlushOrWatchIfNeeded(); + } + } + + private void doFlushOrWatchIfNeeded() throws IOException { + if (currentBufferRemaining == 0) { + if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) { updateFlushLength(); executePutBlock(false, false); } // Data in the bufferPool can not exceed streamBufferMaxSize - if (isBufferPoolFull()) { + if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) { handleFullBuffer(); } } } - private boolean shouldFlush() { - return bufferPool.computeBufferData() % streamBufferFlushSize == 0; + private void allocateNewBufferIfNeeded() { + if (currentBufferRemaining == 0) { + currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum); + currentBufferRemaining = currentBuffer.remaining(); + } } private void updateFlushLength() { @@ -264,6 +295,7 @@ public class BlockOutputStream extends OutputStream { private boolean isBufferPoolFull() { return bufferPool.computeBufferData() == streamBufferMaxSize; } + /** * Will be called on the retryPath in case closedContainerException/ * TimeoutException. @@ -334,6 +366,7 @@ public class BlockOutputStream extends OutputStream { // only contain data which have not been sufficiently replicated private void adjustBuffersOnException() { commitWatcher.releaseBuffersOnException(); + refreshCurrentBuffer(bufferPool); } /** @@ -363,6 +396,8 @@ public class BlockOutputStream extends OutputStream { setIoException(ioe); throw getIoException(); } + refreshCurrentBuffer(bufferPool); + } /** @@ -481,7 +516,7 @@ public class BlockOutputStream extends OutputStream { checkOpen(); // flush the last chunk data residing on the currentBuffer if (totalDataFlushedLength < writtenDataLength) { - final ChunkBuffer currentBuffer = bufferPool.getCurrentBuffer(); + refreshCurrentBuffer(bufferPool); Preconditions.checkArgument(currentBuffer.position() > 0); if (currentBuffer.hasRemaining()) { writeChunk(currentBuffer); diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java index d117edb..dc27d4b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java @@ -18,16 +18,17 @@ package org.apache.hadoop.hdds.scm.storage; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.ByteStringConversion; -import org.apache.hadoop.ozone.common.ChunkBuffer; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.function.Function; +import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.ozone.common.ChunkBuffer; + +import com.google.common.base.Preconditions; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + /** * This class creates and manages pool of n buffers. */ @@ -53,7 +54,7 @@ public class BufferPool { this.byteStringConversion = byteStringConversion; } - public Function<ByteBuffer, ByteString> byteStringConversion(){ + public Function<ByteBuffer, ByteString> byteStringConversion() { return byteStringConversion; } @@ -65,29 +66,22 @@ public class BufferPool { * If the currentBufferIndex is less than the buffer size - 1, * it means, the next buffer in the list has been freed up for * rewriting. Reuse the next available buffer in such cases. - * + * <p> * In case, the currentBufferIndex == buffer.size and buffer size is still * less than the capacity to be allocated, just allocate a buffer of size * chunk size. - * */ - public ChunkBuffer allocateBufferIfNeeded(int increment) { - ChunkBuffer buffer = getCurrentBuffer(); - if (buffer != null && buffer.hasRemaining()) { - return buffer; - } - if (currentBufferIndex < bufferList.size() - 1) { - buffer = getBuffer(currentBufferIndex + 1); + public ChunkBuffer allocateBuffer(int increment) { + currentBufferIndex++; + Preconditions.checkArgument(currentBufferIndex <= capacity - 1); + if (currentBufferIndex < bufferList.size()) { + return getBuffer(currentBufferIndex); } else { - buffer = ChunkBuffer.allocate(bufferSize, increment); - bufferList.add(buffer); + final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, increment); + bufferList.add(newBuffer); + Preconditions.checkArgument(bufferList.size() <= capacity); + return newBuffer; } - Preconditions.checkArgument(bufferList.size() <= capacity); - currentBufferIndex++; - // TODO: Turn the below precondition check on when Standalone pipeline - // is removed in the write path in tests - // Preconditions.checkArgument(buffer.position() == 0); - return buffer; } void releaseBuffer(ChunkBuffer chunkBuffer) { @@ -130,4 +124,11 @@ public class BufferPool { return currentBufferIndex; } + public int getNumberOfUsedBuffers() { + return currentBufferIndex + 1; + } + + public int getCapacity() { + return capacity; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index 34d0d7c..ab6e252 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -24,28 +24,29 @@ */ package org.apache.hadoop.hdds.scm.storage; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.XceiverClientReply; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.ozone.common.ChunkBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.ExecutionException; - +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.ozone.common.ChunkBuffer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * This class executes watchForCommit on ratis pipeline and releases * buffers once data successfully gets replicated. @@ -96,7 +97,15 @@ public class CommitWatcher { long length = buffers.stream().mapToLong(ChunkBuffer::position).sum(); totalAckDataLength += length; // clear the future object from the future Map - Preconditions.checkNotNull(futureMap.remove(totalAckDataLength)); + final CompletableFuture<ContainerCommandResponseProto> remove = + futureMap.remove(totalAckDataLength); + if (remove == null) { + LOG.error("Couldn't find required future for " + totalAckDataLength); + for (Long key : futureMap.keySet()) { + LOG.error("Existing acknowledged data: " + key); + } + } + Preconditions.checkNotNull(remove); for (ChunkBuffer byteBuffer : buffers) { bufferPool.releaseBuffer(byteBuffer); } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java new file mode 100644 index 0000000..141a1d8 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.jetbrains.annotations.NotNull; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * UNIT test for BlockOutputStream. + * <p> + * Compares bytes written to the stream and received in the ChunkWriteRequests. + */ +public class TestBlockOutputStreamCorrectness { + + private static final long SEED = 18480315L; + + private int writeUnitSize = 1; + + @Test + public void test() throws IOException { + + final BufferPool bufferPool = new BufferPool(4 * 1024 * 1024, 32 / 4); + + for (int block = 0; block < 10; block++) { + BlockOutputStream outputStream = + createBlockOutputStream(bufferPool); + + Random random = new Random(SEED); + + int max = 256 * 1024 * 1024 / writeUnitSize; + + byte[] writeBuffer = new byte[writeUnitSize]; + for (int t = 0; t < max; t++) { + if (writeUnitSize > 1) { + for (int i = 0; i < writeBuffer.length; i++) { + writeBuffer[i] = (byte) random.nextInt(); + } + outputStream.write(writeBuffer, 0, writeBuffer.length); + } else { + outputStream.write((byte) random.nextInt()); + } + } + outputStream.close(); + } + } + + @NotNull + private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) + throws IOException { + + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + + final XceiverClientManager xcm = Mockito.mock(XceiverClientManager.class); + Mockito.when(xcm.acquireClient(Mockito.any())) + .thenReturn(new MockXceiverClientSpi(pipeline)); + + BlockOutputStream outputStream = new BlockOutputStream( + new BlockID(1L, 1L), + xcm, + pipeline, + 4 * 1024 * 1024, + 16 * 1024 * 1024, + true, + 32 * 1024 * 1024, + bufferPool, + ChecksumType.NONE, + 256 * 1024); + return outputStream; + } + + /** + * XCeiverClient which simulates responses. + */ + private class MockXceiverClientSpi extends XceiverClientSpi { + + private final Pipeline pipeline; + + private Random expectedRandomStream = new Random(SEED); + + private AtomicInteger counter = new AtomicInteger(); + + MockXceiverClientSpi(Pipeline pipeline) { + super(); + this.pipeline = pipeline; + } + + @Override + public void connect() throws Exception { + + } + + @Override + public void connect(String encodedToken) throws Exception { + + } + + @Override + public void close() { + + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public XceiverClientReply sendCommandAsync( + ContainerCommandRequestProto request + ) + throws IOException, ExecutionException, InterruptedException { + + final ContainerCommandResponseProto.Builder builder = + ContainerCommandResponseProto.newBuilder() + .setResult(Result.SUCCESS) + .setCmdType(request.getCmdType()); + + switch (request.getCmdType()) { + case PutBlock: + builder.setPutBlock(PutBlockResponseProto.newBuilder() + .setCommittedBlockLength( + GetCommittedBlockLengthResponseProto.newBuilder() + .setBlockID( + request.getPutBlock().getBlockData().getBlockID()) + .setBlockLength( + request.getPutBlock().getBlockData().getSize()) + .build()) + .build()); + break; + case WriteChunk: + ByteString data = request.getWriteChunk().getData(); + final byte[] writePayload = data.toByteArray(); + for (int i = 0; i < writePayload.length; i++) { + byte expectedByte = (byte) expectedRandomStream.nextInt(); + Assert.assertEquals(expectedByte, + writePayload[i]); + } + break; + default: + //no-op + } + final XceiverClientReply result = new XceiverClientReply( + CompletableFuture.completedFuture(builder.build())); + result.setLogIndex(counter.incrementAndGet()); + return result; + + } + + @Override + public ReplicationType getPipelineType() { + return null; + } + + @Override + public XceiverClientReply watchForCommit(long index) + throws InterruptedException, ExecutionException, TimeoutException, + IOException { + final ContainerCommandResponseProto.Builder builder = + ContainerCommandResponseProto.newBuilder() + .setCmdType(Type.WriteChunk) + .setResult(Result.SUCCESS); + final XceiverClientReply xceiverClientReply = new XceiverClientReply( + CompletableFuture.completedFuture(builder.build())); + xceiverClientReply.setLogIndex(index); + return xceiverClientReply; + } + + @Override + public long getReplicatedMinCommitIndex() { + return 0; + } + + @Override + public Map<DatanodeDetails, ContainerCommandResponseProto> + sendCommandOnAllNodes(ContainerCommandRequestProto request + ) throws IOException, InterruptedException { + return null; + } + } + +} \ No newline at end of file diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java new file mode 100644 index 0000000..cd53d71 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import org.apache.hadoop.ozone.common.ChunkBuffer; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit test for buffer pool. + */ +public class TestBufferPool { + + @Test + public void releaseAndReallocate() { + BufferPool pool = new BufferPool(1024, 8); + ChunkBuffer cb1 = pool.allocateBuffer(0); + ChunkBuffer cb2 = pool.allocateBuffer(0); + ChunkBuffer cb3 = pool.allocateBuffer(0); + + pool.releaseBuffer(cb1); + + //current state cb2, -> cb3, cb1 + final ChunkBuffer allocated = pool.allocateBuffer(0); + Assert.assertEquals(3, pool.getSize()); + Assert.assertEquals(cb1, allocated); + } + +} \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 1c7d1f6..fd49529 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -31,9 +31,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction; /** * A Client for the storageContainer protocol. @@ -43,7 +43,7 @@ public abstract class XceiverClientSpi implements Closeable { private final AtomicInteger referenceCount; private boolean isEvicted; - XceiverClientSpi() { + public XceiverClientSpi() { this.referenceCount = new AtomicInteger(0); this.isEvicted = false; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 11acf82..ea5fc82 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -18,12 +18,41 @@ package org.apache.hadoop.hdds.scm.storage; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CloseContainerRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; -import org.apache.hadoop.hdds.scm.container.common.helpers - .BlockNotCommittedException; +import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector; import org.apache.hadoop.io.Text; @@ -31,53 +60,8 @@ import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .CloseContainerRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .DatanodeBlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetBlockRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetBlockResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetSmallFileRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetSmallFileResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .PutBlockRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .PutSmallFileRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadContainerRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadContainerResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .WriteChunkRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - PutSmallFileResponseProto; -import org.apache.hadoop.hdds.client.BlockID; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; /** * Implementation of all container protocol calls performed by Container diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java index b7ba6d6..65f8a89 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.ozone.common; -import org.apache.hadoop.hdds.scm.ByteStringConversion; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; @@ -27,6 +24,10 @@ import java.util.List; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.hadoop.hdds.scm.ByteStringConversion; + +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + /** Buffer for a block chunk. */ public interface ChunkBuffer { @@ -88,6 +89,13 @@ public interface ChunkBuffer { return put(ByteBuffer.wrap(b)); } + /** Similar to {@link ByteBuffer#put(byte[])}. */ + default ChunkBuffer put(byte b) { + byte[] buf = new byte[1]; + buf[0] = (byte) b; + return put(buf, 0, 1); + } + /** Similar to {@link ByteBuffer#put(byte[], int, int)}. */ default ChunkBuffer put(byte[] b, int offset, int length) { return put(ByteBuffer.wrap(b, offset, length)); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java index 299cab8..dd81635 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.ozone.common; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; @@ -29,6 +27,8 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.function.Function; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + /** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */ final class ChunkBufferImplWithByteBuffer implements ChunkBuffer { private final ByteBuffer buffer; @@ -105,6 +105,12 @@ final class ChunkBufferImplWithByteBuffer implements ChunkBuffer { } @Override + public ChunkBuffer put(byte b) { + buffer.put(b); + return this; + } + + @Override public ChunkBuffer clear() { buffer.clear(); return this; diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java index a8ddac1..556c052 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java @@ -17,16 +17,19 @@ */ package org.apache.hadoop.hdds.scm.pipeline; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; - import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; + +import com.google.common.base.Preconditions; + /** * Provides {@link Pipeline} factory methods for testing. */ @@ -68,6 +71,22 @@ public final class MockPipeline { .build(); } + public static Pipeline createRatisPipeline() { + + List<DatanodeDetails> nodes = new ArrayList<>(); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + + return Pipeline.newBuilder() + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setType(ReplicationType.RATIS) + .setFactor(ReplicationFactor.THREE) + .setNodes(nodes) + .build(); + } + private MockPipeline() { throw new UnsupportedOperationException("no instances"); } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java index 9ca735cb..9b69fad 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java @@ -17,12 +17,6 @@ */ package org.apache.hadoop.ozone.common; -import org.apache.hadoop.hdds.utils.MockGatheringChannel; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.junit.Assert; -import org.junit.Test; -import org.junit.jupiter.api.Assertions; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -33,6 +27,13 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hdds.utils.MockGatheringChannel; + +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + /** * Test {@link ChunkBuffer} implementations. */ @@ -71,7 +72,8 @@ public class TestChunkBuffer { private static void runTestIncrementalChunkBuffer(int increment, int n) { final byte[] expected = new byte[n]; ThreadLocalRandom.current().nextBytes(expected); - runTestImpl(expected, increment, ChunkBuffer.allocate(n, increment)); + runTestImpl(expected, increment, + new IncrementalChunkBuffer(n, increment, false)); } @Test(timeout = 1_000) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java index 0a2ff14..63beeee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java @@ -17,6 +17,14 @@ package org.apache.hadoop.ozone.client.rpc; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; @@ -24,11 +32,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.ScmConfigKeys;; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -43,6 +51,9 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.ContainerTestHelper; + +import static java.util.Collections.singletonList; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftRetryFailureException; @@ -50,21 +61,9 @@ import org.apache.ratis.protocol.TimeoutIOException; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; -import static java.util.Collections.singletonList; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys. - OZONE_SCM_STALENODE_INTERVAL; /** * Class to test CommitWatcher functionality. @@ -187,7 +186,7 @@ public class TestCommitWatcher { ContainerTestHelper .getWriteChunkRequest(pipeline, blockID, chunkSize, null); // add the data to the buffer pool - final ChunkBuffer byteBuffer = bufferPool.allocateBufferIfNeeded(0); + final ChunkBuffer byteBuffer = bufferPool.allocateBuffer(0); byteBuffer.put(writeChunkRequest.getWriteChunk().getData()); ratisClient.sendCommandAsync(writeChunkRequest); ContainerProtos.ContainerCommandRequestProto putBlockRequest = @@ -264,7 +263,7 @@ public class TestCommitWatcher { ContainerTestHelper .getWriteChunkRequest(pipeline, blockID, chunkSize, null); // add the data to the buffer pool - final ChunkBuffer byteBuffer = bufferPool.allocateBufferIfNeeded(0); + final ChunkBuffer byteBuffer = bufferPool.allocateBuffer(0); byteBuffer.put(writeChunkRequest.getWriteChunk().getData()); ratisClient.sendCommandAsync(writeChunkRequest); ContainerProtos.ContainerCommandRequestProto putBlockRequest = --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org