This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a7b903bc91 HDDS-2146. Optimize block write path performance by
reducing no of watchForCommit calls. (#5272)
a7b903bc91 is described below
commit a7b903bc9196b19cbaea17c5f5b4aee49cf79e5f
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Sep 18 14:29:45 2023 -0700
HDDS-2146. Optimize block write path performance by reducing no of
watchForCommit calls. (#5272)
---
.../hdds/scm/storage/AbstractCommitWatcher.java | 191 +++++++++++++++++
.../hdds/scm/storage/BlockDataStreamOutput.java | 6 +-
.../hadoop/hdds/scm/storage/CommitWatcher.java | 227 +++------------------
.../hdds/scm/storage/RatisBlockOutputStream.java | 2 +-
.../hdds/scm/storage/StreamCommitWatcher.java | 181 +---------------
.../dev-support/findbugsExcludeFile.xml | 2 +-
.../scm/storage}/TestCommitWatcher.java | 20 +-
.../ozone/client/rpc/TestBlockOutputStream.java | 18 +-
.../rpc/TestBlockOutputStreamFlushDelay.java | 18 +-
.../rpc/TestBlockOutputStreamWithFailures.java | 28 +--
...estBlockOutputStreamWithFailuresFlushDelay.java | 28 +--
.../ozone/client/rpc/TestWatchForCommit.java | 8 +-
12 files changed, 302 insertions(+), 427 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
new file mode 100644
index 0000000000..0c5501c792
--- /dev/null
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class executes watchForCommit on ratis pipeline and releases
+ * buffers once data successfully gets replicated.
+ */
+abstract class AbstractCommitWatcher<BUFFER> {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractCommitWatcher.class);
+
+ /**
+ * Commit index -> buffers: when a commit index is acknowledged,
+ * the corresponding buffers can be released.
+ */
+ private final SortedMap<Long, List<BUFFER>> commitIndexMap
+ = new ConcurrentSkipListMap<>();
+ /**
+ * Commit index -> reply future:
+ * cache to reply futures to avoid sending duplicated watch requests.
+ */
+ private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
+ replies = new ConcurrentHashMap<>();
+
+ private final XceiverClientSpi client;
+
+ private long totalAckDataLength;
+
+ AbstractCommitWatcher(XceiverClientSpi client) {
+ this.client = client;
+ }
+
+ @VisibleForTesting
+ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
+ return commitIndexMap;
+ }
+
+ void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+ commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
+ .addAll(buffers);
+ }
+
+ /** @return the total data which has been acknowledged. */
+ long getTotalAckDataLength() {
+ return totalAckDataLength;
+ }
+
+ long addAckDataLength(long acked) {
+ totalAckDataLength += acked;
+ return totalAckDataLength;
+ }
+
+ /**
+ * Watch for commit for the first index.
+ * This is useful when the buffer is full
+ * since the first chunk can be released once it has been committed.
+ * Otherwise, the client write is blocked.
+ *
+ * @return {@link XceiverClientReply} reply from raft client
+ * @throws IOException in case watchForCommit fails
+ */
+ XceiverClientReply watchOnFirstIndex() throws IOException {
+ if (commitIndexMap.isEmpty()) {
+ return null;
+ }
+ return watchForCommit(commitIndexMap.firstKey());
+ }
+
+ /**
+ * Watch for commit for the last index.
+ * This is useful when the buffer is not full
+ * since it will wait for all the chunks in the buffer to get committed.
+ * Since the buffer is not full, the client write is not blocked.
+ *
+ * @return {@link XceiverClientReply} reply from raft client
+ * @throws IOException in case watchForCommit fails
+ */
+ XceiverClientReply watchOnLastIndex() throws IOException {
+ if (commitIndexMap.isEmpty()) {
+ return null;
+ }
+ return watchForCommit(commitIndexMap.lastKey());
+ }
+
+ /**
+ * Watch for commit for a particular index.
+ *
+ * @param commitIndex log index to watch for
+ * @return minimum commit index replicated to all nodes
+ * @throws IOException IOException in case watch gets timed out
+ */
+ XceiverClientReply watchForCommit(long commitIndex)
+ throws IOException {
+ final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
+ = JavaUtils.memoize(CompletableFuture::new);
+ final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
+ (key, value) -> value != null ? value : supplier.get());
+ if (!supplier.isInitialized()) {
+ // future already exists
+ return f.join();
+ }
+
+ try {
+ final XceiverClientReply reply = client.watchForCommit(commitIndex);
+ f.complete(reply);
+ final CompletableFuture<XceiverClientReply> removed
+ = replies.remove(commitIndex);
+ Preconditions.checkState(removed == f);
+
+ final long index = reply != null ? reply.getLogIndex() : 0;
+ adjustBuffers(index);
+ return reply;
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ throw getIOExceptionForWatchForCommit(commitIndex, e);
+ } catch (TimeoutException | ExecutionException e) {
+ throw getIOExceptionForWatchForCommit(commitIndex, e);
+ }
+ }
+
+ List<BUFFER> remove(long i) {
+ final List<BUFFER> buffers = commitIndexMap.remove(i);
+ Objects.requireNonNull(buffers, () -> "commitIndexMap.remove(" + i + ")");
+ return buffers;
+ }
+
+ /** Release the buffers for the given index. */
+ abstract void releaseBuffers(long index);
+
+ void adjustBuffers(long commitIndex) {
+ commitIndexMap.keySet().stream()
+ .filter(p -> p <= commitIndex)
+ .forEach(this::releaseBuffers);
+ }
+
+ void releaseBuffersOnException() {
+ adjustBuffers(client.getReplicatedMinCommitIndex());
+ }
+
+ IOException getIOExceptionForWatchForCommit(long commitIndex, Exception e) {
+ LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+ IOException ioException = new IOException(
+ "Unexpected Storage Container Exception: " + e, e);
+ releaseBuffersOnException();
+ return ioException;
+ }
+
+ void cleanup() {
+ commitIndexMap.clear();
+ replies.clear();
+ }
+}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 67e5e3ca49..95f37eecb2 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -376,8 +376,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
checkOpen();
try {
XceiverClientReply reply = bufferFull ?
- commitWatcher.streamWatchOnFirstIndex() :
- commitWatcher.streamWatchOnLastIndex();
+ commitWatcher.watchOnFirstIndex() :
+ commitWatcher.watchOnLastIndex();
if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
@@ -454,7 +454,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding index " + asyncReply.getLogIndex() +
" commitMap size "
- + commitWatcher.getCommitInfoMapSize() + " flushLength "
+ commitWatcher.getCommitIndexMap().size() + " flushLength "
+ flushPos + " blockID " + blockID);
}
// for standalone protocol, logIndex will always be 0.
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 195921f6fc..3c7f8a2360 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,226 +24,57 @@
*/
package org.apache.hadoop.hdds.scm.storage;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* This class executes watchForCommit on ratis pipeline and releases
* buffers once data successfully gets replicated.
*/
-public class CommitWatcher {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(CommitWatcher.class);
-
+class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
// A reference to the pool of buffers holding the data
- private BufferPool bufferPool;
-
- // The map should maintain the keys (logIndexes) in order so that while
- // removing we always end up updating incremented data flushed length.
- Also, corresponding to the logIndex, the corresponding list of buffers will
- // be released from the buffer pool.
- private Map<Long, List<ChunkBuffer>> commitIndex2flushedDataMap;
+ private final BufferPool bufferPool;
// future Map to hold up all putBlock futures
- private ConcurrentHashMap<Long,
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
- futureMap;
+ private final ConcurrentMap<Long, CompletableFuture<
+ ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
- private XceiverClientSpi xceiverClient;
-
- // total data which has been successfully flushed and acknowledged
- // by all servers
- private long totalAckDataLength;
-
- public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+ CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+ super(xceiverClient);
this.bufferPool = bufferPool;
- this.xceiverClient = xceiverClient;
- commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
- totalAckDataLength = 0;
- futureMap = new ConcurrentHashMap<>();
- }
-
- /**
- * just update the totalAckDataLength. In case of failure,
- * we will read the data starting from totalAckDataLength.
- */
- private long releaseBuffers(List<Long> indexes) {
- Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
- for (long index : indexes) {
- Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
- final List<ChunkBuffer> buffers
- = commitIndex2flushedDataMap.remove(index);
- long length = buffers.stream().mapToLong(ChunkBuffer::position).sum();
- totalAckDataLength += length;
- // clear the future object from the future Map
- final CompletableFuture<ContainerCommandResponseProto> remove =
- futureMap.remove(totalAckDataLength);
- if (remove == null) {
- LOG.error("Couldn't find required future for " + totalAckDataLength);
- for (Long key : futureMap.keySet()) {
- LOG.error("Existing acknowledged data: " + key);
- }
- }
- Preconditions.checkNotNull(remove);
- for (ChunkBuffer byteBuffer : buffers) {
- bufferPool.releaseBuffer(byteBuffer);
- }
- }
- return totalAckDataLength;
- }
-
- public void updateCommitInfoMap(long index, List<ChunkBuffer> buffers) {
- commitIndex2flushedDataMap.computeIfAbsent(index, k -> new LinkedList<>())
- .addAll(buffers);
- }
-
- int getCommitInfoMapSize() {
- return commitIndex2flushedDataMap.size();
}
- /**
- * Calls watch for commit for the first index in commitIndex2flushedDataMap to
- * the Ratis client.
- * @return reply reply from raft client
- * @throws IOException in case watchForCommit fails
- */
- public XceiverClientReply watchOnFirstIndex() throws IOException {
- if (!commitIndex2flushedDataMap.isEmpty()) {
- // wait for the first commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long index =
- commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).min()
- .getAsLong();
- if (LOG.isDebugEnabled()) {
- LOG.debug("waiting for first index {} to catch up", index);
- }
- return watchForCommit(index);
- } else {
- return null;
+ @Override
+ void releaseBuffers(long index) {
+ long acked = 0;
+ for (ChunkBuffer buffer : remove(index)) {
+ acked += buffer.position();
+ bufferPool.releaseBuffer(buffer);
}
+ final long totalLength = addAckDataLength(acked);
+ // When putBlock is called, a future is added.
+ // When putBlock is replied, the future is removed below.
+ // Therefore, the removed future should not be null.
+ final CompletableFuture<ContainerCommandResponseProto> removed =
+ futureMap.remove(totalLength);
+ Objects.requireNonNull(removed, () -> "Future not found for "
+ + totalLength + ": existing = " + futureMap.keySet());
}
- /**
- * Calls watch for commit for the first index in commitIndex2flushedDataMap to
- * the Ratis client.
- * @return reply reply from raft client
- * @throws IOException in case watchForCommit fails
- */
- public XceiverClientReply watchOnLastIndex()
- throws IOException {
- if (!commitIndex2flushedDataMap.isEmpty()) {
- // wait for the commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long index =
- commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).max()
- .getAsLong();
- if (LOG.isDebugEnabled()) {
- LOG.debug("waiting for last flush Index {} to catch up", index);
- }
- return watchForCommit(index);
- } else {
- return null;
- }
- }
-
- private void adjustBuffers(long commitIndex) {
- List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
- .filter(p -> p <= commitIndex).collect(Collectors.toList());
- if (!keyList.isEmpty()) {
- releaseBuffers(keyList);
- }
- }
-
- // It may happen that once the exception is encountered , we still might
- // have successfully flushed up to a certain index. Make sure the buffers
- // only contain data which have not been sufficiently replicated
- void releaseBuffersOnException() {
- adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
- }
-
- /**
- * calls watchForCommit API of the Ratis Client. For Standalone client,
- * it is a no op.
- * @param commitIndex log index to watch for
- * @return minimum commit index replicated to all nodes
- * @throws IOException IOException in case watch gets timed out
- */
- public XceiverClientReply watchForCommit(long commitIndex)
- throws IOException {
- long index;
- try {
- XceiverClientReply reply =
- xceiverClient.watchForCommit(commitIndex);
- if (reply == null) {
- index = 0;
- } else {
- index = reply.getLogIndex();
- }
- adjustBuffers(index);
- return reply;
- } catch (InterruptedException e) {
- // Re-interrupt the thread while catching InterruptedException
- Thread.currentThread().interrupt();
- throw getIOExceptionForWatchForCommit(commitIndex, e);
- } catch (TimeoutException | ExecutionException e) {
- throw getIOExceptionForWatchForCommit(commitIndex, e);
- }
- }
-
- private IOException getIOExceptionForWatchForCommit(long commitIndex,
- Exception e) {
- LOG.warn("watchForCommit failed for index {}", commitIndex, e);
- IOException ioException = new IOException(
- "Unexpected Storage Container Exception: " + e.toString(), e);
- releaseBuffersOnException();
- return ioException;
- }
-
- @VisibleForTesting
- public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
- return commitIndex2flushedDataMap;
- }
-
- public ConcurrentMap<Long,
- CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto>> getFutureMap() {
+ ConcurrentMap<Long, CompletableFuture<
+ ContainerCommandResponseProto>> getFutureMap() {
return futureMap;
}
- public long getTotalAckDataLength() {
- return totalAckDataLength;
- }
-
+ @Override
public void cleanup() {
- if (commitIndex2flushedDataMap != null) {
- commitIndex2flushedDataMap.clear();
- }
- if (futureMap != null) {
- futureMap.clear();
- }
- commitIndex2flushedDataMap = null;
+ super.cleanup();
+ futureMap.clear();
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index 92edf2e2c7..ede7057496 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -92,7 +92,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
@VisibleForTesting
public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
- return commitWatcher.getCommitIndex2flushedDataMap();
+ return commitWatcher.getCommitIndexMap();
}
@Override
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 8ca70de816..195628ae58 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -18,191 +18,30 @@
package org.apache.hadoop.hdds.scm.storage;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.MemoizedSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
/**
* This class executes watchForCommit on ratis pipeline and releases
* buffers once data successfully gets replicated.
*/
-public class StreamCommitWatcher {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(StreamCommitWatcher.class);
-
- private Map<Long, List<StreamBuffer>> commitIndexMap;
+class StreamCommitWatcher extends AbstractCommitWatcher<StreamBuffer> {
private final List<StreamBuffer> bufferList;
- // total data which has been successfully flushed and acknowledged
- // by all servers
- private long totalAckDataLength;
- private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
- replies = new ConcurrentHashMap<>();
-
- private final XceiverClientSpi xceiverClient;
-
- public StreamCommitWatcher(XceiverClientSpi xceiverClient,
+ StreamCommitWatcher(XceiverClientSpi xceiverClient,
List<StreamBuffer> bufferList) {
- this.xceiverClient = xceiverClient;
- commitIndexMap = new ConcurrentSkipListMap<>();
+ super(xceiverClient);
this.bufferList = bufferList;
- totalAckDataLength = 0;
- }
-
- public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) {
- commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
- .addAll(buffers);
- }
-
- int getCommitInfoMapSize() {
- return commitIndexMap.size();
- }
-
- /**
- * Calls watch for commit for the first index in commitIndex2flushedDataMap to
- * the Ratis client.
- * @return {@link XceiverClientReply} reply from raft client
- * @throws IOException in case watchForCommit fails
- */
- public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
- if (!commitIndexMap.isEmpty()) {
- // wait for the first commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long index =
- commitIndexMap.keySet().stream().mapToLong(v -> v).min()
- .getAsLong();
- if (LOG.isDebugEnabled()) {
- LOG.debug("waiting for first index {} to catch up", index);
- }
- return streamWatchForCommit(index);
- } else {
- return null;
- }
- }
-
- /**
- * Calls watch for commit for the last index in commitIndex2flushedDataMap to
- * the Ratis client.
- * @return {@link XceiverClientReply} reply from raft client
- * @throws IOException in case watchForCommit fails
- */
- public XceiverClientReply streamWatchOnLastIndex()
- throws IOException {
- if (!commitIndexMap.isEmpty()) {
- // wait for the commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long index =
- commitIndexMap.keySet().stream().mapToLong(v -> v).max()
- .getAsLong();
- if (LOG.isDebugEnabled()) {
- LOG.debug("waiting for last flush Index {} to catch up", index);
- }
- return streamWatchForCommit(index);
- } else {
- return null;
- }
- }
-
- /**
- * calls watchForCommit API of the Ratis Client. This method is for streaming
- * and no longer requires releaseBuffers
- * @param commitIndex log index to watch for
- * @return minimum commit index replicated to all nodes
- * @throws IOException IOException in case watch gets timed out
- */
- public XceiverClientReply streamWatchForCommit(long commitIndex)
- throws IOException {
- final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
- = JavaUtils.memoize(CompletableFuture::new);
- final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
- (key, value) -> value != null ? value : supplier.get());
- if (!supplier.isInitialized()) {
- // future already exists
- return f.join();
- }
-
- try {
- XceiverClientReply reply =
- xceiverClient.watchForCommit(commitIndex);
- f.complete(reply);
- final CompletableFuture<XceiverClientReply> removed
- = replies.remove(commitIndex);
- Preconditions.checkState(removed == f);
-
- adjustBuffers(reply.getLogIndex());
- return reply;
- } catch (InterruptedException e) {
- // Re-interrupt the thread while catching InterruptedException
- Thread.currentThread().interrupt();
- throw getIOExceptionForWatchForCommit(commitIndex, e);
- } catch (TimeoutException | ExecutionException e) {
- throw getIOExceptionForWatchForCommit(commitIndex, e);
- }
- }
-
- void releaseBuffersOnException() {
- adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
- }
-
- private void adjustBuffers(long commitIndex) {
- List<Long> keyList = commitIndexMap.keySet().stream()
- .filter(p -> p <= commitIndex).collect(Collectors.toList());
- if (!keyList.isEmpty()) {
- releaseBuffers(keyList);
- }
- }
-
- private long releaseBuffers(List<Long> indexes) {
- Preconditions.checkArgument(!commitIndexMap.isEmpty());
- for (long index : indexes) {
- Preconditions.checkState(commitIndexMap.containsKey(index));
- final List<StreamBuffer> buffers = commitIndexMap.remove(index);
- final long length =
- buffers.stream().mapToLong(StreamBuffer::position).sum();
- totalAckDataLength += length;
- for (StreamBuffer byteBuffer : buffers) {
- bufferList.remove(byteBuffer);
- }
- }
- return totalAckDataLength;
- }
-
- public long getTotalAckDataLength() {
- return totalAckDataLength;
- }
-
- private IOException getIOExceptionForWatchForCommit(long commitIndex,
- Exception e) {
- LOG.warn("watchForCommit failed for index {}", commitIndex, e);
- IOException ioException = new IOException(
- "Unexpected Storage Container Exception: " + e.toString(), e);
- releaseBuffersOnException();
- return ioException;
}
- public void cleanup() {
- if (commitIndexMap != null) {
- commitIndexMap.clear();
+ @Override
+ void releaseBuffers(long index) {
+ long acked = 0;
+ for (StreamBuffer buffer : remove(index)) {
+ acked += buffer.position();
+ bufferList.remove(buffer);
}
- commitIndexMap = null;
+ addAckDataLength(acked);
}
}
diff --git a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
index c61a14559b..9436840c9c 100644
--- a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
@@ -77,7 +77,7 @@
<Bug pattern="RV_RETURN_VALUE_IGNORED" />
</Match>
<Match>
- <Class name="org.apache.hadoop.ozone.client.rpc.TestCommitWatcher"/>
+ <Class name="org.apache.hadoop.hdds.scm.storage.TestCommitWatcher"/>
<Bug pattern="URF_UNREAD_FIELD" />
</Match>
<Match>
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
similarity index 95%
rename from
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
rename to
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index ade3d9d64a..5caf23936a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -15,7 +15,7 @@
* the License.
*/
-package org.apache.hadoop.ozone.client.rpc;
+package org.apache.hadoop.hdds.scm.storage;
import java.io.IOException;
import java.time.Duration;
@@ -44,8 +44,6 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.hdds.scm.storage.CommitWatcher;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -230,19 +228,19 @@ public class TestCommitWatcher {
future2.get();
assertEquals(future2, watcher.getFutureMap().get((long) 2 *
chunkSize));
assertEquals(2, watcher.
- getCommitIndex2flushedDataMap().size());
+ getCommitIndexMap().size());
watcher.watchOnFirstIndex();
- assertFalse(watcher.getCommitIndex2flushedDataMap()
+ assertFalse(watcher.getCommitIndexMap()
.containsKey(replies.get(0).getLogIndex()));
assertFalse(watcher.getFutureMap().containsKey((long) chunkSize));
assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
watcher.watchOnLastIndex();
- assertFalse(watcher.getCommitIndex2flushedDataMap()
+ assertFalse(watcher.getCommitIndexMap()
.containsKey(replies.get(1).getLogIndex()));
assertFalse(watcher.getFutureMap().containsKey((long) 2 * chunkSize));
assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
assertTrue(watcher.getFutureMap().isEmpty());
- assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+ assertTrue(watcher.getCommitIndexMap().isEmpty());
}
}
}
@@ -303,9 +301,9 @@ public class TestCommitWatcher {
// wait on 2nd putBlock to complete
future2.get();
assertEquals(future2, watcher.getFutureMap().get((long) 2 *
chunkSize));
- assertEquals(2, watcher.getCommitIndex2flushedDataMap().size());
+ assertEquals(2, watcher.getCommitIndexMap().size());
watcher.watchOnFirstIndex();
- assertFalse(watcher.getCommitIndex2flushedDataMap()
+ assertFalse(watcher.getCommitIndexMap()
.containsKey(replies.get(0).getLogIndex()));
assertFalse(watcher.getFutureMap().containsKey((long) chunkSize));
assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
@@ -333,12 +331,12 @@ public class TestCommitWatcher {
if (ratisClient.getReplicatedMinCommitIndex() < replies.get(1)
.getLogIndex()) {
assertEquals(chunkSize, watcher.getTotalAckDataLength());
- assertEquals(1, watcher.getCommitIndex2flushedDataMap().size());
+ assertEquals(1, watcher.getCommitIndexMap().size());
assertEquals(1, watcher.getFutureMap().size());
} else {
assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
assertTrue(watcher.getFutureMap().isEmpty());
- assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+ assertTrue(watcher.getCommitIndexMap().isEmpty());
}
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 573fe8614b..1b62939a54 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -224,7 +224,8 @@ public class TestBlockOutputStream {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -309,7 +310,8 @@ public class TestBlockOutputStream {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(pendingWriteChunkCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, metrics
@@ -405,7 +407,8 @@ public class TestBlockOutputStream {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(pendingWriteChunkCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, metrics
@@ -492,7 +495,8 @@ public class TestBlockOutputStream {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -593,7 +597,8 @@ public class TestBlockOutputStream {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -697,7 +702,8 @@ public class TestBlockOutputStream {
Assert.assertEquals(totalOpCount + 9,
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index d858b65a92..ef50f867b4 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -222,7 +222,8 @@ public class TestBlockOutputStreamFlushDelay {
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(totalWriteDataLength,
blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -307,7 +308,8 @@ public class TestBlockOutputStreamFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(pendingWriteChunkCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, metrics
@@ -403,7 +405,8 @@ public class TestBlockOutputStreamFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(pendingWriteChunkCount, metrics
.getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, metrics
@@ -490,7 +493,8 @@ public class TestBlockOutputStreamFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -591,7 +595,8 @@ public class TestBlockOutputStreamFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
@@ -695,7 +700,8 @@ public class TestBlockOutputStreamFlushDelay {
Assert.assertEquals(totalOpCount + 8,
metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 25fd2aedfa..3aef0a0dbd 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -247,7 +247,8 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -333,7 +334,8 @@ public class TestBlockOutputStreamWithFailures {
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -432,15 +434,12 @@ public class TestBlockOutputStreamWithFailures {
// now close the stream, It will update the ack length after watchForCommit
key.close();
- Assert
- .assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
validateData(keyName, data1);
}
@@ -501,7 +500,8 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -571,7 +571,8 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -661,7 +662,8 @@ public class TestBlockOutputStreamWithFailures {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -752,7 +754,8 @@ public class TestBlockOutputStreamWithFailures {
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
@@ -846,7 +849,8 @@ public class TestBlockOutputStreamWithFailures {
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index 9f1df6207e..937b170ee2 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -247,7 +247,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -335,7 +336,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -434,15 +436,12 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
// now close the stream, It will update the ack length after watchForCommit
key.close();
- Assert
- .assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
validateData(keyName, data1);
}
@@ -503,7 +502,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -573,7 +573,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -663,7 +664,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -755,7 +757,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
@@ -849,7 +852,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 070a4c47d8..b52b10ed53 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -236,17 +236,13 @@ public class TestWatchForCommit {
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit
key.close();
- Assert
- .assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert
- .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert
.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
- Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+ Assert.assertTrue(
+ blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
validateData(keyName, data1);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]