This is an automated email from the ASF dual-hosted git repository. shashikant pushed a commit to branch ozone-0.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/ozone-0.4 by this push: new 8614da3 HDDS-1257. Incorrect object because of mismatch in block lengths. Contributed by Shashikant Banerjee. 8614da3 is described below commit 8614da3723f3470a13043fc8ffd87d4c27b052de Author: Shashikant Banerjee <shashik...@apache.org> AuthorDate: Thu Mar 14 19:32:36 2019 +0530 HDDS-1257. Incorrect object because of mismatch in block lengths. Contributed by Shashikant Banerjee. (cherry picked from commit d60673c47077d69320ae1bd37c6b74489bef25f7) --- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 38 ++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index fe41f57..13913ee 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -46,6 +46,7 @@ import java.util.UUID; import java.util.List; import java.util.ArrayList; import java.util.concurrent.*; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls .putBlockAsync; @@ -108,7 +109,10 @@ public class BlockOutputStream extends OutputStream { CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> futureMap; // map containing mapping for putBlock logIndex to to flushedDataLength Map. - private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap; + + // The map should maintain the keys (logIndexes) in order so that while + // removing we always end up updating incremented data flushed length. + private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap; private List<DatanodeDetails> failedServers; @@ -157,7 +161,7 @@ public class BlockOutputStream extends OutputStream { // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); - commitIndex2flushedDataMap = new ConcurrentHashMap<>(); + commitIndex2flushedDataMap = new ConcurrentSkipListMap<>(); totalAckDataLength = 0; futureMap = new ConcurrentHashMap<>(); totalDataFlushedLength = 0; @@ -206,7 +210,7 @@ public class BlockOutputStream extends OutputStream { int writeLen; // Allocate a buffer if needed. The buffer will be allocated only - // once as needed and will be reused again for mutiple blockOutputStream + // once as needed and will be reused again for multiple blockOutputStream // entries. ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded(); int pos = currentBuffer.position(); @@ -281,10 +285,18 @@ public class BlockOutputStream extends OutputStream { * just update the totalAckDataLength. In case of failure, * we will read the data starting from totalAckDataLength. */ - private void updateFlushIndex(long index) { - if (!commitIndex2flushedDataMap.isEmpty()) { + private void updateFlushIndex(List<Long> indexes) { + Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty()); + for (long index : indexes) { Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index)); - totalAckDataLength = commitIndex2flushedDataMap.remove(index); + long length = commitIndex2flushedDataMap.remove(index); + + // totalAckDataLength replicated yet should always be less than equal to + // the current length being returned from commitIndex2flushedDataMap. + // The below precondition would ensure commitIndex2flushedDataMap entries + // are removed in order of the insertion to the map. + Preconditions.checkArgument(totalAckDataLength < length); + totalAckDataLength = length; LOG.debug("Total data successfully replicated: " + totalAckDataLength); futureMap.remove(totalAckDataLength); // Flush has been committed to required servers successful. @@ -325,13 +337,13 @@ public class BlockOutputStream extends OutputStream { } private void adjustBuffers(long commitIndex) { - commitIndex2flushedDataMap.keySet().stream().forEach(index -> { - if (index <= commitIndex) { - updateFlushIndex(index); - } else { - return; - } - }); + List<Long> keyList = commitIndex2flushedDataMap.keySet().stream() + .filter(p -> p <= commitIndex).collect(Collectors.toList()); + if (keyList.isEmpty()) { + return; + } else { + updateFlushIndex(keyList); + } } // It may happen that once the exception is encountered , we still might --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org