This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch ozone-1.4
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit b58be97410880254fe358a722590253dd37020d6
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Fri Apr 12 09:44:18 2024 +0100

    HDDS-10681. EC Reconstruction does not issue put block to data index if it 
is unused (#6514)
    
    (cherry picked from commit cba8c85e22435e2c3e6aaa0e66aedc2a0631c912)
    
     Conflicts:
            
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
---
 .../ECReconstructionCoordinator.java               | 119 ++++++++++-----------
 .../hdds/scm/storage/TestContainerCommandsEC.java  |  18 +++-
 2 files changed, 72 insertions(+), 65 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 90756bbc88..3d9b5b77ee 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -58,6 +58,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -247,24 +248,15 @@ public class ECReconstructionCoordinator implements 
Closeable {
     int dataLocs = ECBlockInputStreamProxy
         .expectedDataLocations(repConfig, safeBlockGroupLength);
     List<Integer> toReconstructIndexes = new ArrayList<>();
+    List<Integer> notReconstructIndexes = new ArrayList<>();
     for (Integer index : missingContainerIndexes) {
       if (index <= dataLocs || index > repConfig.getData()) {
         toReconstructIndexes.add(index);
+      } else {
+        // Don't need to be reconstructed, but we do need a stream to write
+        // the block data to.
+        notReconstructIndexes.add(index);
       }
-      // else padded indexes.
-    }
-
-    // Looks like we don't need to reconstruct any missing blocks in this block
-    // group. The reason for this should be block group had only padding blocks
-    // in the missing locations.
-    if (toReconstructIndexes.size() == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping the reconstruction for the block: "
-            + blockLocationInfo.getBlockID() + ". In the missing locations: "
-            + missingContainerIndexes
-            + ", this block group has only padded blocks.");
-      }
-      return;
     }
 
     try (ECBlockReconstructedStripeInputStream sis
@@ -276,71 +268,78 @@ public class ECReconstructionCoordinator implements 
Closeable {
 
       ECBlockOutputStream[] targetBlockStreams =
           new ECBlockOutputStream[toReconstructIndexes.size()];
+      ECBlockOutputStream[] emptyBlockStreams =
+          new ECBlockOutputStream[notReconstructIndexes.size()];
       ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
       try {
+        // Create streams and buffers for all indexes that need to be reconstructed
         for (int i = 0; i < toReconstructIndexes.size(); i++) {
           int replicaIndex = toReconstructIndexes.get(i);
-          DatanodeDetails datanodeDetails =
-              targetMap.get(replicaIndex);
-          targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
-              datanodeDetails, repConfig, replicaIndex
-          );
+          DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
+          targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, 
datanodeDetails, repConfig, replicaIndex);
           bufs[i] = byteBufferPool.getBuffer(false, 
repConfig.getEcChunkSize());
-          // Make sure it's clean. Don't want to reuse the erroneously returned
-          // buffers from the pool.
           bufs[i].clear();
         }
+        // Then create a stream for all indexes that don't need to be reconstructed, 
but still need a stream to
+        // write the empty block data to.
+        for (int i = 0; i < notReconstructIndexes.size(); i++) {
+          int replicaIndex = notReconstructIndexes.get(i);
+          DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
+          emptyBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, 
datanodeDetails, repConfig, replicaIndex);
+        }
 
-        sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
-            .collect(Collectors.toSet()));
-        long length = safeBlockGroupLength;
-        while (length > 0) {
-          int readLen;
-          try {
-            readLen = sis.recoverChunks(bufs);
-            Set<Integer> failedIndexes = sis.getFailedIndexes();
-            if (!failedIndexes.isEmpty()) {
-              // There was a problem reading some of the block indexes, but we
-              // did not get an exception as there must have been spare indexes
-              // to try and recover from. Therefore we should log out the block
-              // group details in the same way as for the exception case below.
+        if (toReconstructIndexes.size() > 0) {
+          sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 
1))
+              .collect(Collectors.toSet()));
+          long length = safeBlockGroupLength;
+          while (length > 0) {
+            int readLen;
+            try {
+              readLen = sis.recoverChunks(bufs);
+              Set<Integer> failedIndexes = sis.getFailedIndexes();
+              if (!failedIndexes.isEmpty()) {
+                // There was a problem reading some of the block indexes, but 
we
+                // did not get an exception as there must have been spare 
indexes
+                // to try and recover from. Therefore we should log out the 
block
+                // group details in the same way as for the exception case 
below.
+                logBlockGroupDetails(blockLocationInfo, repConfig,
+                    blockDataGroup);
+              }
+            } catch (IOException e) {
+              // When we see exceptions here, it could be due to some transient
+              // issue that causes the block read to fail when reconstructing 
it,
+              // but we have seen issues where the containers don't have the
+              // blocks they appear they should have, or the block chunks are 
the
+              // wrong length etc. In order to debug these sort of cases, if we
+              // get an error, we will log out the details about the block 
group
+              // length on each source, along with their chunk list and chunk
+              // lengths etc.
               logBlockGroupDetails(blockLocationInfo, repConfig,
                   blockDataGroup);
+              throw e;
             }
-          } catch (IOException e) {
-            // When we see exceptions here, it could be due to some transient
-            // issue that causes the block read to fail when reconstructing it,
-            // but we have seen issues where the containers don't have the
-            // blocks they appear they should have, or the block chunks are the
-            // wrong length etc. In order to debug these sort of cases, if we
-            // get an error, we will log out the details about the block group
-            // length on each source, along with their chunk list and chunk
-            // lengths etc.
-            logBlockGroupDetails(blockLocationInfo, repConfig,
-                blockDataGroup);
-            throw e;
-          }
-          // TODO: can be submitted in parallel
-          for (int i = 0; i < bufs.length; i++) {
-            CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-                future = targetBlockStreams[i].write(bufs[i]);
-            checkFailures(targetBlockStreams[i], future);
-            bufs[i].clear();
+            // TODO: can be submitted in parallel
+            for (int i = 0; i < bufs.length; i++) {
+              CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+                  future = targetBlockStreams[i].write(bufs[i]);
+              checkFailures(targetBlockStreams[i], future);
+              bufs[i].clear();
+            }
+            length -= readLen;
           }
-          length -= readLen;
         }
-
-        for (ECBlockOutputStream targetStream : targetBlockStreams) {
-          targetStream.executePutBlock(true, true,
-              blockLocationInfo.getLength(), blockDataGroup);
-          checkFailures(targetStream,
-              targetStream.getCurrentPutBlkResponseFuture());
+        List<ECBlockOutputStream> allStreams = new 
ArrayList<>(Arrays.asList(targetBlockStreams));
+        allStreams.addAll(Arrays.asList(emptyBlockStreams));
+        for (ECBlockOutputStream targetStream : allStreams) {
+          targetStream.executePutBlock(true, true, 
blockLocationInfo.getLength(), blockDataGroup);
+          checkFailures(targetStream, 
targetStream.getCurrentPutBlkResponseFuture());
         }
       } finally {
         for (ByteBuffer buf : bufs) {
           byteBufferPool.putBuffer(buf);
         }
         IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
+        IOUtils.cleanupWithLogger(LOG, emptyBlockStreams);
       }
     }
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index f2fe3fa31a..5cd4ce63f8 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -614,8 +614,15 @@ public class TestContainerCommandsEC {
     testECReconstructionCoordinator(missingIndexes, 3);
   }
 
+  @ParameterizedTest
+  @MethodSource("recoverableMissingIndexes")
+  void testECReconstructionCoordinatorWithPartialStripe(List<Integer> 
missingIndexes)
+      throws Exception {
+    testECReconstructionCoordinator(missingIndexes, 1);
+  }
+
   @Test
-  void testECReconstructionWithPartialStripe()
+  void testECReconstructParityWithPartialStripe()
           throws Exception {
     testECReconstructionCoordinator(ImmutableList.of(4, 5), 1);
   }
@@ -895,18 +902,19 @@ public class TestContainerCommandsEC {
           reconstructedBlockData) {
 
     for (int i = 0; i < blockData.length; i++) {
+      Assert.assertEquals(blockData[i].getBlockID(), 
reconstructedBlockData[i].getBlockID());
+      Assert.assertEquals(blockData[i].getSize(), 
reconstructedBlockData[i].getSize());
+      Assert.assertEquals(blockData[i].getMetadata(), 
reconstructedBlockData[i].getMetadata());
       List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
           blockData[i].getChunks();
       List<ContainerProtos.ChunkInfo> newBlockDataChunks =
           reconstructedBlockData[i].getChunks();
       for (int j = 0; j < oldBlockDataChunks.size(); j++) {
         ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
-        if (chunkInfo.getLen() == 0) {
-          // let's ignore the empty chunks
-          continue;
-        }
         Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
       }
+      // Ensure there are no extra chunks in the reconstructed block
+      Assert.assertEquals(oldBlockDataChunks.size(), 
newBlockDataChunks.size());
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to