adoroszlai commented on code in PR #6779:
URL: https://github.com/apache/ozone/pull/6779#discussion_r1638144398


##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java:
##########
@@ -183,53 +206,54 @@ static <T> T tryEachDatanode(Pipeline pipeline,
    *
    * @param xceiverClient client to perform call
    * @param validators functions to validate the response
-   * @param datanodeBlockID blockID to identify container
+   * @param blockID blockID to identify container
    * @param token a token for this block (may be null)
    * @return container protocol get block response
    * @throws IOException if there is an I/O error while performing the call
    */
   public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
-      List<Validator> validators,
-      DatanodeBlockID datanodeBlockID,
-      Token<? extends TokenIdentifier> token) throws IOException {
-    GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
-        .newBuilder()
-        .setBlockID(datanodeBlockID);
-    ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
-        .newBuilder()
+      List<Validator> validators, BlockID blockID, Token<? extends 
TokenIdentifier> token,
+      Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
+    ContainerCommandRequestProto.Builder builder = 
getContainerCommandRequestProtoBuilder()
         .setCmdType(Type.GetBlock)
-        .setContainerID(datanodeBlockID.getContainerID())
-        .setGetBlock(readBlockRequest);
+        .setContainerID(blockID.getContainerID());
     if (token != null) {
       builder.setEncodedToken(token.encodeToUrlString());
     }
 
     return tryEachDatanode(xceiverClient.getPipeline(),
-        d -> getBlock(xceiverClient, validators, builder, d),
-        d -> toErrorMessage(datanodeBlockID, d));
+        d -> getBlock(xceiverClient, validators, builder, blockID, d, 
replicaIndexes),
+        d -> toErrorMessage(blockID, d));
   }
 
-  static String toErrorMessage(DatanodeBlockID blockId, DatanodeDetails d) {
+  static String toErrorMessage(BlockID blockId, DatanodeDetails d) {
     return String.format("Failed to get block #%s in container #%s from %s",
         blockId.getLocalID(), blockId.getContainerID(), d);
   }
 
   public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
-      DatanodeBlockID datanodeBlockID,
-      Token<? extends TokenIdentifier> token) throws IOException {
-    return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token);
+      BlockID datanodeBlockID, Token<? extends TokenIdentifier> token,
+      Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
+    return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token, 
replicaIndexes);
   }
 
-  private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
-      List<Validator> validators,
-      ContainerCommandRequestProto.Builder builder,
-      DatanodeDetails datanode) throws IOException {
+  private static GetBlockResponseProto getBlock(XceiverClientSpi 
xceiverClient, List<Validator> validators,
+      ContainerCommandRequestProto.Builder builder, BlockID blockID, 
DatanodeDetails datanode,
+      Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
     String traceId = TracingUtil.exportCurrentSpan();
     if (traceId != null) {
       builder.setTraceID(traceId);
     }
+    final DatanodeBlockID.Builder datanodeBlockID = 
blockID.getDatanodeBlockIDProtobufBuilder();
+    int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);

Review Comment:
   Can we get the replica index from the pipeline here, instead of requiring a map to be passed?
   
   ```suggestion
       int replicaIndex = xceiverClient.getPipeline().getReplicaIndex(datanode);
   ```



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java:
##########
@@ -244,33 +246,28 @@ protected List<ChunkInfo> getChunkInfoList() throws 
IOException {
 
   @VisibleForTesting
   protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
-    final Pipeline pipeline = xceiverClient.getPipeline();
-
+    Pipeline pipeline = pipelineRef.get();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Initializing BlockInputStream for get key to access {}",
-          blockID.getContainerID());
-    }
-
-    DatanodeBlockID.Builder blkIDBuilder =
-        DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
-            .setLocalID(blockID.getLocalID())
-            .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
-
-    int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
-    if (replicaIndex > 0) {
-      blkIDBuilder.setReplicaIndex(replicaIndex);
+      LOG.debug("Initializing BlockInputStream for get key to access {} with 
pipeline {}.",
+          blockID.getContainerID(), pipeline);
     }
 
     GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
-        xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
+        xceiverClient, VALIDATORS, blockID, tokenRef.get(), 
pipeline.getReplicaIndexes());
 
     return response.getBlockData().getChunksList();
   }
 
-  private void setPipeline(Pipeline pipeline) {
+  private void setPipeline(Pipeline pipeline) throws IOException {
     if (pipeline == null) {
       return;
     }
+    Set<Integer> replicaIndexes =
+        
pipeline.getNodes().stream().map(pipeline::getReplicaIndex).collect(Collectors.toSet());
+    if (replicaIndexes.size() > 1) {

Review Comment:
   ```suggestion
       long replicaIndexes = pipeline.getNodes().stream()
           .mapToInt(pipeline::getReplicaIndex)
           .distinct().count();
       if (replicaIndexes > 1) {
   ```



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java:
##########
@@ -186,4 +233,205 @@ private static List<OmKeyLocationInfo> 
lookupKey(MiniOzoneCluster cluster)
     return locations.getLocationList();
   }
 
+  private static OmKeyLocationInfo lookupKeyFirstLocation(MiniOzoneCluster 
cluster)
+      throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(VOLUME)
+        .setBucketName(BUCKET)
+        .setKeyName(KEY)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
+    Assertions.assertNotNull(locations);
+    return locations.getLocationList().get(0);
+  }
+
+
+  public void assertState(MiniOzoneCluster cluster, Map<Integer, 
DatanodeDetails> expectedReplicaMap)
+      throws IOException {
+    OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster);
+    Map<Integer, DatanodeDetails> replicaMap =
+        keyLocation.getPipeline().getNodes().stream().collect(Collectors.toMap(
+            dn -> keyLocation.getPipeline().getReplicaIndex(dn), 
Functions.identity()));
+    Assertions.assertEquals(expectedReplicaMap, replicaMap);
+  }
+
+  private OzoneInputStream createInputStream(OzoneClient client) throws 
IOException {
+    ObjectStore objectStore = client.getObjectStore();
+    OzoneVolume volume = objectStore.getVolume(VOLUME);
+    OzoneBucket bucket = volume.getBucket(BUCKET);
+    return bucket.readKey(KEY);
+  }
+
+  private void mockContainerPlacementPolicy(final 
MockedStatic<ContainerPlacementPolicyFactory> mockedPlacementFactory,
+                                            final 
AtomicReference<DatanodeDetails> mockedDatanodeToRemove) {
+    mockedPlacementFactory.when(() -> 
ContainerPlacementPolicyFactory.getECPolicy(any(ConfigurationSource.class),
+        any(NodeManager.class), any(NetworkTopology.class), 
Mockito.anyBoolean(),
+        any(SCMContainerPlacementMetrics.class))).thenAnswer(i -> {
+          PlacementPolicy placementPolicy = (PlacementPolicy) 
Mockito.spy(i.callRealMethod());
+          Mockito.doAnswer(args -> {
+            Set<ContainerReplica> containerReplica = ((Set<ContainerReplica>) 
args.getArgument(0)).stream()
+                .filter(r -> 
r.getDatanodeDetails().equals(mockedDatanodeToRemove.get()))
+                .collect(Collectors.toSet());
+            return containerReplica;
+          
}).when(placementPolicy).replicasToRemoveToFixOverreplication(Mockito.anySet(), 
Mockito.anyInt());
+          return placementPolicy;
+        });
+  }
+
+  private void mockContainerProtocolCalls(final 
MockedStatic<ContainerProtocolCalls> mockedContainerProtocolCalls,
+                                          final Map<Integer, Integer> 
failedReadChunkCountMap) {
+    mockedContainerProtocolCalls.when(() -> 
ContainerProtocolCalls.readChunk(any(), any(), any(), anyList(), any()))
+        .thenAnswer(invocation -> {
+          int replicaIndex = ((ContainerProtos.DatanodeBlockID) 
invocation.getArgument(2)).getReplicaIndex();
+          try {
+            return invocation.callRealMethod();
+          } catch (Throwable e) {
+            failedReadChunkCountMap.compute(replicaIndex,
+                (replicaIdx, totalCount) -> totalCount == null ? 1 : 
(totalCount + 1));
+            throw e;
+          }
+        });
+  }
+
+
+  @Test
+  public void testECContainerReplication() throws Exception {

Review Comment:
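   `TestContainerReplication` execution time increased significantly (from about 188 s for 6 tests to about 327 s for 7 tests, i.e. the new test alone adds roughly 140 s):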
   
   ```
   Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 327.089 s - 
in org.apache.hadoop.ozone.container.TestContainerReplication
   ```
   
   vs.
   
   ```
   Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 188.36 s - 
in org.apache.hadoop.ozone.container.TestContainerReplication
   ```



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java:
##########
@@ -204,7 +205,7 @@ private DataStreamOutput setupStream(Pipeline pipeline) 
throws IOException {
     //  it or remove it completely if possible
     String id = pipeline.getFirstNode().getUuidString();
     ContainerProtos.ContainerCommandRequestProto.Builder builder =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+        getContainerCommandRequestProtoBuilder()

Review Comment:
   The patch is huge (122K) and hard to review. Its size could be reduced by
   prefactoring:
   - introduce this factory method
   - replace all calls of `newBuilder()` with it
   
   in a separate patch, without any functional changes, before the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to