This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 65268cc2c1 HDDS-7918. EC: ECBlockReconstructedStripeInputStream should
check for spare replicas before failing an index (#4441)
65268cc2c1 is described below
commit 65268cc2c1aa90086a7dba15e55a46c93962ec24
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Mar 21 16:00:12 2023 +0100
HDDS-7918. EC: ECBlockReconstructedStripeInputStream should check for spare
replicas before failing an index (#4441)
---
.../ozone/client/io/BadDataLocationException.java | 9 ++-
.../hadoop/ozone/client/io/ECBlockInputStream.java | 4 +-
.../io/ECBlockReconstructedStripeInputStream.java | 26 ++++++++
.../hadoop/ozone/client/io/ECStreamTestUtil.java | 9 +--
.../TestECBlockReconstructedStripeInputStream.java | 76 +++++++++++++++++++---
5 files changed, 108 insertions(+), 16 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
index 65e124eaf5..5046a44895 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -29,7 +29,7 @@ import java.util.List;
*/
public class BadDataLocationException extends IOException {
- private List<DatanodeDetails> failedLocations = new ArrayList<>();
+ private final List<DatanodeDetails> failedLocations = new ArrayList<>();
private int failedLocationIndex;
public BadDataLocationException(DatanodeDetails dn) {
@@ -53,6 +53,13 @@ public class BadDataLocationException extends IOException {
failedLocations.add(dn);
}
+ public BadDataLocationException(int failedIndex,
+ Throwable ex, List<DatanodeDetails> failedLocations) {
+ super(ex);
+ failedLocationIndex = failedIndex;
+ this.failedLocations.addAll(failedLocations);
+ }
+
public BadDataLocationException(DatanodeDetails dn, int failedIndex,
Throwable ex) {
super(ex);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index dc354198ca..8ad8f8851e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -334,7 +334,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
}
}
- private boolean shouldRetryFailedRead(int failedIndex) {
+ protected boolean shouldRetryFailedRead(int failedIndex) {
Deque<DatanodeDetails> spareLocations =
spareDataLocations.get(failedIndex);
if (spareLocations != null && spareLocations.size() > 0) {
failedLocations.add(dataLocations[failedIndex]);
@@ -470,7 +470,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
seeked = true;
}
- private void closeStream(int i) {
+ protected void closeStream(int i) {
if (blockStreams[i] != null) {
try {
blockStreams[i].close();
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 9658fb784d..492bf33a10 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -610,6 +611,31 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
private void readIntoBuffer(int ind, ByteBuffer buf) throws IOException {
+ List<DatanodeDetails> failedLocations = new LinkedList<>();
+ while (true) {
+ int currentBufferPosition = buf.position();
+ try {
+ readFromCurrentLocation(ind, buf);
+ break;
+ } catch (IOException e) {
+ DatanodeDetails failedLocation = getDataLocations()[ind];
+ failedLocations.add(failedLocation);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: read [{}] failed from {} due to {}", this,
+ ind, failedLocation, e.getMessage());
+ }
+ closeStream(ind);
+ if (shouldRetryFailedRead(ind)) {
+ buf.position(currentBufferPosition);
+ } else {
+ throw new BadDataLocationException(ind, e, failedLocations);
+ }
+ }
+ }
+ }
+
+ private void readFromCurrentLocation(int ind, ByteBuffer buf)
+ throws IOException {
BlockExtendedInputStream stream = getOrOpenStream(ind);
seekStreamIfNecessary(stream, 0);
while (buf.hasRemaining()) {
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
index 0fe5886f1b..db08bd7343 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
@@ -223,7 +223,7 @@ public final class ECStreamTestUtil {
new LinkedHashMap<>();
private List<ByteBuffer> blockStreamData;
// List of EC indexes that should fail immediately on read
- private List<Integer> failIndexes = new ArrayList<>();
+ private final List<Integer> failIndexes = new ArrayList<>();
private Pipeline currentPipeline;
@@ -249,8 +249,9 @@ public final class ECStreamTestUtil {
this.currentPipeline = pipeline;
}
- public synchronized void setFailIndexes(List<Integer> fail) {
- failIndexes.addAll(fail);
+ // fail each index in the list once
+ public synchronized void setFailIndexes(Integer... fail) {
+ failIndexes.addAll(Arrays.asList(fail));
}
public synchronized BlockExtendedInputStream create(
@@ -264,7 +265,7 @@ public final class ECStreamTestUtil {
TestBlockInputStream stream = new TestBlockInputStream(
blockInfo.getBlockID(), blockInfo.getLength(),
blockStreamData.get(repInd - 1), repInd);
- if (failIndexes.contains(repInd)) {
+ if (failIndexes.remove(Integer.valueOf(repInd))) {
stream.setShouldError(true);
}
blockStreams.put(repInd, stream);
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
index 7ad6d3e185..62d8c2d760 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -489,6 +490,71 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
+ @Test
+ void testNoErrorIfSpareLocationToRead() throws IOException {
+ int chunkSize = repConfig.getEcChunkSize();
+ int blockLength = chunkSize * 3 - 1;
+
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+ ECStreamTestUtil
+ .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+ // We have a length that is less than a stripe, so chunks 1 and 2 are full.
+ // Block 1 is lost and needs to be recovered
+ // from the parity and padded blocks 2 and 3.
+
+ List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+ // Two data missing
+ locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
+ // Two data missing
+ locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
+ // One data missing - the last one
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
+ // One data and one parity missing
+ locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
+ // One data and one parity missing
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
+ // No indexes missing
+ locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
+
+ DatanodeDetails spare = MockDatanodeDetails.randomDatanodeDetails();
+
+ for (Map<DatanodeDetails, Integer> dnMap : locations) {
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+
+ // this index fails, but has spare replica
+ int failing = dnMap.values().iterator().next();
+ streamFactory.setFailIndexes(failing);
+ dnMap.put(spare, failing);
+
+ BlockLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+ dataGen = new SplittableRandom(randomSeed);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(blockLength, read);
+ ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+ ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
+ ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
+ // Check the underlying streams have been advanced by 1 chunk:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assertions.assertEquals(0, bis.getRemaining());
+ }
+ Assertions.assertEquals(ecb.getPos(), blockLength);
+ clearBuffers(bufs);
+ // A further read should give EOF
+ read = ecb.read(bufs);
+ Assertions.assertEquals(-1, read);
+ }
+ }
+ }
+
@Test
public void testSeek() throws IOException {
// Generate the input data for 3 full stripes and generate the parity
@@ -688,7 +754,7 @@ public class TestECBlockReconstructedStripeInputStream {
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
// Fail all the indexes containing data on their first read.
- streamFactory.setFailIndexes(indexesToList(1, 4, 5));
+ streamFactory.setFailIndexes(1, 4, 5);
// The locations contain the padded indexes, as will often be the case
// when containers are reported by SCM.
Map<DatanodeDetails, Integer> dnMap =
@@ -759,14 +825,6 @@ public class TestECBlockReconstructedStripeInputStream {
null, null, streamFactory, bufferPool, ecReconstructExecutor);
}
- private List<Integer> indexesToList(int... indexes) {
- List<Integer> list = new ArrayList<>();
- for (int i : indexes) {
- list.add(i);
- }
- return list;
- }
-
private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
List<ByteBuffer> dataStreams = new ArrayList<>();
for (ByteBuffer b : data) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]