adoroszlai commented on code in PR #6779: URL: https://github.com/apache/ozone/pull/6779#discussion_r1638144398
########## hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java: ########## @@ -183,53 +206,54 @@ static <T> T tryEachDatanode(Pipeline pipeline, * * @param xceiverClient client to perform call * @param validators functions to validate the response - * @param datanodeBlockID blockID to identify container + * @param blockID blockID to identify container * @param token a token for this block (may be null) * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - List<Validator> validators, - DatanodeBlockID datanodeBlockID, - Token<? extends TokenIdentifier> token) throws IOException { - GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto - .newBuilder() - .setBlockID(datanodeBlockID); - ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto - .newBuilder() + List<Validator> validators, BlockID blockID, Token<? 
extends TokenIdentifier> token, + Map<DatanodeDetails, Integer> replicaIndexes) throws IOException { + ContainerCommandRequestProto.Builder builder = getContainerCommandRequestProtoBuilder() .setCmdType(Type.GetBlock) - .setContainerID(datanodeBlockID.getContainerID()) - .setGetBlock(readBlockRequest); + .setContainerID(blockID.getContainerID()); if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } return tryEachDatanode(xceiverClient.getPipeline(), - d -> getBlock(xceiverClient, validators, builder, d), - d -> toErrorMessage(datanodeBlockID, d)); + d -> getBlock(xceiverClient, validators, builder, blockID, d, replicaIndexes), + d -> toErrorMessage(blockID, d)); } - static String toErrorMessage(DatanodeBlockID blockId, DatanodeDetails d) { + static String toErrorMessage(BlockID blockId, DatanodeDetails d) { return String.format("Failed to get block #%s in container #%s from %s", blockId.getLocalID(), blockId.getContainerID(), d); } public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID, - Token<? extends TokenIdentifier> token) throws IOException { - return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token); + BlockID datanodeBlockID, Token<? 
extends TokenIdentifier> token, + Map<DatanodeDetails, Integer> replicaIndexes) throws IOException { + return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token, replicaIndexes); } - private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - List<Validator> validators, - ContainerCommandRequestProto.Builder builder, - DatanodeDetails datanode) throws IOException { + private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, List<Validator> validators, + ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode, + Map<DatanodeDetails, Integer> replicaIndexes) throws IOException { String traceId = TracingUtil.exportCurrentSpan(); if (traceId != null) { builder.setTraceID(traceId); } + final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); + int replicaIndex = replicaIndexes.getOrDefault(datanode, 0); Review Comment: Can we get the replica index from the pipeline here, instead of requiring a map to be passed? 
```suggestion int replicaIndex = xceiverClient.getPipeline().getReplicaIndex(datanode); ``` ########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java: ########## @@ -244,33 +246,28 @@ protected List<ChunkInfo> getChunkInfoList() throws IOException { @VisibleForTesting protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException { - final Pipeline pipeline = xceiverClient.getPipeline(); - + Pipeline pipeline = pipelineRef.get(); if (LOG.isDebugEnabled()) { - LOG.debug("Initializing BlockInputStream for get key to access {}", - blockID.getContainerID()); - } - - DatanodeBlockID.Builder blkIDBuilder = - DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID()) - .setLocalID(blockID.getLocalID()) - .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()); - - int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); - if (replicaIndex > 0) { - blkIDBuilder.setReplicaIndex(replicaIndex); + LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.", + blockID.getContainerID(), pipeline); } GetBlockResponseProto response = ContainerProtocolCalls.getBlock( - xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get()); + xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes()); return response.getBlockData().getChunksList(); } - private void setPipeline(Pipeline pipeline) { + private void setPipeline(Pipeline pipeline) throws IOException { if (pipeline == null) { return; } + Set<Integer> replicaIndexes = + pipeline.getNodes().stream().map(pipeline::getReplicaIndex).collect(Collectors.toSet()); + if (replicaIndexes.size() > 1) { Review Comment: ```suggestion long replicaIndexes = pipeline.getNodes().stream() .mapToInt(pipeline::getReplicaIndex) .distinct().count(); if (replicaIndexes > 1) { ``` ########## hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java: ########## @@ -186,4 
+233,205 @@ private static List<OmKeyLocationInfo> lookupKey(MiniOzoneCluster cluster) return locations.getLocationList(); } + private static OmKeyLocationInfo lookupKeyFirstLocation(MiniOzoneCluster cluster) + throws IOException { + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(VOLUME) + .setBucketName(BUCKET) + .setKeyName(KEY) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); + Assertions.assertNotNull(locations); + return locations.getLocationList().get(0); + } + + + public void assertState(MiniOzoneCluster cluster, Map<Integer, DatanodeDetails> expectedReplicaMap) + throws IOException { + OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster); + Map<Integer, DatanodeDetails> replicaMap = + keyLocation.getPipeline().getNodes().stream().collect(Collectors.toMap( + dn -> keyLocation.getPipeline().getReplicaIndex(dn), Functions.identity())); + Assertions.assertEquals(expectedReplicaMap, replicaMap); + } + + private OzoneInputStream createInputStream(OzoneClient client) throws IOException { + ObjectStore objectStore = client.getObjectStore(); + OzoneVolume volume = objectStore.getVolume(VOLUME); + OzoneBucket bucket = volume.getBucket(BUCKET); + return bucket.readKey(KEY); + } + + private void mockContainerPlacementPolicy(final MockedStatic<ContainerPlacementPolicyFactory> mockedPlacementFactory, + final AtomicReference<DatanodeDetails> mockedDatanodeToRemove) { + mockedPlacementFactory.when(() -> ContainerPlacementPolicyFactory.getECPolicy(any(ConfigurationSource.class), + any(NodeManager.class), any(NetworkTopology.class), Mockito.anyBoolean(), + any(SCMContainerPlacementMetrics.class))).thenAnswer(i -> { + PlacementPolicy placementPolicy = (PlacementPolicy) Mockito.spy(i.callRealMethod()); + Mockito.doAnswer(args -> { + Set<ContainerReplica> containerReplica = ((Set<ContainerReplica>) args.getArgument(0)).stream() + .filter(r -> 
r.getDatanodeDetails().equals(mockedDatanodeToRemove.get())) + .collect(Collectors.toSet()); + return containerReplica; + }).when(placementPolicy).replicasToRemoveToFixOverreplication(Mockito.anySet(), Mockito.anyInt()); + return placementPolicy; + }); + } + + private void mockContainerProtocolCalls(final MockedStatic<ContainerProtocolCalls> mockedContainerProtocolCalls, + final Map<Integer, Integer> failedReadChunkCountMap) { + mockedContainerProtocolCalls.when(() -> ContainerProtocolCalls.readChunk(any(), any(), any(), anyList(), any())) + .thenAnswer(invocation -> { + int replicaIndex = ((ContainerProtos.DatanodeBlockID) invocation.getArgument(2)).getReplicaIndex(); + try { + return invocation.callRealMethod(); + } catch (Throwable e) { + failedReadChunkCountMap.compute(replicaIndex, + (replicaIdx, totalCount) -> totalCount == null ? 1 : (totalCount + 1)); + throw e; + } + }); + } + + + @Test + public void testECContainerReplication() throws Exception { Review Comment: `TestContainerReplication` execution time increased significantly: ``` Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 327.089 s - in org.apache.hadoop.ozone.container.TestContainerReplication ``` vs. ``` Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 188.36 s - in org.apache.hadoop.ozone.container.TestContainerReplication ``` ########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java: ########## @@ -204,7 +205,7 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { // it or remove it completely if possible String id = pipeline.getFirstNode().getUuidString(); ContainerProtos.ContainerCommandRequestProto.Builder builder = - ContainerProtos.ContainerCommandRequestProto.newBuilder() + getContainerCommandRequestProtoBuilder() Review Comment: The patch is huge (122K), hard to review. 
Size could be reduced by prefactoring: - introduce this factory method - replace all calls of `newBuilder()` in a separate patch, without any functional changes, before the fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org For additional commands, e-mail: issues-help@ozone.apache.org