This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0915f0b1b8 HDDS-10985. EC Reconstruction failed because the size of
currentChunks was not equal to checksumBlockDataChunks. (#7009)
0915f0b1b8 is described below
commit 0915f0b1b83c0d354d1844d92861711c62489df5
Author: slfan1989 <[email protected]>
AuthorDate: Wed Sep 11 17:49:10 2024 +0800
HDDS-10985. EC Reconstruction failed because the size of currentChunks was
not equal to checksumBlockDataChunks. (#7009)
---
.../hdds/scm/storage/ECBlockOutputStream.java | 34 ++++++-
.../ozone/container/common/helpers/BlockData.java | 11 +++
.../hdds/scm/storage/TestContainerCommandsEC.java | 104 ++++++++++++++++-----
3 files changed, 126 insertions(+), 23 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 12ca9978c6..7776e245be 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -38,9 +38,13 @@ import
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -142,8 +146,34 @@ public class ECBlockOutputStream extends BlockOutputStream
{
}
if (checksumBlockData != null) {
- List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
+
+ // For the same BlockGroupLength, we need to find the larger value of
the BlockData size.
+ // This is because we do not send empty chunks to the DataNode, so the
larger value is more accurate.
+ Map<Long, Optional<BlockData>> maxDataSizeByGroup =
Arrays.stream(blockData)
+ .filter(Objects::nonNull)
+ .collect(Collectors.groupingBy(BlockData::getBlockGroupLength,
+ Collectors.maxBy(Comparator.comparingLong(BlockData::getSize))));
+ BlockData maxBlockData = maxDataSizeByGroup.get(blockGroupLength).get();
+
+ // When calculating the checksum size,
+ // we need to consider both blockGroupLength and the actual size of
blockData.
+ //
+ // We use the smaller value to determine the size of the ChunkList.
+ //
+ // 1. In most cases, blockGroupLength is equal to the size of blockData.
+ // 2. Occasionally, blockData is not fully filled; if a chunk is empty,
+ // it is not sent to the DN, resulting in blockData size being smaller
than blockGroupLength.
+ // 3. In cases with 'dirty data',
+ // if an error occurs when writing to the EC-Stripe (e.g., DN reports
Container Closed),
+ // and the length confirmed with OM is smaller, blockGroupLength may be
smaller than blockData size.
+ long blockDataSize = Math.min(maxBlockData.getSize(), blockGroupLength);
+ int chunkSize = (int) Math.ceil(((double) blockDataSize /
repConfig.getEcChunkSize()));
List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
+ if (chunkSize > 0) {
+ checksumBlockDataChunks = checksumBlockData.getChunks().subList(0,
chunkSize);
+ }
+
+ List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
Preconditions.checkArgument(
currentChunks.size() == checksumBlockDataChunks.size(),
@@ -269,7 +299,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
throw ce;
});
} catch (IOException | ExecutionException e) {
- throw new IOException(EXCEPTION_MSG + e.toString(), e);
+ throw new IOException(EXCEPTION_MSG + e, e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
index 4bd170df8e..ea5c5453f3 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto3Codec;
+import org.apache.hadoop.ozone.OzoneConsts;
import java.io.IOException;
import java.util.Collections;
@@ -280,4 +281,14 @@ public class BlockData {
sb.append(", size=").append(size);
sb.append("]");
}
+
+ public long getBlockGroupLength() {
+ String lenStr = getMetadata()
+ .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
+ // If we don't have the length, then it indicates a problem with the
stripe.
+ // All replicas should carry the length, so if it is not there, we return 0,
+ // which will cause us to set the length of the block to zero and not
+ // attempt to reconstruct it.
+ return (lenStr == null) ? 0 : Long.parseLong(lenStr);
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index c274d8fea3..6f79839cd0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.SecretKeyTestClient;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -83,6 +84,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
@@ -99,6 +101,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -117,6 +120,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* This class tests container commands on EC containers.
@@ -613,30 +617,33 @@ public class TestContainerCommandsEC {
@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
- void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
+ void testECReconstructionCoordinatorWith(List<Integer> missingIndexes,
boolean triggerRetry)
throws Exception {
- testECReconstructionCoordinator(missingIndexes, 3);
+ testECReconstructionCoordinator(missingIndexes, 3, triggerRetry);
}
@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
- void testECReconstructionCoordinatorWithPartialStripe(List<Integer>
missingIndexes)
- throws Exception {
- testECReconstructionCoordinator(missingIndexes, 1);
+ void testECReconstructionCoordinatorWithPartialStripe(List<Integer>
missingIndexes,
+ boolean triggerRetry) throws Exception {
+ testECReconstructionCoordinator(missingIndexes, 1, triggerRetry);
}
@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
- void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer>
missingIndexes)
- throws Exception {
- testECReconstructionCoordinator(missingIndexes, 4);
+ void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer>
missingIndexes,
+ boolean triggerRetry) throws Exception {
+ testECReconstructionCoordinator(missingIndexes, 4, triggerRetry);
}
- static Stream<List<Integer>> recoverableMissingIndexes() {
- return Stream
- .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
- .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
- ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
+ static Stream<Arguments> recoverableMissingIndexes() {
+ Stream<Arguments> args = IntStream.rangeClosed(1, 5).mapToObj(i ->
arguments(ImmutableList.of(i), true));
+ Stream<Arguments> args1 = IntStream.rangeClosed(1, 5).mapToObj(i ->
arguments(ImmutableList.of(i), false));
+ Stream<Arguments> args2 = Stream.of(arguments(ImmutableList.of(2, 3),
true),
+ arguments(ImmutableList.of(2, 4), true), arguments(ImmutableList.of(3,
5), true));
+ Stream<Arguments> args3 = Stream.of(arguments(ImmutableList.of(2, 3),
false),
+ arguments(ImmutableList.of(2, 4), false),
arguments(ImmutableList.of(3, 5), false));
+ return Stream.concat(Stream.concat(args, args1), Stream.concat(args2,
args3));
}
/**
@@ -647,7 +654,7 @@ public class TestContainerCommandsEC {
public void testECReconstructionCoordinatorWithMissingIndexes135() {
InsufficientLocationsException exception =
assertThrows(InsufficientLocationsException.class, () -> {
- testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3);
+ testECReconstructionCoordinator(ImmutableList.of(1, 3, 5), 3, false);
});
String expectedMessage =
@@ -658,7 +665,7 @@ public class TestContainerCommandsEC {
}
private void testECReconstructionCoordinator(List<Integer> missingIndexes,
- int numInputChunks) throws Exception {
+ int numInputChunks, boolean triggerRetry) throws Exception {
ObjectStore objectStore = rpcClient.getObjectStore();
String keyString = UUID.randomUUID().toString();
String volumeName = UUID.randomUUID().toString();
@@ -667,7 +674,7 @@ public class TestContainerCommandsEC {
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
- createKeyAndWriteData(keyString, bucket, numInputChunks);
+ createKeyAndWriteData(keyString, bucket, numInputChunks, triggerRetry);
try (
XceiverClientManager xceiverClientManager =
@@ -779,7 +786,7 @@ public class TestContainerCommandsEC {
.getReplicationConfig(), cToken);
assertEquals(blockDataArrList.get(i).length,
reconstructedBlockData.length);
- checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+ checkBlockDataWithRetry(blockDataArrList.get(i),
reconstructedBlockData, triggerRetry);
XceiverClientSpi client = xceiverClientManager.acquireClient(
newTargetPipeline);
try {
@@ -800,7 +807,7 @@ public class TestContainerCommandsEC {
}
private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
- int numChunks) throws IOException {
+ int numChunks, boolean triggerRetry) throws IOException {
for (int i = 0; i < numChunks; i++) {
inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
}
@@ -809,11 +816,48 @@ public class TestContainerCommandsEC {
new HashMap<>())) {
assertInstanceOf(KeyOutputStream.class, out.getOutputStream());
for (int i = 0; i < numChunks; i++) {
+ // We generally wait until the data is written to the last chunk
+ // before attempting to trigger CloseContainer.
+ // We use an asynchronous approach for this trigger,
+ // aiming to ensure that closing the container does not interfere with
the write operation.
+ // However, this process often needs to be executed multiple times
before it takes effect.
+ if (i == numChunks - 1 && triggerRetry) {
+ triggerRetryByCloseContainer(out);
+ }
out.write(inputChunks[i]);
}
}
}
+ private void triggerRetryByCloseContainer(OzoneOutputStream out) {
+ CompletableFuture.runAsync(() -> {
+ BlockOutputStreamEntry blockOutputStreamEntry =
out.getKeyOutputStream().getStreamEntries().get(0);
+ BlockID entryBlockID = blockOutputStreamEntry.getBlockID();
+ long entryContainerID = entryBlockID.getContainerID();
+ Pipeline entryPipeline = blockOutputStreamEntry.getPipeline();
+ Map<DatanodeDetails, Integer> replicaIndexes =
entryPipeline.getReplicaIndexes();
+ try {
+ for (Map.Entry<DatanodeDetails, Integer> entry :
replicaIndexes.entrySet()) {
+ DatanodeDetails key = entry.getKey();
+ Integer value = entry.getValue();
+ XceiverClientManager xceiverClientManager = new
XceiverClientManager(config);
+ Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+ .generateToken(ANY_USER, ContainerID.valueOf(entryContainerID));
+ XceiverClientSpi client = xceiverClientManager.acquireClient(
+ createSingleNodePipeline(entryPipeline, key, value));
+ try {
+ ContainerProtocolCalls.closeContainer(client, entryContainerID,
cToken.encodeToUrlString());
+ } finally {
+ xceiverClientManager.releaseClient(client, false);
+ }
+ break;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
@Test
public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
throws Exception {
@@ -826,7 +870,7 @@ public class TestContainerCommandsEC {
objectStore.getVolume(volumeName).createBucket(bucketName);
OzoneVolume volume = objectStore.getVolume(volumeName);
OzoneBucket bucket = volume.getBucket(bucketName);
- createKeyAndWriteData(keyString, bucket, 3);
+ createKeyAndWriteData(keyString, bucket, 3, false);
OzoneKeyDetails key = bucket.getKey(keyString);
long conID = key.getOzoneKeyLocations().get(0).getContainerID();
@@ -900,6 +944,25 @@ public class TestContainerCommandsEC {
HddsProtos.LifeCycleEvent.CLOSE);
}
+ private void checkBlockDataWithRetry(
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
+ org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+ reconstructedBlockData, boolean triggerRetry) {
+ if (triggerRetry) {
+ for (int i = 0; i < reconstructedBlockData.length; i++) {
+ assertEquals(blockData[i].getBlockID(),
reconstructedBlockData[i].getBlockID());
+ List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
blockData[i].getChunks();
+ List<ContainerProtos.ChunkInfo> newBlockDataChunks =
reconstructedBlockData[i].getChunks();
+ for (int j = 0; j < newBlockDataChunks.size(); j++) {
+ ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
+ assertEquals(chunkInfo, newBlockDataChunks.get(j));
+ }
+ }
+ return;
+ }
+ checkBlockData(blockData, reconstructedBlockData);
+ }
+
private void checkBlockData(
org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
org.apache.hadoop.ozone.container.common.helpers.BlockData[]
@@ -967,8 +1030,7 @@ public class TestContainerCommandsEC {
out.write(values[i]);
}
}
-// List<ContainerID> containerIDs =
-// new ArrayList<>(scm.getContainerManager().getContainerIDs());
+
List<ContainerID> containerIDs =
scm.getContainerManager().getContainers()
.stream()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]