This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new dca2bf9  HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585)
dca2bf9 is described below

commit dca2bf9dd57197740a9e4523864dec6014b5bf9e
Author: touchida <56789230+touch...@users.noreply.github.com>
AuthorDate: Wed Mar 24 17:56:09 2021 +0900

    HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585)
    
    (cherry picked from commit 95e68926750b55196cf9da53c25359c98ef58a4f)
---
 .../io/erasurecode/rawcoder/DecodingValidator.java | 187 ++++++++++++++++
 .../rawcoder/InvalidDecodingException.java         |  35 +++
 .../hadoop-common/src/site/markdown/Metrics.md     |   1 +
 .../hadoop/io/erasurecode/TestCoderBase.java       |  12 ++
 .../rawcoder/TestDecodingValidator.java            | 237 +++++++++++++++++++++
 .../io/erasurecode/rawcoder/TestRawCoderBase.java  |   2 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   3 +
 .../server/datanode/DataNodeFaultInjector.java     |   7 +
 .../StripedBlockChecksumReconstructor.java         |  12 +-
 .../erasurecode/StripedBlockReconstructor.java     |  26 ++-
 .../datanode/erasurecode/StripedReconstructor.java |  46 +++-
 .../server/datanode/metrics/DataNodeMetrics.java   |  10 +
 .../src/main/resources/hdfs-default.xml            |  10 +
 .../hadoop/hdfs/TestReconstructStripedFile.java    |  24 ++-
 .../TestReconstructStripedFileWithValidator.java   | 115 ++++++++++
 15 files changed, 720 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java
new file mode 100644
index 0000000..9597058
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A utility class to validate decoding.
+ */
+@InterfaceAudience.Private
+public class DecodingValidator {
+
+  private final RawErasureDecoder decoder;
+  private ByteBuffer buffer;
+  private int[] newValidIndexes;
+  private int newErasedIndex;
+
+  public DecodingValidator(RawErasureDecoder decoder) {
+    this.decoder = decoder;
+  }
+
+  /**
+   * Validate outputs decoded from inputs, by decoding an input back from
+   * the outputs and comparing it with the original one.
+   *
+   * For instance, in RS (6, 3), let (d0, d1, d2, d3, d4, d5) be sources
+   * and (p0, p1, p2) be parities, and assume
+   *  inputs = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)];
+   *  erasedIndexes = [1, 6];
+   *  outputs = [d1, p0].
+   * Then
+   *  1. Create new inputs, erasedIndexes and outputs for validation so that
+   *     the inputs could contain the decoded outputs, and decode them:
+   *      newInputs = [d1, d2, d3, d4, d5, p0]
+   *      newErasedIndexes = [0]
+   *      newOutputs = [d0']
+   *  2. Compare d0 and d0'. The comparison will fail with high probability
+   *     when the initial outputs are wrong.
+   *
+   * Note that the input buffers' positions must be the ones where data are
+   * read: If the input buffers have been processed by a decoder, the buffers'
+   * positions must be reset before being passed into this method.
+   *
+   * This method does not change outputs and erasedIndexes.
+   *
+   * @param inputs input buffers used for decoding. The buffers' positions
+   *               are moved to the end after this method.
+   * @param erasedIndexes indexes of erased units used for decoding
+   * @param outputs decoded output buffers, which are ready to be read after
+   *                the call
+   * @throws IOException
+   */
+  public void validate(ByteBuffer[] inputs, int[] erasedIndexes,
+      ByteBuffer[] outputs) throws IOException {
+    markBuffers(outputs);
+
+    try {
+      ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
+      boolean isDirect = validInput.isDirect();
+      int capacity = validInput.capacity();
+      int remaining = validInput.remaining();
+
+      // Init buffer
+      if (buffer == null || buffer.isDirect() != isDirect
+          || buffer.capacity() < remaining) {
+        buffer = allocateBuffer(isDirect, capacity);
+      }
+      buffer.clear().limit(remaining);
+
+      // Create newInputs and newErasedIndex for validation
+      ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
+      int count = 0;
+      for (int i = 0; i < erasedIndexes.length; i++) {
+        newInputs[erasedIndexes[i]] = outputs[i];
+        count++;
+      }
+      newErasedIndex = -1;
+      boolean selected = false;
+      int numValidIndexes = CoderUtil.getValidIndexes(inputs).length;
+      for (int i = 0; i < newInputs.length; i++) {
+        if (count == numValidIndexes) {
+          break;
+        } else if (!selected && inputs[i] != null) {
+          newErasedIndex = i;
+          newInputs[i] = null;
+          selected = true;
+        } else if (newInputs[i] == null) {
+          newInputs[i] = inputs[i];
+          if (inputs[i] != null) {
+            count++;
+          }
+        }
+      }
+
+      // Keep it for testing
+      newValidIndexes = CoderUtil.getValidIndexes(newInputs);
+
+      decoder.decode(newInputs, new int[]{newErasedIndex},
+          new ByteBuffer[]{buffer});
+
+      if (!buffer.equals(inputs[newErasedIndex])) {
+        throw new InvalidDecodingException("Failed to validate decoding");
+      }
+    } finally {
+      toLimits(inputs);
+      resetBuffers(outputs);
+    }
+  }
+
+  /**
+   *  Validate outputs decoded from inputs, by decoding an input back from
+   *  those outputs and comparing it with the original one.
+   * @param inputs input buffers used for decoding
+   * @param erasedIndexes indexes of erased units used for decoding
+   * @param outputs decoded output buffers
+   * @throws IOException
+   */
+  public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs)
+      throws IOException {
+    ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
+    ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
+    validate(newInputs, erasedIndexes, newOutputs);
+  }
+
+  private ByteBuffer allocateBuffer(boolean direct, int capacity) {
+    if (direct) {
+      buffer = ByteBuffer.allocateDirect(capacity);
+    } else {
+      buffer = ByteBuffer.allocate(capacity);
+    }
+    return buffer;
+  }
+
+  private static void markBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.mark();
+      }
+    }
+  }
+
+  private static void resetBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.reset();
+      }
+    }
+  }
+
+  private static void toLimits(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.position(buffer.limit());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  protected int[] getNewValidIndexes() {
+    return newValidIndexes;
+  }
+
+  @VisibleForTesting
+  protected int getNewErasedIndex() {
+    return newErasedIndex;
+  }
+}
\ No newline at end of file
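
For readers following the patch, here is a minimal usage sketch of the new DecodingValidator. It is illustrative only (the helper class and method name below are not part of this commit) and it mirrors the mark/decode/reset/validate pattern that StripedBlockReconstructor adopts later in this diff:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

    /** Hypothetical helper, not part of this patch. */
    final class ValidatedDecodeSketch {
      static void decodeAndValidate(RawErasureDecoder decoder,
          ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs)
          throws IOException {
        DecodingValidator validator = new DecodingValidator(decoder);

        // Remember the read positions of the surviving inputs; decode()
        // moves them to the end of the buffers.
        for (ByteBuffer b : inputs) {
          if (b != null) {
            b.mark();
          }
        }
        decoder.decode(inputs, erasedIndexes, outputs);
        // Restore the positions so validate() reads the same data the
        // decoder consumed.
        for (ByteBuffer b : inputs) {
          if (b != null) {
            b.reset();
          }
        }
        // Throws InvalidDecodingException if the decoded outputs are not
        // consistent with the surviving inputs.
        validator.validate(inputs, erasedIndexes, outputs);
      }
    }
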
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java
new file mode 100644
index 0000000..37869f8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * Thrown for invalid decoding.
+ */
+@InterfaceAudience.Private
+public class InvalidDecodingException
+    extends IOException {
+  private static final long serialVersionUID = 0L;
+
+  public InvalidDecodingException(String description) {
+    super(description);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 4b2abfb..5260db1 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -450,6 +450,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `BlocksDeletedInPendingIBR` | Number of blocks at deleted status in pending incremental block report (IBR) |
 | `EcReconstructionTasks` | Total number of erasure coding reconstruction tasks |
 | `EcFailedReconstructionTasks` | Total number of erasure coding failed reconstruction tasks |
+| `EcInvalidReconstructionTasks` | Total number of erasure coding invalidated reconstruction tasks |
 | `EcDecodingTimeNanos` | Total number of nanoseconds spent by decoding tasks |
 | `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
 | `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
index 6d14de8..331cecb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -527,4 +527,16 @@ public abstract class TestCoderBase {
       buffer.position(buffer.position() + 1);
     }
   }
+
+  /**
+   * Pollute a randomly chosen chunk by mutating one byte of its buffer.
+   * @param chunks chunks from which one is randomly polluted
+   */
+  protected void polluteSomeChunk(ECChunk[] chunks) {
+    int idx = new Random().nextInt(chunks.length);
+    ByteBuffer buffer = chunks[idx].getBuffer();
+    buffer.mark();
+    buffer.put((byte) ((buffer.get(buffer.position()) + 1)));
+    buffer.reset();
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java
new file mode 100644
index 0000000..06744cc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link DecodingValidator} under various decoders.
+ */
+@RunWith(Parameterized.class)
+public class TestDecodingValidator extends TestRawCoderBase {
+
+  private DecodingValidator validator;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {RSRawErasureCoderFactory.class, 6, 3, new int[]{1}, new int[]{}},
+        {RSRawErasureCoderFactory.class, 6, 3, new int[]{3}, new int[]{0}},
+        {RSRawErasureCoderFactory.class, 6, 3, new int[]{2, 4}, new int[]{1}},
+        {NativeRSRawErasureCoderFactory.class, 6, 3, new int[]{0}, new int[]{}},
+        {XORRawErasureCoderFactory.class, 10, 1, new int[]{0}, new int[]{}},
+        {NativeXORRawErasureCoderFactory.class, 10, 1, new int[]{0},
+            new int[]{}}
+    });
+  }
+
+  public TestDecodingValidator(
+      Class<? extends RawErasureCoderFactory> factoryClass, int numDataUnits,
+      int numParityUnits, int[] erasedDataIndexes, int[] erasedParityIndexes) {
+    this.encoderFactoryClass = factoryClass;
+    this.decoderFactoryClass = factoryClass;
+    this.numDataUnits = numDataUnits;
+    this.numParityUnits = numParityUnits;
+    this.erasedDataIndexes = erasedDataIndexes;
+    this.erasedParityIndexes = erasedParityIndexes;
+  }
+
+  @Before
+  public void setup() {
+    if (encoderFactoryClass == NativeRSRawErasureCoderFactory.class
+        || encoderFactoryClass == NativeXORRawErasureCoderFactory.class) {
+      Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded());
+    }
+    setAllowDump(false);
+  }
+
+  /**
+   * Test if the same validator can process direct and non-direct buffers.
+   */
+  @Test
+  public void testValidate() {
+    prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
+        erasedParityIndexes);
+    testValidate(true);
+    testValidate(false);
+  }
+
+  /**
+   * Test if the same validator can process variable width of data for
+   * inputs and outputs.
+   */
+  protected void testValidate(boolean usingDirectBuffer) {
+    this.usingDirectBuffer = usingDirectBuffer;
+    prepareCoders(false);
+    prepareValidator(false);
+
+    performTestValidate(baseChunkSize);
+    performTestValidate(baseChunkSize - 17);
+    performTestValidate(baseChunkSize + 18);
+  }
+
+  protected void prepareValidator(boolean recreate) {
+    if (validator == null || recreate) {
+      validator = new DecodingValidator(decoder);
+    }
+  }
+
+  protected void performTestValidate(int chunkSize) {
+    setChunkSize(chunkSize);
+    prepareBufferAllocator(false);
+
+    // encode
+    ECChunk[] dataChunks = prepareDataChunksForEncoding();
+    ECChunk[] parityChunks = prepareParityChunksForEncoding();
+    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+    try {
+      encoder.encode(dataChunks, parityChunks);
+    } catch (Exception e) {
+      Assert.fail("Should not get Exception: " + e.getMessage());
+    }
+
+    // decode
+    backupAndEraseChunks(clonedDataChunks, parityChunks);
+    ECChunk[] inputChunks =
+        prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
+    markChunks(inputChunks);
+    ensureOnlyLeastRequiredChunks(inputChunks);
+    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+    int[] erasedIndexes = getErasedIndexesForDecoding();
+    try {
+      decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
+    } catch (Exception e) {
+      Assert.fail("Should not get Exception: " + e.getMessage());
+    }
+
+    // validate
+    restoreChunksFromMark(inputChunks);
+    ECChunk[] clonedInputChunks = cloneChunksWithData(inputChunks);
+    ECChunk[] clonedRecoveredChunks = cloneChunksWithData(recoveredChunks);
+    int[] clonedErasedIndexes = erasedIndexes.clone();
+
+    try {
+      validator.validate(clonedInputChunks, clonedErasedIndexes,
+          clonedRecoveredChunks);
+    } catch (Exception e) {
+      Assert.fail("Should not get Exception: " + e.getMessage());
+    }
+
+    // Check if input buffers' positions are moved to the end
+    verifyBufferPositionAtEnd(clonedInputChunks);
+
+    // Check if validator does not change recovered chunks and erased indexes
+    verifyChunksEqual(recoveredChunks, clonedRecoveredChunks);
+    Assert.assertArrayEquals("Erased indexes should not be changed",
+        erasedIndexes, clonedErasedIndexes);
+
+    // Check if validator uses correct indexes for validation
+    List<Integer> validIndexesList =
+        IntStream.of(CoderUtil.getValidIndexes(inputChunks)).boxed()
+            .collect(Collectors.toList());
+    List<Integer> newValidIndexesList =
+        IntStream.of(validator.getNewValidIndexes()).boxed()
+            .collect(Collectors.toList());
+    List<Integer> erasedIndexesList =
+        IntStream.of(erasedIndexes).boxed().collect(Collectors.toList());
+    int newErasedIndex = validator.getNewErasedIndex();
+    Assert.assertTrue(
+        "Valid indexes for validation should contain"
+        + " erased indexes for decoding",
+        newValidIndexesList.containsAll(erasedIndexesList));
+    Assert.assertTrue(
+        "An erased index for validation should be contained"
+        + " in valid indexes for decoding",
+        validIndexesList.contains(newErasedIndex));
+    Assert.assertFalse(
+        "An erased index for validation should not be contained"
+        + " in valid indexes for validation",
+        newValidIndexesList.contains(newErasedIndex));
+  }
+
+  private void verifyChunksEqual(ECChunk[] chunks1, ECChunk[] chunks2) {
+    boolean result = Arrays.deepEquals(toArrays(chunks1), toArrays(chunks2));
+    assertTrue("Recovered chunks should not be changed", result);
+  }
+
+  /**
+   * Test if validator throws {@link InvalidDecodingException} when
+   * a decoded output buffer is polluted.
+   */
+  @Test
+  public void testValidateWithBadDecoding() throws IOException {
+    prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
+        erasedParityIndexes);
+    this.usingDirectBuffer = true;
+    prepareCoders(true);
+    prepareValidator(true);
+    prepareBufferAllocator(false);
+
+    // encode
+    ECChunk[] dataChunks = prepareDataChunksForEncoding();
+    ECChunk[] parityChunks = prepareParityChunksForEncoding();
+    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+    try {
+      encoder.encode(dataChunks, parityChunks);
+    } catch (Exception e) {
+      Assert.fail("Should not get Exception: " + e.getMessage());
+    }
+
+    // decode
+    backupAndEraseChunks(clonedDataChunks, parityChunks);
+    ECChunk[] inputChunks =
+        prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
+    markChunks(inputChunks);
+    ensureOnlyLeastRequiredChunks(inputChunks);
+    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+    int[] erasedIndexes = getErasedIndexesForDecoding();
+    try {
+      decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
+    } catch (Exception e) {
+      Assert.fail("Should not get Exception: " + e.getMessage());
+    }
+
+    // validate
+    restoreChunksFromMark(inputChunks);
+    polluteSomeChunk(recoveredChunks);
+    try {
+      validator.validate(inputChunks, erasedIndexes, recoveredChunks);
+      Assert.fail("Validation should fail due to bad decoding");
+    } catch (InvalidDecodingException e) {
+      String expected = "Failed to validate decoding";
+      GenericTestUtils.assertExceptionContains(expected, e);
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
index 4519e35..eb63494 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -334,7 +334,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
     verifyBufferPositionAtEnd(inputChunks);
   }
 
-  private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
+  void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
     for (ECChunk chunk : inputChunks) {
       if (chunk != null) {
         Assert.assertEquals(0, chunk.getBuffer().remaining());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 965ea8d..c516d47 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -819,6 +819,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.ec.reconstruction.xmits.weight";
   public static final float   DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
       0.5f;
+  public static final String DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY =
+      "dfs.datanode.ec.reconstruction.validation";
+  public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false;
 
   public static final String
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index e4f732c..8ff3e30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -132,4 +133,10 @@ public class DataNodeFaultInjector {
    * Just delay a while.
    */
   public void delay() {}
+
+  /**
+   * Used as a hook to inject data pollution
+   * into an erasure coding reconstruction.
+   */
+  public void badDecoding(ByteBuffer[] outputs) {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index e28d6c5..a196935 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -57,6 +57,7 @@ public abstract class StripedBlockChecksumReconstructor
 
   private void init() throws IOException {
     initDecoderIfNecessary();
+    initDecodingValidatorIfNecessary();
     getStripedReader().init();
     // allocate buffer to keep the reconstructed block data
     targetBuffer = allocateBuffer(getBufferSize());
@@ -192,7 +193,16 @@ public abstract class StripedBlockChecksumReconstructor
     for (int i = 0; i < targetIndices.length; i++) {
       tarIndices[i] = targetIndices[i];
     }
-    getDecoder().decode(inputs, tarIndices, outputs);
+
+    if (isValidationEnabled()) {
+      markBuffers(inputs);
+      getDecoder().decode(inputs, tarIndices, outputs);
+      resetBuffers(inputs);
+
+      getValidator().validate(inputs, tarIndices, outputs);
+    } else {
+      getDecoder().decode(inputs, tarIndices, outputs);
+    }
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 1af2380..cd59f51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.io.erasurecode.rawcoder.InvalidDecodingException;
 import org.apache.hadoop.util.Time;
 
 /**
@@ -53,6 +54,8 @@ class StripedBlockReconstructor extends StripedReconstructor
     try {
       initDecoderIfNecessary();
 
+      initDecodingValidatorIfNecessary();
+
       getStripedReader().init();
 
       stripedWriter.init();
@@ -126,12 +129,31 @@ class StripedBlockReconstructor extends StripedReconstructor
     int[] erasedIndices = stripedWriter.getRealTargetIndices();
     ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
 
+    if (isValidationEnabled()) {
+      markBuffers(inputs);
+      decode(inputs, erasedIndices, outputs);
+      resetBuffers(inputs);
+
+      DataNodeFaultInjector.get().badDecoding(outputs);
+      try {
+        getValidator().validate(inputs, erasedIndices, outputs);
+      } catch (InvalidDecodingException e) {
+        getDatanode().getMetrics().incrECInvalidReconstructionTasks();
+        throw e;
+      }
+    } else {
+      decode(inputs, erasedIndices, outputs);
+    }
+
+    stripedWriter.updateRealTargetBuffers(toReconstructLen);
+  }
+
+  private void decode(ByteBuffer[] inputs, int[] erasedIndices,
+      ByteBuffer[] outputs) throws IOException {
     long start = System.nanoTime();
     getDecoder().decode(inputs, erasedIndices, outputs);
     long end = System.nanoTime();
     this.getDatanode().getMetrics().incrECDecodingTime(end - start);
-
-    stripedWriter.updateRealTargetBuffers(toReconstructLen);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 851f695..06346c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -103,10 +105,14 @@ abstract class StripedReconstructor {
   private final Configuration conf;
   private final DataNode datanode;
   private final ErasureCodingPolicy ecPolicy;
+  private final ErasureCoderOptions coderOptions;
   private RawErasureDecoder decoder;
   private final ExtendedBlock blockGroup;
   private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
+  private final boolean isValidationEnabled;
+  private DecodingValidator validator;
+
   // position in striped internal block
   private long positionInBlock;
   private StripedReader stripedReader;
@@ -136,6 +142,13 @@ abstract class StripedReconstructor {
     cachingStrategy = CachingStrategy.newDefaultStrategy();
 
     positionInBlock = 0L;
+
+    coderOptions = new ErasureCoderOptions(
+        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+    isValidationEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE)
+        && !coderOptions.allowChangeInputs();
   }
 
   public void incrBytesRead(boolean local, long delta) {
@@ -196,13 +209,18 @@ abstract class StripedReconstructor {
   // Initialize decoder
   protected void initDecoderIfNecessary() {
     if (decoder == null) {
-      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
-          ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
       decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
           coderOptions);
     }
   }
 
+  // Initialize decoding validator
+  protected void initDecodingValidatorIfNecessary() {
+    if (isValidationEnabled && validator == null) {
+      validator = new DecodingValidator(decoder);
+    }
+  }
+
   long getPositionInBlock() {
     return positionInBlock;
   }
@@ -285,4 +303,28 @@ abstract class StripedReconstructor {
   static ByteBufferPool getBufferPool() {
     return BUFFER_POOL;
   }
+
+  boolean isValidationEnabled() {
+    return isValidationEnabled;
+  }
+
+  DecodingValidator getValidator() {
+    return validator;
+  }
+
+  protected static void markBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.mark();
+      }
+    }
+  }
+
+  protected static void resetBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.reset();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 6e63314..bd21a5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -152,6 +152,8 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionTasks;
   @Metric("Count of erasure coding failed reconstruction tasks")
   MutableCounterLong ecFailedReconstructionTasks;
+  @Metric("Count of erasure coding invalidated reconstruction tasks")
+  private MutableCounterLong ecInvalidReconstructionTasks;
   @Metric("Nanoseconds spent by decoding tasks")
   MutableCounterLong ecDecodingTimeNanos;
   @Metric("Bytes read by erasure coding worker")
@@ -519,6 +521,14 @@ public class DataNodeMetrics {
     ecFailedReconstructionTasks.incr();
   }
 
+  public void incrECInvalidReconstructionTasks() {
+    ecInvalidReconstructionTasks.incr();
+  }
+
+  public long getECInvalidReconstructionTasks() {
+    return ecInvalidReconstructionTasks.value();
+  }
+
   public void incrDataNodeActiveXceiversCount() {
     dataNodeActiveXceiversCount.incr();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ce9195c..15828d9 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3621,6 +3621,16 @@
 </property>
 
 <property>
+  <name>dfs.datanode.ec.reconstruction.validation</name>
+  <value>false</value>
+  <description>
+    Decide whether the datanode validates that EC reconstruction tasks
+    reconstruct target blocks correctly. When validation fails, the
+    reconstruction task fails and is retried by the namenode.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.quota.init-threads</name>
   <value>4</value>
   <description>
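
The new dfs.datanode.ec.reconstruction.validation switch can also be set programmatically, as the tests later in this patch do. A minimal sketch, illustrative only (the class name and the plain Configuration below are assumptions, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    /** Hypothetical snippet, not part of this patch. */
    public final class EnableEcValidationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Enable EC reconstruction validation before starting a DataNode or
        // a MiniDFSCluster with this configuration.
        conf.setBoolean(
            DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY, true);
        System.out.println(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY
            + " = " + conf.getBoolean(
                DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY, false));
      }
    }
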
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 5e29868..7dd9a20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -107,6 +107,23 @@ public class TestReconstructStripedFile {
     return StripedFileTestUtil.getDefaultECPolicy();
   }
 
+  public boolean isValidationEnabled() {
+    return false;
+  }
+
+  public int getPendingTimeout() {
+    return DFSConfigKeys
+        .DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
+  }
+
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
   @Before
   public void setup() throws IOException {
     ecPolicy = getEcPolicy();
@@ -130,6 +147,11 @@ public class TestReconstructStripedFile {
           CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
           NativeRSRawErasureCoderFactory.CODER_NAME);
     }
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        getPendingTimeout());
+    conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
+        isValidationEnabled());
     File basedir = new File(GenericTestUtils.getRandomizedTempPath());
     cluster = new MiniDFSCluster.Builder(conf, basedir).numDataNodes(dnNum)
         .build();
@@ -305,7 +327,7 @@ public class TestReconstructStripedFile {
    *    and verify the block replica length, generationStamp and content.
    * 2. Read the file and verify content.
    */
-  private void assertFileBlocksReconstruction(String fileName, int fileLen,
+  void assertFileBlocksReconstruction(String fileName, int fileLen,
       ReconstructionType type, int toRecoverBlockNum) throws Exception {
     if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
       Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java
new file mode 100644
index 0000000..00749ef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This test extends {@link TestReconstructStripedFile} to test
+ * EC reconstruction validation.
+ */
+public class TestReconstructStripedFileWithValidator
+    extends TestReconstructStripedFile {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReconstructStripedFileWithValidator.class);
+
+  public TestReconstructStripedFileWithValidator() {
+    LOG.info("run {} with validator.",
+        TestReconstructStripedFileWithValidator.class.getSuperclass()
+            .getSimpleName());
+  }
+
+  /**
+   * This test injects data pollution into the decoded outputs once.
+   * When validation is enabled, the first reconstruction task should fail
+   * validation, but the data will be recovered correctly by the next task.
+   * On the other hand, when validation is disabled, the first reconstruction
+   * task will succeed and then lead to data corruption.
+   */
+  @Test(timeout = 120000)
+  public void testValidatorWithBadDecoding()
+      throws Exception {
+    MiniDFSCluster cluster = getCluster();
+
+    cluster.getDataNodes().stream()
+        .map(DataNode::getMetrics)
+        .map(DataNodeMetrics::getECInvalidReconstructionTasks)
+        .forEach(n -> Assert.assertEquals(0, (long) n));
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector badDecodingInjector = new DataNodeFaultInjector() {
+      private final AtomicBoolean flag = new AtomicBoolean(false);
+
+      @Override
+      public void badDecoding(ByteBuffer[] outputs) {
+        if (!flag.get()) {
+          for (ByteBuffer output : outputs) {
+            output.mark();
+            output.put((byte) (output.get(output.position()) + 1));
+            output.reset();
+          }
+        }
+        flag.set(true);
+      }
+    };
+    DataNodeFaultInjector.set(badDecodingInjector);
+
+    int fileLen =
+        (getEcPolicy().getNumDataUnits() + getEcPolicy().getNumParityUnits())
+            * getBlockSize() + getBlockSize() / 10;
+    try {
+      assertFileBlocksReconstruction(
+          "/testValidatorWithBadDecoding",
+          fileLen,
+          ReconstructionType.DataOnly,
+          getEcPolicy().getNumParityUnits());
+
+      long sum = cluster.getDataNodes().stream()
+          .map(DataNode::getMetrics)
+          .mapToLong(DataNodeMetrics::getECInvalidReconstructionTasks)
+          .sum();
+      Assert.assertEquals(1, sum);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  @Override
+  public boolean isValidationEnabled() {
+    return true;
+  }
+
+  /**
+   * Use a small pending timeout so that a failed reconstruction task is
+   * rescheduled within a short period of time.
+   */
+  @Override
+  public int getPendingTimeout() {
+    return 10;
+  }
+}
