This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 72e3215  HDDS-4119. Improve performance of the BufferPool management of Ozone client (#1336)
72e3215 is described below

commit 72e3215846bfaaa562aa9fb7a15f87f27e97867c
Author: Elek, Márton <e...@users.noreply.github.com>
AuthorDate: Fri Sep 11 16:25:04 2020 +0200

    HDDS-4119. Improve performance of the BufferPool management of Ozone client (#1336)
---
 hadoop-hdds/client/pom.xml                         |  11 +
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 123 +++++++----
 .../apache/hadoop/hdds/scm/storage/BufferPool.java |  47 ++---
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |  37 ++--
 .../storage/TestBlockOutputStreamCorrectness.java  | 224 +++++++++++++++++++++
 .../hadoop/hdds/scm/storage/TestBufferPool.java    |  46 +++++
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |   4 +-
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  80 +++-----
 .../apache/hadoop/ozone/common/ChunkBuffer.java    |  14 +-
 .../common/ChunkBufferImplWithByteBuffer.java      |  10 +-
 .../hadoop/hdds/scm/pipeline/MockPipeline.java     |  29 ++-
 .../hadoop/ozone/common/TestChunkBuffer.java       |  16 +-
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |  31 ++-
 13 files changed, 508 insertions(+), 164 deletions(-)

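For readers skimming the patch: the heart of the change is that BlockOutputStream now keeps a reference to the buffer it is currently filling (currentBuffer) together with the number of bytes it still accepts (currentBufferRemaining), so the single-byte write path no longer allocates a temporary byte[1] nor walks the BufferPool on every call. A minimal standalone sketch of that pattern follows; it uses a plain java.nio.ByteBuffer and made-up class names, not the Ozone classes touched by the diff.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Standalone sketch of the caching idea: remember the buffer being filled
     * and how many bytes it still accepts, so writing a single byte needs no
     * temporary byte[1] and no pool lookup.
     */
    public class CachedBufferWriteSketch {
      private final List<ByteBuffer> pool = new ArrayList<>();
      private final int bufferSize;
      private ByteBuffer currentBuffer;      // buffer currently being filled
      private int currentBufferRemaining;    // bytes left in currentBuffer

      public CachedBufferWriteSketch(int bufferSize) {
        this.bufferSize = bufferSize;
      }

      public void write(int b) {
        allocateNewBufferIfNeeded();
        currentBuffer.put((byte) b);         // direct put, no byte[1] allocation
        currentBufferRemaining--;
      }

      private void allocateNewBufferIfNeeded() {
        if (currentBufferRemaining == 0) {
          currentBuffer = ByteBuffer.allocate(bufferSize);
          pool.add(currentBuffer);
          currentBufferRemaining = currentBuffer.remaining();
        }
      }

      public static void main(String[] args) {
        CachedBufferWriteSketch w = new CachedBufferWriteSketch(4);
        for (int i = 0; i < 10; i++) {
          w.write(i);
        }
        System.out.println("buffers used: " + w.pool.size());   // prints 3
      }
    }
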
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index e7a8ebb..608839e 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -39,6 +39,17 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdds-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdds-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 32e5b37..1a16caf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -17,46 +17,44 @@
  */
 
 package org.apache.hadoop.hdds.scm.storage;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .putBlockAsync;
-import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .writeChunkAsync;
-
 /**
  * An {@link OutputStream} used by the REST service in combination with the
  * SCMClient to write the value of a key to a sequence
@@ -120,6 +118,13 @@ public class BlockOutputStream extends OutputStream {
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
 
+  //number of buffers used before doing a flush/putBlock.
+  private int flushPeriod;
+  //bytes remaining to write in the current buffer.
+  private int currentBufferRemaining;
+  //current buffer allocated to write
+  private ChunkBuffer currentBuffer;
+
   /**
    * Creates a new BlockOutputStream.
    *
@@ -154,6 +159,14 @@ public class BlockOutputStream extends OutputStream {
     this.bufferPool = bufferPool;
     this.bytesPerChecksum = bytesPerChecksum;
 
+    //number of buffers used before doing a flush
+    refreshCurrentBuffer(bufferPool);
+    flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);
+
+    Preconditions
+        .checkArgument(
+            (long) flushPeriod * streamBufferSize == streamBufferFlushSize);
+
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
     commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
@@ -165,6 +178,11 @@ public class BlockOutputStream extends OutputStream {
     checksum = new Checksum(checksumType, bytesPerChecksum);
   }
 
+  private void refreshCurrentBuffer(BufferPool pool) {
+    currentBuffer = pool.getCurrentBuffer();
+    currentBufferRemaining =
+        currentBuffer != null ? currentBuffer.remaining() : 0;
+  }
 
   public BlockID getBlockID() {
     return blockID.get();
@@ -209,9 +227,18 @@ public class BlockOutputStream extends OutputStream {
   @Override
   public void write(int b) throws IOException {
     checkOpen();
-    byte[] buf = new byte[1];
-    buf[0] = (byte) b;
-    write(buf, 0, 1);
+    allocateNewBufferIfNeeded();
+    currentBuffer.put((byte) b);
+    currentBufferRemaining--;
+    writeChunkIfNeeded();
+    writtenDataLength++;
+    doFlushOrWatchIfNeeded();
+  }
+
+  private void writeChunkIfNeeded() throws IOException {
+    if (currentBufferRemaining == 0) {
+      writeChunk(currentBuffer);
+    }
   }
 
   @Override
@@ -229,32 +256,36 @@ public class BlockOutputStream extends OutputStream {
     }
 
     while (len > 0) {
-      // Allocate a buffer if needed. The buffer will be allocated only
-      // once as needed and will be reused again for multiple blockOutputStream
-      // entries.
-      final ChunkBuffer currentBuffer = bufferPool.allocateBufferIfNeeded(
-          bytesPerChecksum);
-      final int writeLen = Math.min(currentBuffer.remaining(), len);
+      allocateNewBufferIfNeeded();
+      final int writeLen = Math.min(currentBufferRemaining, len);
       currentBuffer.put(b, off, writeLen);
-      if (!currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
+      currentBufferRemaining -= writeLen;
+      writeChunkIfNeeded();
       off += writeLen;
       len -= writeLen;
       writtenDataLength += writeLen;
-      if (shouldFlush()) {
+      doFlushOrWatchIfNeeded();
+    }
+  }
+
+  private void doFlushOrWatchIfNeeded() throws IOException {
+    if (currentBufferRemaining == 0) {
+      if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
         updateFlushLength();
         executePutBlock(false, false);
       }
       // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (isBufferPoolFull()) {
+      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
         handleFullBuffer();
       }
     }
   }
 
-  private boolean shouldFlush() {
-    return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
+  private void allocateNewBufferIfNeeded() {
+    if (currentBufferRemaining == 0) {
+      currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum);
+      currentBufferRemaining = currentBuffer.remaining();
+    }
   }
 
   private void updateFlushLength() {
@@ -264,6 +295,7 @@ public class BlockOutputStream extends OutputStream {
   private boolean isBufferPoolFull() {
     return bufferPool.computeBufferData() == streamBufferMaxSize;
   }
+
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -334,6 +366,7 @@ public class BlockOutputStream extends OutputStream {
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
     commitWatcher.releaseBuffersOnException();
+    refreshCurrentBuffer(bufferPool);
   }
 
   /**
@@ -363,6 +396,8 @@ public class BlockOutputStream extends OutputStream {
       setIoException(ioe);
       throw getIoException();
     }
+    refreshCurrentBuffer(bufferPool);
+
   }
 
   /**
@@ -481,7 +516,7 @@ public class BlockOutputStream extends OutputStream {
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      final ChunkBuffer currentBuffer = bufferPool.getCurrentBuffer();
+      refreshCurrentBuffer(bufferPool);
       Preconditions.checkArgument(currentBuffer.position() > 0);
       if (currentBuffer.hasRemaining()) {
         writeChunk(currentBuffer);
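
To make the flush bookkeeping in the BlockOutputStream changes above concrete, the following self-contained sketch replays the arithmetic with the sizes the new unit test below happens to use (4 MB buffers, 16 MB flush size, 32 MB max size). Only the flushPeriod formula and the two threshold checks mirror the diff; the class and variable names are made up for illustration.

    public class FlushThresholdSketch {
      public static void main(String[] args) {
        long streamBufferSize      = 4L * 1024 * 1024;   // one ChunkBuffer
        long streamBufferFlushSize = 16L * 1024 * 1024;  // putBlock boundary
        long streamBufferMaxSize   = 32L * 1024 * 1024;  // watchForCommit boundary

        // Same formula and precondition as the constructor in the diff.
        int flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);  // = 4
        if ((long) flushPeriod * streamBufferSize != streamBufferFlushSize) {
          throw new IllegalArgumentException("flush size must be a multiple of buffer size");
        }

        int capacity = (int) (streamBufferMaxSize / streamBufferSize);       // = 8 buffers

        // doFlushOrWatchIfNeeded() only acts when a buffer fills up:
        for (int usedBuffers = 1; usedBuffers <= capacity; usedBuffers++) {
          boolean putBlock = usedBuffers % flushPeriod == 0;   // buffers 4 and 8
          boolean waitForCommit = usedBuffers == capacity;     // buffer 8
          System.out.println("buffer " + usedBuffers
              + " full -> putBlock=" + putBlock + ", waitForCommit=" + waitForCommit);
        }
      }
    }
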
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index d117edb..dc27d4b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Function;
 
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+
+import com.google.common.base.Preconditions;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
 /**
  * This class creates and manages pool of n buffers.
  */
@@ -53,7 +54,7 @@ public class BufferPool {
     this.byteStringConversion = byteStringConversion;
   }
 
-  public Function<ByteBuffer, ByteString> byteStringConversion(){
+  public Function<ByteBuffer, ByteString> byteStringConversion() {
     return byteStringConversion;
   }
 
@@ -65,29 +66,22 @@ public class BufferPool {
    * If the currentBufferIndex is less than the buffer size - 1,
    * it means, the next buffer in the list has been freed up for
    * rewriting. Reuse the next available buffer in such cases.
-   *
+   * <p>
    * In case, the currentBufferIndex == buffer.size and buffer size is still
    * less than the capacity to be allocated, just allocate a buffer of size
    * chunk size.
-   *
    */
-  public ChunkBuffer allocateBufferIfNeeded(int increment) {
-    ChunkBuffer buffer = getCurrentBuffer();
-    if (buffer != null && buffer.hasRemaining()) {
-      return buffer;
-    }
-    if (currentBufferIndex < bufferList.size() - 1) {
-      buffer = getBuffer(currentBufferIndex + 1);
+  public ChunkBuffer allocateBuffer(int increment) {
+    currentBufferIndex++;
+    Preconditions.checkArgument(currentBufferIndex <= capacity - 1);
+    if (currentBufferIndex < bufferList.size()) {
+      return getBuffer(currentBufferIndex);
     } else {
-      buffer = ChunkBuffer.allocate(bufferSize, increment);
-      bufferList.add(buffer);
+      final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, increment);
+      bufferList.add(newBuffer);
+      Preconditions.checkArgument(bufferList.size() <= capacity);
+      return newBuffer;
     }
-    Preconditions.checkArgument(bufferList.size() <= capacity);
-    currentBufferIndex++;
-    // TODO: Turn the below precondition check on when Standalone pipeline
-    // is removed in the write path in tests
-    // Preconditions.checkArgument(buffer.position() == 0);
-    return buffer;
   }
 
   void releaseBuffer(ChunkBuffer chunkBuffer) {
@@ -130,4 +124,11 @@ public class BufferPool {
     return currentBufferIndex;
   }
 
+  public int getNumberOfUsedBuffers() {
+    return currentBufferIndex + 1;
+  }
+
+  public int getCapacity() {
+    return capacity;
+  }
 }
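
A short usage sketch of the reworked BufferPool API, based only on what this hunk and the new TestBufferPool further below show (in particular, that a released buffer is handed out again by a later allocation). It is a sketch, not an authoritative reference; it sits in the same package because releaseBuffer is package-private.

    package org.apache.hadoop.hdds.scm.storage;

    import org.apache.hadoop.ozone.common.ChunkBuffer;

    public class BufferPoolUsageSketch {
      public static void main(String[] args) {
        BufferPool pool = new BufferPool(1024, 8);    // at most eight 1 KB buffers

        ChunkBuffer first = pool.allocateBuffer(0);   // allocateBuffer() always advances the
        ChunkBuffer second = pool.allocateBuffer(0);  // index; the old "if needed" check now
        ChunkBuffer third = pool.allocateBuffer(0);   // lives in BlockOutputStream instead

        System.out.println(pool.getNumberOfUsedBuffers());  // 3 (= currentBufferIndex + 1)
        System.out.println(pool.getCapacity());             // 8

        pool.releaseBuffer(first);                    // released buffer goes back to the tail
        ChunkBuffer reused = pool.allocateBuffer(0);  // ...and is handed out again here
        System.out.println(reused.equals(first));     // true, per TestBufferPool
      }
    }
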
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 34d0d7c..ab6e252 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,28 +24,29 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * This class executes watchForCommit on ratis pipeline and releases
  * buffers once data successfully gets replicated.
@@ -96,7 +97,15 @@ public class CommitWatcher {
       long length = buffers.stream().mapToLong(ChunkBuffer::position).sum();
       totalAckDataLength += length;
       // clear the future object from the future Map
-      Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
+      final CompletableFuture<ContainerCommandResponseProto> remove =
+          futureMap.remove(totalAckDataLength);
+      if (remove == null) {
+        LOG.error("Couldn't find required future for " + totalAckDataLength);
+        for (Long key : futureMap.keySet()) {
+          LOG.error("Existing acknowledged data: " + key);
+        }
+      }
+      Preconditions.checkNotNull(remove);
       for (ChunkBuffer byteBuffer : buffers) {
         bufferPool.releaseBuffer(byteBuffer);
       }
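
For context on the hunk above: CommitWatcher keys each pending putBlock future by the cumulative data length it commits, and on ack it removes exactly that key before releasing the buffers; the new code additionally logs the surviving keys when the lookup comes back empty. A toy sketch of that bookkeeping, with made-up names and sizes (not the CommitWatcher implementation itself):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class AckBookkeepingSketch {
      public static void main(String[] args) {
        ConcurrentSkipListMap<Long, CompletableFuture<String>> futureMap =
            new ConcurrentSkipListMap<>();
        long totalAckDataLength = 0;

        // Each putBlock future is stored under the total length it commits.
        futureMap.put(4L * 1024 * 1024, CompletableFuture.completedFuture("putBlock@4MB"));
        futureMap.put(8L * 1024 * 1024, CompletableFuture.completedFuture("putBlock@8MB"));

        long ackedNow = 4L * 1024 * 1024;          // length covered by the acked reply
        totalAckDataLength += ackedNow;
        CompletableFuture<String> removed = futureMap.remove(totalAckDataLength);
        if (removed == null) {
          // The patched code logs the remaining keys here before failing the precondition.
          System.err.println("no future registered for " + totalAckDataLength);
        } else {
          System.out.println("released buffers for " + removed.join());
        }
      }
    }
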
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
new file mode 100644
index 0000000..141a1d8
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * UNIT test for BlockOutputStream.
+ * <p>
+ * Compares bytes written to the stream and received in the ChunkWriteRequests.
+ */
+public class TestBlockOutputStreamCorrectness {
+
+  private static final long SEED = 18480315L;
+
+  private int writeUnitSize = 1;
+
+  @Test
+  public void test() throws IOException {
+
+    final BufferPool bufferPool = new BufferPool(4 * 1024 * 1024, 32 / 4);
+
+    for (int block = 0; block < 10; block++) {
+      BlockOutputStream outputStream =
+          createBlockOutputStream(bufferPool);
+
+      Random random = new Random(SEED);
+
+      int max = 256 * 1024 * 1024 / writeUnitSize;
+
+      byte[] writeBuffer = new byte[writeUnitSize];
+      for (int t = 0; t < max; t++) {
+        if (writeUnitSize > 1) {
+          for (int i = 0; i < writeBuffer.length; i++) {
+            writeBuffer[i] = (byte) random.nextInt();
+          }
+          outputStream.write(writeBuffer, 0, writeBuffer.length);
+        } else {
+          outputStream.write((byte) random.nextInt());
+        }
+      }
+      outputStream.close();
+    }
+  }
+
+  @NotNull
+  private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
+      throws IOException {
+
+    final Pipeline pipeline = MockPipeline.createRatisPipeline();
+
+    final XceiverClientManager xcm = Mockito.mock(XceiverClientManager.class);
+    Mockito.when(xcm.acquireClient(Mockito.any()))
+        .thenReturn(new MockXceiverClientSpi(pipeline));
+
+    BlockOutputStream outputStream = new BlockOutputStream(
+        new BlockID(1L, 1L),
+        xcm,
+        pipeline,
+        4 * 1024 * 1024,
+        16 * 1024 * 1024,
+        true,
+        32 * 1024 * 1024,
+        bufferPool,
+        ChecksumType.NONE,
+        256 * 1024);
+    return outputStream;
+  }
+
+  /**
+   * XCeiverClient which simulates responses.
+   */
+  private class MockXceiverClientSpi extends XceiverClientSpi {
+
+    private final Pipeline pipeline;
+
+    private Random expectedRandomStream = new Random(SEED);
+
+    private AtomicInteger counter = new AtomicInteger();
+
+    MockXceiverClientSpi(Pipeline pipeline) {
+      super();
+      this.pipeline = pipeline;
+    }
+
+    @Override
+    public void connect() throws Exception {
+
+    }
+
+    @Override
+    public void connect(String encodedToken) throws Exception {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public Pipeline getPipeline() {
+      return pipeline;
+    }
+
+    @Override
+    public XceiverClientReply sendCommandAsync(
+        ContainerCommandRequestProto request
+    )
+        throws IOException, ExecutionException, InterruptedException {
+
+      final ContainerCommandResponseProto.Builder builder =
+          ContainerCommandResponseProto.newBuilder()
+              .setResult(Result.SUCCESS)
+              .setCmdType(request.getCmdType());
+
+      switch (request.getCmdType()) {
+      case PutBlock:
+        builder.setPutBlock(PutBlockResponseProto.newBuilder()
+            .setCommittedBlockLength(
+                GetCommittedBlockLengthResponseProto.newBuilder()
+                    .setBlockID(
+                        request.getPutBlock().getBlockData().getBlockID())
+                    .setBlockLength(
+                        request.getPutBlock().getBlockData().getSize())
+                    .build())
+            .build());
+        break;
+      case WriteChunk:
+        ByteString data = request.getWriteChunk().getData();
+        final byte[] writePayload = data.toByteArray();
+        for (int i = 0; i < writePayload.length; i++) {
+          byte expectedByte = (byte) expectedRandomStream.nextInt();
+          Assert.assertEquals(expectedByte,
+              writePayload[i]);
+        }
+        break;
+      default:
+        //no-op
+      }
+      final XceiverClientReply result = new XceiverClientReply(
+          CompletableFuture.completedFuture(builder.build()));
+      result.setLogIndex(counter.incrementAndGet());
+      return result;
+
+    }
+
+    @Override
+    public ReplicationType getPipelineType() {
+      return null;
+    }
+
+    @Override
+    public XceiverClientReply watchForCommit(long index)
+        throws InterruptedException, ExecutionException, TimeoutException,
+        IOException {
+      final ContainerCommandResponseProto.Builder builder =
+          ContainerCommandResponseProto.newBuilder()
+              .setCmdType(Type.WriteChunk)
+              .setResult(Result.SUCCESS);
+      final XceiverClientReply xceiverClientReply = new XceiverClientReply(
+          CompletableFuture.completedFuture(builder.build()));
+      xceiverClientReply.setLogIndex(index);
+      return xceiverClientReply;
+    }
+
+    @Override
+    public long getReplicatedMinCommitIndex() {
+      return 0;
+    }
+
+    @Override
+    public Map<DatanodeDetails, ContainerCommandResponseProto>
+        sendCommandOnAllNodes(ContainerCommandRequestProto request
+    ) throws IOException, InterruptedException {
+      return null;
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
new file mode 100644
index 0000000..cd53d71
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for buffer pool.
+ */
+public class TestBufferPool {
+
+  @Test
+  public void releaseAndReallocate() {
+    BufferPool pool = new BufferPool(1024, 8);
+    ChunkBuffer cb1 = pool.allocateBuffer(0);
+    ChunkBuffer cb2 = pool.allocateBuffer(0);
+    ChunkBuffer cb3 = pool.allocateBuffer(0);
+
+    pool.releaseBuffer(cb1);
+
+    //current state cb2, -> cb3, cb1
+    final ChunkBuffer allocated = pool.allocateBuffer(0);
+    Assert.assertEquals(3, pool.getSize());
+    Assert.assertEquals(cb1, allocated);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 1c7d1f6..fd49529 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
 
 /**
  * A Client for the storageContainer protocol.
@@ -43,7 +43,7 @@ public abstract class XceiverClientSpi implements Closeable {
   private final AtomicInteger referenceCount;
   private boolean isEvicted;
 
-  XceiverClientSpi() {
+  public XceiverClientSpi() {
     this.referenceCount = new AtomicInteger(0);
     this.isEvicted = false;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 11acf82..ea5fc82 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,12 +18,41 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CloseContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .BlockNotCommittedException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
 import org.apache.hadoop.io.Text;
@@ -31,53 +60,8 @@ import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .CloseContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .DatanodeBlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetBlockRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetBlockResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetSmallFileResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .PutBlockRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .PutSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .WriteChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    PutSmallFileResponseProto;
-import org.apache.hadoop.hdds.client.BlockID;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 /**
  * Implementation of all container protocol calls performed by Container
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index b7ba6d6..65f8a89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.ozone.common;
 
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -27,6 +24,10 @@ import java.util.List;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
 /** Buffer for a block chunk. */
 public interface ChunkBuffer {
 
@@ -88,6 +89,13 @@ public interface ChunkBuffer {
     return put(ByteBuffer.wrap(b));
   }
 
+  /** Similar to {@link ByteBuffer#put(byte[])}. */
+  default ChunkBuffer put(byte b) {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    return put(buf, 0, 1);
+  }
+
   /** Similar to {@link ByteBuffer#put(byte[], int, int)}. */
   default ChunkBuffer put(byte[] b, int offset, int length) {
     return put(ByteBuffer.wrap(b, offset, length));
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 299cab8..dd81635 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.ozone.common;
 
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -29,6 +27,8 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Function;
 
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
 /** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
 final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   private final ByteBuffer buffer;
@@ -105,6 +105,12 @@ final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   }
 
   @Override
+  public ChunkBuffer put(byte b) {
+    buffer.put(b);
+    return this;
+  }
+
+  @Override
   public ChunkBuffer clear() {
     buffer.clear();
     return this;
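
The two hunks above add a single-byte put: ChunkBuffer gains a default implementation that falls back to the byte[] variant, and ChunkBufferImplWithByteBuffer overrides it to write straight into its backing buffer. A toy illustration of that default-method pattern, with made-up interface and class names rather than the Ozone types:

    import java.nio.ByteBuffer;

    public class DefaultPutSketch {
      interface Sink {
        Sink put(byte[] b, int off, int len);
        default Sink put(byte b) {            // fallback: allocate a 1-byte array
          return put(new byte[] {b}, 0, 1);
        }
      }

      static final class ByteBufferSink implements Sink {
        private final ByteBuffer buffer = ByteBuffer.allocate(16);
        @Override public Sink put(byte[] b, int off, int len) {
          buffer.put(b, off, len);
          return this;
        }
        @Override public Sink put(byte b) {   // override: no temporary array
          buffer.put(b);
          return this;
        }
      }

      public static void main(String[] args) {
        Sink sink = new ByteBufferSink();
        sink.put((byte) 42).put(new byte[] {1, 2, 3}, 0, 3);
        System.out.println("wrote 4 bytes");
      }
    }
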
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
index a8ddac1..556c052 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+
+import com.google.common.base.Preconditions;
+
 /**
  * Provides {@link Pipeline} factory methods for testing.
  */
@@ -68,6 +71,22 @@ public final class MockPipeline {
         .build();
   }
 
+  public static Pipeline createRatisPipeline() {
+
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+
+    return Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .setType(ReplicationType.RATIS)
+        .setFactor(ReplicationFactor.THREE)
+        .setNodes(nodes)
+        .build();
+  }
+
   private MockPipeline() {
     throw new UnsupportedOperationException("no instances");
   }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index 9ca735cb..9b69fad 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -17,12 +17,6 @@
  */
 package org.apache.hadoop.ozone.common;
 
-import org.apache.hadoop.hdds.utils.MockGatheringChannel;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,6 +27,13 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.hadoop.hdds.utils.MockGatheringChannel;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
 /**
  * Test {@link ChunkBuffer} implementations.
  */
@@ -71,7 +72,8 @@ public class TestChunkBuffer {
   private static void runTestIncrementalChunkBuffer(int increment, int n) {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, increment, ChunkBuffer.allocate(n, increment));
+    runTestImpl(expected, increment,
+        new IncrementalChunkBuffer(n, increment, false));
   }
 
   @Test(timeout = 1_000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 0a2ff14..63beeee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -17,6 +17,14 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
@@ -24,11 +32,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -43,6 +51,9 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -50,21 +61,9 @@ import org.apache.ratis.protocol.TimeoutIOException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.Timeout;
-import static java.util.Collections.singletonList;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
-        OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Class to test CommitWatcher functionality.
@@ -187,7 +186,7 @@ public class TestCommitWatcher {
           ContainerTestHelper
               .getWriteChunkRequest(pipeline, blockID, chunkSize, null);
       // add the data to the buffer pool
-      final ChunkBuffer byteBuffer = bufferPool.allocateBufferIfNeeded(0);
+      final ChunkBuffer byteBuffer = bufferPool.allocateBuffer(0);
       byteBuffer.put(writeChunkRequest.getWriteChunk().getData());
       ratisClient.sendCommandAsync(writeChunkRequest);
       ContainerProtos.ContainerCommandRequestProto putBlockRequest =
@@ -264,7 +263,7 @@ public class TestCommitWatcher {
           ContainerTestHelper
               .getWriteChunkRequest(pipeline, blockID, chunkSize, null);
       // add the data to the buffer pool
-      final ChunkBuffer byteBuffer = bufferPool.allocateBufferIfNeeded(0);
+      final ChunkBuffer byteBuffer = bufferPool.allocateBuffer(0);
       byteBuffer.put(writeChunkRequest.getWriteChunk().getData());
       ratisClient.sendCommandAsync(writeChunkRequest);
       ContainerProtos.ContainerCommandRequestProto putBlockRequest =


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org
