This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new 8614da3  HDDS-1257. Incorrect object because of mismatch in block 
lengths. Contributed by Shashikant Banerjee.
8614da3 is described below

commit 8614da3723f3470a13043fc8ffd87d4c27b052de
Author: Shashikant Banerjee <shashik...@apache.org>
AuthorDate: Thu Mar 14 19:32:36 2019 +0530

    HDDS-1257. Incorrect object because of mismatch in block lengths. 
Contributed by Shashikant Banerjee.
    
    (cherry picked from commit d60673c47077d69320ae1bd37c6b74489bef25f7)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 38 ++++++++++++++--------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index fe41f57..13913ee 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -46,6 +46,7 @@ import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.concurrent.*;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
     .putBlockAsync;
@@ -108,7 +109,10 @@ public class BlockOutputStream extends OutputStream {
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
       futureMap;
   // map containing mapping for putBlock logIndex to to flushedDataLength Map.
-  private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
+
+  // The map should maintain the keys (logIndexes) in order so that while
+  // removing we always end up updating incremented data flushed length.
+  private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap;
 
   private List<DatanodeDetails> failedServers;
 
@@ -157,7 +161,7 @@ public class BlockOutputStream extends OutputStream {
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitIndex2flushedDataMap = new ConcurrentHashMap<>();
+    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
@@ -206,7 +210,7 @@ public class BlockOutputStream extends OutputStream {
       int writeLen;
 
       // Allocate a buffer if needed. The buffer will be allocated only
-      // once as needed and will be reused again for mutiple blockOutputStream
+      // once as needed and will be reused again for multiple blockOutputStream
       // entries.
       ByteBuffer  currentBuffer = bufferPool.allocateBufferIfNeeded();
       int pos = currentBuffer.position();
@@ -281,10 +285,18 @@ public class BlockOutputStream extends OutputStream {
    * just update the totalAckDataLength. In case of failure,
    * we will read the data starting from totalAckDataLength.
    */
-  private void updateFlushIndex(long index) {
-    if (!commitIndex2flushedDataMap.isEmpty()) {
+  private void updateFlushIndex(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
+    for (long index : indexes) {
       Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      totalAckDataLength = commitIndex2flushedDataMap.remove(index);
+      long length = commitIndex2flushedDataMap.remove(index);
+
+      // totalAckDataLength replicated yet should always be less than equal to
+      // the current length being returned from commitIndex2flushedDataMap.
+      // The below precondition would ensure commitIndex2flushedDataMap entries
+      // are removed in order of the insertion to the map.
+      Preconditions.checkArgument(totalAckDataLength < length);
+      totalAckDataLength = length;
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successful.
@@ -325,13 +337,13 @@ public class BlockOutputStream extends OutputStream {
   }
 
   private void adjustBuffers(long commitIndex) {
-    commitIndex2flushedDataMap.keySet().stream().forEach(index -> {
-      if (index <= commitIndex) {
-        updateFlushIndex(index);
-      } else {
-        return;
-      }
-    });
+    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (keyList.isEmpty()) {
+      return;
+    } else {
+      updateFlushIndex(keyList);
+    }
   }
 
   // It may happen that once the exception is encountered , we still might


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to