HDFS-11299. Support multiple Datanode File IO hooks. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4046794a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4046794a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4046794a

Branch: refs/heads/YARN-5734
Commit: 4046794a5365f80f9fa002e3889e41c6d29e13a8
Parents: c18590f
Author: Arpit Agarwal <a...@apache.org>
Authored: Tue Jan 10 10:43:02 2017 -0800
Committer: Arpit Agarwal <a...@apache.org>
Committed: Tue Jan 10 10:43:11 2017 -0800

----------------------------------------------------------------------
 .../hadoop-common/src/site/markdown/Metrics.md  |   2 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  10 +-
 .../server/datanode/CountingFileIoEvents.java   | 106 -------
 .../hadoop/hdfs/server/datanode/DataNode.java   |   5 -
 .../hdfs/server/datanode/DataNodeMXBean.java    |   5 -
 .../server/datanode/DefaultFileIoEvents.java    |  67 ----
 .../datanode/FaultInjectorFileIoEvents.java     |  55 ++++
 .../hdfs/server/datanode/FileIoEvents.java      | 115 -------
 .../hdfs/server/datanode/FileIoProvider.java    | 318 +++++++++++--------
 .../server/datanode/ProfilingFileIoEvents.java  | 110 ++++---
 .../datanode/TestDataNodeVolumeMetrics.java     |   4 +-
 .../hadoop/tools/TestHdfsConfigFields.java      |   4 +-
 12 files changed, 310 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 541f5dc..9a19a9b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -326,7 +326,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 FsVolume
 --------
 
-Per-volume metrics contain Datanode Volume IO related statistics. Per-volume metrics are off by default. They can be enabled by setting `dfs.datanode.fileio.events.class` to **org.apache.hadoop.hdfs.server.datanode.ProfilingFileIoEvents**, but enabling per-volume metrics may have a performance impact. Each metrics record contains tags such as Hostname as additional information along with metrics.
+Per-volume metrics contain Datanode Volume IO related statistics. Per-volume metrics are off by default. They can be enabled by setting `dfs.datanode.enable.fileio.profiling` to **true**, but enabling per-volume metrics may have a performance impact. Each metrics record contains tags such as Hostname as additional information along with metrics.
 
 | Name | Description |
 |:---- |:---- |

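For reference, a minimal sketch of enabling the profiling hook from code rather than via hdfs-site.xml, matching the updated documentation above. Only the config key comes from this patch; the rest is standard Hadoop configuration plumbing:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Enable per-volume IO profiling (off by default; may cost performance).
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, true);
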
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 50217a2..cf9c805 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -698,8 +698,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
   public static final String  DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
-  public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
-      "dfs.datanode.fileio.events.class";
+  public static final String DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY =
+      "dfs.datanode.enable.fileio.profiling";
+  public static final boolean DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT =
+      false;
+  public static final String DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY =
+      "dfs.datanode.enable.fileio.fault.injection";
+  public static final boolean
+      DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT = false;
   public static final String  DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
   public static final long    DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
   public static final String  DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
deleted file mode 100644
index 7c6bfd6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * {@link FileIoEvents} that simply counts the number of operations.
- * Not meant to be used outside of testing.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class CountingFileIoEvents extends FileIoEvents {
-  private final Map<OPERATION, Counts> counts;
-
-  private static class Counts {
-    private final AtomicLong successes = new AtomicLong(0);
-    private final AtomicLong failures = new AtomicLong(0);
-
-    @JsonProperty("Successes")
-    public long getSuccesses() {
-      return successes.get();
-    }
-
-    @JsonProperty("Failures")
-    public long getFailures() {
-      return failures.get();
-    }
-  }
-
-  public CountingFileIoEvents() {
-    counts = new HashMap<>();
-    for (OPERATION op : OPERATION.values()) {
-      counts.put(op, new Counts());
-    }
-  }
-
-  @Override
-  public long beforeMetadataOp(
-      @Nullable FsVolumeSpi volume, OPERATION op) {
-    return 0;
-  }
-
-  @Override
-  public void afterMetadataOp(
-      @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
-    counts.get(op).successes.incrementAndGet();
-  }
-
-  @Override
-  public long beforeFileIo(
-      @Nullable FsVolumeSpi volume, OPERATION op, long len) {
-    return 0;
-  }
-
-  @Override
-  public void afterFileIo(
-      @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
-    counts.get(op).successes.incrementAndGet();
-  }
-
-  @Override
-  public void onFailure(
-      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
-    counts.get(op).failures.incrementAndGet();
-  }
-
-  @Override
-  public String getStatistics() {
-    ObjectMapper objectMapper = new ObjectMapper();
-    try {
-      return objectMapper.writeValueAsString(counts);
-    } catch (JsonProcessingException e) {
-      // Failed to serialize. Don't log the exception call stack.
-      FileIoProvider.LOG.error("Failed to serialize statistics" + e);
-      return null;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 28d627a..090d8b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3023,11 +3023,6 @@ public class DataNode extends ReconfigurableBase
       return "";
     }
   }
-  
-  @Override // DataNodeMXBean
-  public String getFileIoProviderStatistics() {
-    return fileIoProvider.getStatistics();
-  }
 
   public void refreshNamenodes(Configuration conf) throws IOException {
     blockPoolManager.refreshNamenodes(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index ccc5f92..fb79a86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -122,11 +122,6 @@ public interface DataNodeMXBean {
   String getDiskBalancerStatus();
 
   /**
-   * Gets the {@link FileIoProvider} statistics.
-   */
-  String getFileIoProviderStatistics();
-
-  /**
   * Gets the average info (e.g. time) of SendPacketDownstream when the DataNode
    * acts as the penultimate (2nd to the last) node in pipeline.
    * <p>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
deleted file mode 100644
index 6a12aae..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import javax.annotation.Nullable;
-
-/**
- * The default implementation of {@link FileIoEvents} that do nothing.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public final class DefaultFileIoEvents extends FileIoEvents {
-  @Override
-  public long beforeMetadataOp(
-      @Nullable FsVolumeSpi volume, OPERATION op) {
-    return 0;
-  }
-
-  @Override
-  public void afterMetadataOp(
-      @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
-  }
-
-  @Override
-  public long beforeFileIo(
-      @Nullable FsVolumeSpi volume, OPERATION op, long len) {
-    return 0;
-  }
-
-  @Override
-  public void afterFileIo(
-      @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
-  }
-
-  @Override
-  public void onFailure(
-      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
-  }
-
-  @Override
-  public @Nullable String getStatistics() {
-    // null is valid JSON.
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java
new file mode 100644
index 0000000..ead6ed9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * Injects faults in the metadata and data related operations on datanode
+ * volumes.
+ */
+@InterfaceAudience.Private
+public class FaultInjectorFileIoEvents {
+
+  private final boolean isEnabled;
+
+  public FaultInjectorFileIoEvents(@Nullable Configuration conf) {
+    if (conf != null) {
+      isEnabled = conf.getBoolean(DFSConfigKeys
+          .DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, DFSConfigKeys
+          .DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT);
+    } else {
+      isEnabled = false;
+    }
+  }
+
+  public void beforeMetadataOp(
+      @Nullable FsVolumeSpi volume, FileIoProvider.OPERATION op) {
+  }
+
+  public void beforeFileIo(
+      @Nullable FsVolumeSpi volume, FileIoProvider.OPERATION op, long len) {
+  }
+}

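The hooks in FaultInjectorFileIoEvents above are deliberate no-ops; a test exercises them by subclassing and overriding. A minimal sketch follows, with the caveats that the subclass name is hypothetical and that wiring an alternate instance into FileIoProvider is not part of this patch (the provider constructs its own instance from the Configuration):

    // Hypothetical test-only injector; illustrates the hook shape only.
    class ThrowingFaultInjectorFileIoEvents extends FaultInjectorFileIoEvents {
      ThrowingFaultInjectorFileIoEvents(Configuration conf) {
        super(conf);
      }

      @Override
      public void beforeFileIo(FsVolumeSpi volume,
          FileIoProvider.OPERATION op, long len) {
        // Simulate a disk-level fault on every data IO.
        throw new RuntimeException("injected fault before " + op);
      }
    }
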
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
deleted file mode 100644
index 10f2a0c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import javax.annotation.Nullable;
-
-/**
- * The following hooks can be implemented for instrumentation/fault
- * injection.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class FileIoEvents {
-
-  /**
-   * Invoked before a filesystem metadata operation.
-   *
-   * @param volume  target volume for the operation. Null if unavailable.
-   * @param op  type of operation.
-   * @return  timestamp at which the operation was started. 0 if
-   *          unavailable.
-   */
-  abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
-
-  /**
-   * Invoked after a filesystem metadata operation has completed.
-   *
-   * @param volume  target volume for the operation.  Null if unavailable.
-   * @param op  type of operation.
-   * @param begin  timestamp at which the operation was started. 0
-   *               if unavailable.
-   */
-  abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
-                                long begin);
-
-  /**
-   * Invoked before a read/write/flush/channel transfer operation.
-   *
-   * @param volume  target volume for the operation. Null if unavailable.
-   * @param op  type of operation.
-   * @param len  length of the file IO. 0 for flush.
-   * @return  timestamp at which the operation was started. 0 if
-   *          unavailable.
-   */
-  abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
-                             long len);
-
-
-  /**
-   * Invoked after a read/write/flush/channel transfer operation
-   * has completed.
-   *
-   * @param volume  target volume for the operation. Null if unavailable.
-   * @param op  type of operation.
-   * @param len   of the file IO. 0 for flush.
-   * @return  timestamp at which the operation was started. 0 if
-   *          unavailable.
-   */
-  abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
-                            long begin, long len);
-
-  /**
-   * Invoked if an operation fails with an exception.
-   * @param volume  target volume for the operation. Null if unavailable.
-   * @param op  type of operation.
-   * @param e  Exception encountered during the operation.
-   * @param begin  time at which the operation was started.
-   */
-  abstract void onFailure(
-      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
-
-  /**
-   * Invoked by FileIoProvider if an operation fails with an exception.
-   * @param datanode datanode that runs volume check upon volume io failure
-   * @param volume  target volume for the operation. Null if unavailable.
-   * @param op  type of operation.
-   * @param e  Exception encountered during the operation.
-   * @param begin  time at which the operation was started.
-   */
-  void onFailure(DataNode datanode,
-      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
-    onFailure(volume, op, e, begin);
-    if (datanode != null && volume != null) {
-      datanode.checkDiskErrorAsync(volume);
-    }
-  }
-
-  /**
-   * Return statistics as a JSON string.
-   * @return
-   */
-  @Nullable abstract String getStatistics();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
index f961049..9def2e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -34,7 +33,6 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.net.SocketOutputStream;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,11 +57,14 @@ import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;
 
 /**
  * This class abstracts out various file IO operations performed by the
- * DataNode and invokes event hooks before and after each file IO.
+ * DataNode and invokes profiling (for collecting stats) and fault injection
+ * (for testing) event hooks before and after each file IO.
  *
- * Behavior can be injected into these events by implementing
- * {@link FileIoEvents} and replacing the default implementation
- * with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}.
+ * Behavior can be injected into these events by enabling the
+ * profiling and/or fault injection event hooks through
+ * {@link DFSConfigKeys#DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY} and
+ * {@link DFSConfigKeys#DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY}.
+ * These event hooks are disabled by default.
  *
  * Most functions accept an optional {@link FsVolumeSpi} parameter for
  * instrumentation/logging.
@@ -78,9 +79,12 @@ public class FileIoProvider {
   public static final Logger LOG = LoggerFactory.getLogger(
       FileIoProvider.class);
 
-  private final FileIoEvents eventHooks;
+  private final ProfilingFileIoEvents profilingEventHook;
+  private final FaultInjectorFileIoEvents faultInjectorEventHook;
   private final DataNode datanode;
 
+  private static final int LEN_INT = 4;
+
   /**
    * @param conf  Configuration object. May be null. When null,
    *              the event handlers are no-ops.
@@ -89,15 +93,8 @@ public class FileIoProvider {
    */
   public FileIoProvider(@Nullable Configuration conf,
                         final DataNode datanode) {
-    if (conf != null) {
-      final Class<? extends FileIoEvents> clazz = conf.getClass(
-          DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
-          DefaultFileIoEvents.class,
-          FileIoEvents.class);
-      eventHooks = ReflectionUtils.newInstance(clazz, conf);
-    } else {
-      eventHooks = new DefaultFileIoEvents();
-    }
+    profilingEventHook = new ProfilingFileIoEvents(conf);
+    faultInjectorEventHook = new FaultInjectorFileIoEvents(conf);
     this.datanode = datanode;
   }
 
@@ -123,15 +120,6 @@ public class FileIoProvider {
   }
 
   /**
-   * Retrieve statistics from the underlying {@link FileIoEvents}
-   * implementation as a JSON string, if it maintains them.
-   * @return statistics as a JSON string. May be null.
-   */
-  public @Nullable String getStatistics() {
-    return eventHooks.getStatistics();
-  }
-
-  /**
    * See {@link Flushable#flush()}.
    *
    * @param  volume target volume. null if unavailable.
@@ -139,12 +127,13 @@ public class FileIoProvider {
    */
   public void flush(
       @Nullable FsVolumeSpi volume, Flushable f) throws IOException {
-    final long begin = eventHooks.beforeFileIo(volume, FLUSH, 0);
+    final long begin = profilingEventHook.beforeFileIo(volume, FLUSH, 0);
     try {
+      faultInjectorEventHook.beforeFileIo(volume, FLUSH, 0);
       f.flush();
-      eventHooks.afterFileIo(volume, FLUSH, begin, 0);
+      profilingEventHook.afterFileIo(volume, FLUSH, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, FLUSH, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -157,12 +146,13 @@ public class FileIoProvider {
    */
   public void sync(
       @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
-    final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+    final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
     try {
+      faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
       fos.getChannel().force(true);
-      eventHooks.afterFileIo(volume, SYNC, begin, 0);
+      profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, SYNC, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -176,12 +166,13 @@ public class FileIoProvider {
   public void syncFileRange(
       @Nullable FsVolumeSpi volume, FileDescriptor outFd,
       long offset, long numBytes, int flags) throws NativeIOException {
-    final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+    final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
     try {
+      faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
       NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
-      eventHooks.afterFileIo(volume, SYNC, begin, 0);
+      profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, SYNC, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -195,13 +186,14 @@ public class FileIoProvider {
   public void posixFadvise(
       @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
       long offset, long length, int flags) throws NativeIOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, FADVISE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, FADVISE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, FADVISE);
       NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
           identifier, outFd, offset, length, flags);
-      eventHooks.afterMetadataOp(volume, FADVISE, begin);
+      profilingEventHook.afterMetadataOp(volume, FADVISE, begin);
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, FADVISE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -213,13 +205,14 @@ public class FileIoProvider {
    * @return  true if the file was successfully deleted.
    */
   public boolean delete(@Nullable FsVolumeSpi volume, File f) {
-    final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
       boolean deleted = f.delete();
-      eventHooks.afterMetadataOp(volume, DELETE, begin);
+      profilingEventHook.afterMetadataOp(volume, DELETE, begin);
       return deleted;
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -232,16 +225,17 @@ public class FileIoProvider {
    *          existed.
    */
   public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
-    final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
       boolean deleted = !f.exists() || f.delete();
-      eventHooks.afterMetadataOp(volume, DELETE, begin);
+      profilingEventHook.afterMetadataOp(volume, DELETE, begin);
       if (!deleted) {
         LOG.warn("Failed to delete file {}", f);
       }
       return deleted;
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -263,13 +257,14 @@ public class FileIoProvider {
       @Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
       FileChannel fileCh, long position, int count,
       LongWritable waitTime, LongWritable transferTime) throws IOException {
-    final long begin = eventHooks.beforeFileIo(volume, TRANSFER, count);
+    final long begin = profilingEventHook.beforeFileIo(volume, TRANSFER, count);
     try {
+      faultInjectorEventHook.beforeFileIo(volume, TRANSFER, count);
       sockOut.transferToFully(fileCh, position, count,
           waitTime, transferTime);
-      eventHooks.afterFileIo(volume, TRANSFER, begin, count);
+      profilingEventHook.afterFileIo(volume, TRANSFER, begin, count);
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, TRANSFER, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -284,13 +279,14 @@ public class FileIoProvider {
    */
   public boolean createFile(
       @Nullable FsVolumeSpi volume, File f) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
       boolean created = f.createNewFile();
-      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
       return created;
     } catch (Exception e) {
-      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -309,15 +305,16 @@ public class FileIoProvider {
    */
   public FileInputStream getFileInputStream(
       @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
-    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
     FileInputStream fis = null;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
       fis = new WrappedFileInputStream(volume, f);
-      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -339,15 +336,16 @@ public class FileIoProvider {
   public FileOutputStream getFileOutputStream(
       @Nullable FsVolumeSpi volume, File f,
       boolean append) throws FileNotFoundException {
-    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
     FileOutputStream fos = null;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
       fos = new WrappedFileOutputStream(volume, f, append);
-      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
       return fos;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fos);
-      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -403,16 +401,17 @@ public class FileIoProvider {
   public FileInputStream getShareDeleteFileInputStream(
       @Nullable FsVolumeSpi volume, File f,
       long offset) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
     FileInputStream fis = null;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
       fis = new WrappedFileInputStream(volume,
           NativeIO.getShareDeleteFileDescriptor(f, offset));
-      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -434,16 +433,17 @@ public class FileIoProvider {
    */
   public FileInputStream openAndSeek(
       @Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
     FileInputStream fis = null;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
       fis = new WrappedFileInputStream(volume,
           FsDatasetUtil.openAndSeek(f, offset));
-      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -465,15 +465,16 @@ public class FileIoProvider {
   public RandomAccessFile getRandomAccessFile(
       @Nullable FsVolumeSpi volume, File f,
       String mode) throws FileNotFoundException {
-    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
     RandomAccessFile raf = null;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
       raf = new WrappedRandomAccessFile(volume, f, mode);
-      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
       return raf;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(raf);
-      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -486,13 +487,14 @@ public class FileIoProvider {
    * @return true on success false on failure.
    */
   public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
-    final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
       boolean deleted = FileUtil.fullyDelete(dir);
-      eventHooks.afterMetadataOp(volume, DELETE, begin);
+      profilingEventHook.afterMetadataOp(volume, DELETE, begin);
       return deleted;
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -508,12 +510,13 @@ public class FileIoProvider {
    */
   public void replaceFile(
       @Nullable FsVolumeSpi volume, File src, File target) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
       FileUtil.replaceFile(src, target);
-      eventHooks.afterMetadataOp(volume, MOVE, begin);
+      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -530,12 +533,13 @@ public class FileIoProvider {
   public void rename(
       @Nullable FsVolumeSpi volume, File src, File target)
       throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
       Storage.rename(src, target);
-      eventHooks.afterMetadataOp(volume, MOVE, begin);
+      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -552,12 +556,13 @@ public class FileIoProvider {
   public void moveFile(
       @Nullable FsVolumeSpi volume, File src, File target)
       throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
       FileUtils.moveFile(src, target);
-      eventHooks.afterMetadataOp(volume, MOVE, begin);
+      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -576,12 +581,13 @@ public class FileIoProvider {
   public void move(
       @Nullable FsVolumeSpi volume, Path src, Path target,
       CopyOption... options) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
       Files.move(src, target, options);
-      eventHooks.afterMetadataOp(volume, MOVE, begin);
+      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -600,12 +606,14 @@ public class FileIoProvider {
       @Nullable FsVolumeSpi volume, File src, File target,
       boolean preserveFileDate) throws IOException {
     final long length = src.length();
-    final long begin = eventHooks.beforeFileIo(volume, NATIVE_COPY, length);
+    final long begin = profilingEventHook.beforeFileIo(volume, NATIVE_COPY,
+        length);
     try {
+      faultInjectorEventHook.beforeFileIo(volume, NATIVE_COPY, length);
       Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
-      eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
+      profilingEventHook.afterFileIo(volume, NATIVE_COPY, begin, length);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -622,15 +630,16 @@ public class FileIoProvider {
    */
   public boolean mkdirs(
       @Nullable FsVolumeSpi volume, File dir) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, MKDIRS);
     boolean created = false;
     boolean isDirectory;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS);
       created = dir.mkdirs();
       isDirectory = !created && dir.isDirectory();
-      eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+      profilingEventHook.afterMetadataOp(volume, MKDIRS, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
 
@@ -650,13 +659,14 @@ public class FileIoProvider {
    */
   public void mkdirsWithExistsCheck(
       @Nullable FsVolumeSpi volume, File dir) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, MKDIRS);
     boolean succeeded = false;
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS);
       succeeded = dir.isDirectory() || dir.mkdirs();
-      eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+      profilingEventHook.afterMetadataOp(volume, MKDIRS, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
 
@@ -676,13 +686,14 @@ public class FileIoProvider {
    */
   public File[] listFiles(
       @Nullable FsVolumeSpi volume, File dir) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
       File[] children = FileUtil.listFiles(dir);
-      eventHooks.afterMetadataOp(volume, LIST, begin);
+      profilingEventHook.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, LIST, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -698,13 +709,14 @@ public class FileIoProvider {
    */
   public String[] list(
       @Nullable FsVolumeSpi volume, File dir) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
       String[] children = FileUtil.list(dir);
-      eventHooks.afterMetadataOp(volume, LIST, begin);
+      profilingEventHook.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, LIST, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -721,13 +733,14 @@ public class FileIoProvider {
   public List<String> listDirectory(
       @Nullable FsVolumeSpi volume, File dir,
       FilenameFilter filter) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
       List<String> children = IOUtils.listDirectory(dir, filter);
-      eventHooks.afterMetadataOp(volume, LIST, begin);
+      profilingEventHook.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, LIST, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -743,13 +756,14 @@ public class FileIoProvider {
    */
   public int getHardLinkCount(
       @Nullable FsVolumeSpi volume, File f) throws IOException {
-    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
       int count = HardLink.getLinkCount(f);
-      eventHooks.afterMetadataOp(volume, LIST, begin);
+      profilingEventHook.afterMetadataOp(volume, LIST, begin);
       return count;
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, LIST, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -762,13 +776,14 @@ public class FileIoProvider {
    * @return true if the file exists.
    */
   public boolean exists(@Nullable FsVolumeSpi volume, File f) {
-    final long begin = eventHooks.beforeMetadataOp(volume, EXISTS);
+    final long begin = profilingEventHook.beforeMetadataOp(volume, EXISTS);
     try {
+      faultInjectorEventHook.beforeMetadataOp(volume, EXISTS);
       boolean exists = f.exists();
-      eventHooks.afterMetadataOp(volume, EXISTS, begin);
+      profilingEventHook.afterMetadataOp(volume, EXISTS, begin);
       return exists;
     } catch(Exception e) {
-      eventHooks.onFailure(datanode, volume, EXISTS, e, begin);
+      onFailure(volume, begin);
       throw e;
     }
   }
@@ -803,13 +818,14 @@ public class FileIoProvider {
      */
     @Override
     public int read() throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+      final long begin = profilingEventHook.beforeFileIo(volume, READ, LEN_INT);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, READ, LEN_INT);
         int b = super.read();
-        eventHooks.afterFileIo(volume, READ, begin, 1);
+        profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT);
         return b;
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, READ, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
@@ -819,13 +835,15 @@ public class FileIoProvider {
      */
     @Override
     public int read(@Nonnull byte[] b) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+      final long begin = profilingEventHook.beforeFileIo(volume, READ, b
+          .length);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, READ, b.length);
         int numBytesRead = super.read(b);
-        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, READ, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
@@ -835,13 +853,14 @@ public class FileIoProvider {
      */
     @Override
     public int read(@Nonnull byte[] b, int off, int len) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, READ, len);
+      final long begin = profilingEventHook.beforeFileIo(volume, READ, len);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, READ, len);
         int numBytesRead = super.read(b, off, len);
-        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, READ, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
@@ -878,12 +897,14 @@ public class FileIoProvider {
      */
     @Override
     public void write(int b) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+      final long begin = profilingEventHook.beforeFileIo(volume, WRITE,
+          LEN_INT);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT);
         super.write(b);
-        eventHooks.afterFileIo(volume, WRITE, begin, 1);
+        profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT);
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
@@ -893,12 +914,14 @@ public class FileIoProvider {
      */
     @Override
     public void write(@Nonnull byte[] b) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, b
+          .length);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length);
         super.write(b);
-        eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+        profilingEventHook.afterFileIo(volume, WRITE, begin, b.length);
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
@@ -908,12 +931,13 @@ public class FileIoProvider {
      */
     @Override
     public void write(@Nonnull byte[] b, int off, int len) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, len);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, WRITE, len);
         super.write(b, off, len);
-        eventHooks.afterFileIo(volume, WRITE, begin, len);
+        profilingEventHook.afterFileIo(volume, WRITE, begin, len);
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
@@ -935,77 +959,93 @@ public class FileIoProvider {
 
     @Override
     public int read() throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+      final long begin = profilingEventHook.beforeFileIo(volume, READ, LEN_INT);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, READ, LEN_INT);
         int b = super.read();
-        eventHooks.afterFileIo(volume, READ, begin, 1);
+        profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT);
         return b;
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, READ, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, READ, len);
+      final long begin = profilingEventHook.beforeFileIo(volume, READ, len);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, READ, len);
         int numBytesRead = super.read(b, off, len);
-        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, READ, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
 
     @Override
     public int read(byte[] b) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+      final long begin = profilingEventHook.beforeFileIo(volume, READ, b
+          .length);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, READ, b.length);
         int numBytesRead = super.read(b);
-        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, READ, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
 
     @Override
     public void write(int b) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+      final long begin = profilingEventHook.beforeFileIo(volume, WRITE,
+          LEN_INT);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT);
         super.write(b);
-        eventHooks.afterFileIo(volume, WRITE, begin, 1);
+        profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT);
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
 
     @Override
     public void write(@Nonnull byte[] b) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, b
+          .length);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length);
         super.write(b);
-        eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+        profilingEventHook.afterFileIo(volume, WRITE, begin, b.length);
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-      final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, len);
       try {
+        faultInjectorEventHook.beforeFileIo(volume, WRITE, len);
         super.write(b, off, len);
-        eventHooks.afterFileIo(volume, WRITE, begin, len);
+        profilingEventHook.afterFileIo(volume, WRITE, begin, len);
       } catch(Exception e) {
-        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
+        onFailure(volume, begin);
         throw e;
       }
     }
   }
+
+  private void onFailure(@Nullable FsVolumeSpi volume, long begin) {
+    if (datanode != null && volume != null) {
+      datanode.checkDiskErrorAsync(volume);
+    }
+    profilingEventHook.onFailure(volume, begin);
+  }
 }

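With this change every wrapped operation in FileIoProvider follows the same shape: profiling hook before, fault-injection hook inside the try block, profiling hook after, and the shared private onFailure on any exception. A usage sketch under stated assumptions (a null DataNode and null volume are tolerated, in which case the async disk check and per-volume metrics are skipped; the file path is illustrative):

    // Illustrative only: a provider constructed outside a running DataNode.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, true);
    FileIoProvider provider = new FileIoProvider(conf, null);

    File f = new File("/tmp/fileio-example");       // hypothetical path
    boolean created = provider.createFile(null, f); // hooks wrap createNewFile()
    provider.deleteWithExistsCheck(null, f);        // hooks wrap exists()/delete()
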
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
index affd093..43ac495 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.Time;
@@ -26,84 +28,96 @@ import org.apache.hadoop.util.Time;
 import javax.annotation.Nullable;
 
 /**
- * {@link FileIoEvents} that profiles the performance of the metadata and data
- * related operations on datanode volumes.
+ * Profiles the performance of the metadata and data related operations on
+ * datanode volumes.
  */
 @InterfaceAudience.Private
-class ProfilingFileIoEvents extends FileIoEvents {
+class ProfilingFileIoEvents {
+
+  private final boolean isEnabled;
+
+  public ProfilingFileIoEvents(@Nullable Configuration conf) {
+    if (conf != null) {
+      isEnabled = conf.getBoolean(DFSConfigKeys
+          .DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, DFSConfigKeys
+          .DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT);
+    } else {
+      isEnabled = false;
+    }
+  }
 
-  @Override
   public long beforeMetadataOp(@Nullable FsVolumeSpi volume,
       FileIoProvider.OPERATION op) {
-    DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
-    if (metrics != null) {
-      return Time.monotonicNow();
+    if (isEnabled) {
+      DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
+      if (metrics != null) {
+        return Time.monotonicNow();
+      }
     }
     return 0;
   }
 
-  @Override
   public void afterMetadataOp(@Nullable FsVolumeSpi volume,
       FileIoProvider.OPERATION op, long begin) {
-    DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
-    if (metrics != null) {
-      metrics.addMetadastaOperationLatency(Time.monotonicNow() - begin);
+    if (isEnabled) {
+      DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
+      if (metrics != null) {
+        metrics.addMetadastaOperationLatency(Time.monotonicNow() - begin);
+      }
     }
   }
 
-  @Override
   public long beforeFileIo(@Nullable FsVolumeSpi volume,
       FileIoProvider.OPERATION op, long len) {
-    DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
-    if (metrics != null) {
-      return Time.monotonicNow();
+    if (isEnabled) {
+      DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
+      if (metrics != null) {
+        return Time.monotonicNow();
+      }
     }
     return 0;
   }
 
-  @Override
   public void afterFileIo(@Nullable FsVolumeSpi volume,
       FileIoProvider.OPERATION op, long begin, long len) {
-    DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
-    if (metrics != null) {
-      long latency = Time.monotonicNow() - begin;
-      metrics.addDataFileIoLatency(latency);
-      switch (op) {
-      case SYNC:
-        metrics.addSyncIoLatency(latency);
-        break;
-      case FLUSH:
-        metrics.addFlushIoLatency(latency);
-        break;
-      case READ:
-        metrics.addReadIoLatency(latency);
-        break;
-      case WRITE:
-        metrics.addWriteIoLatency(latency);
-        break;
-      default:
+    if (isEnabled) {
+      DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
+      if (metrics != null) {
+        long latency = Time.monotonicNow() - begin;
+        metrics.addDataFileIoLatency(latency);
+        switch (op) {
+        case SYNC:
+          metrics.addSyncIoLatency(latency);
+          break;
+        case FLUSH:
+          metrics.addFlushIoLatency(latency);
+          break;
+        case READ:
+          metrics.addReadIoLatency(latency);
+          break;
+        case WRITE:
+          metrics.addWriteIoLatency(latency);
+          break;
+        default:
+        }
       }
     }
   }
 
-  @Override
-  public void onFailure(@Nullable FsVolumeSpi volume,
-      FileIoProvider.OPERATION op, Exception e, long begin) {
-    DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
-    if (metrics != null) {
-      metrics.addFileIoError(Time.monotonicNow() - begin);
+  public void onFailure(@Nullable FsVolumeSpi volume, long begin) {
+    if (isEnabled) {
+      DataNodeVolumeMetrics metrics = getVolumeMetrics(volume);
+      if (metrics != null) {
+        metrics.addFileIoError(Time.monotonicNow() - begin);
+      }
     }
   }
 
-  @Nullable
-  @Override
-  public String getStatistics() {
-    return null;
-  }
-
   private DataNodeVolumeMetrics getVolumeMetrics(final FsVolumeSpi volume) {
-    if (volume != null) {
-      return volume.getMetrics();
+    if (isEnabled) {
+      if (volume != null) {
+        return volume.getMetrics();
+      }
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeMetrics.java
index 407c3e9..6a8ac9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeMetrics.java
@@ -121,8 +121,8 @@ public class TestDataNodeVolumeMetrics {
 
   private MiniDFSCluster setupClusterForVolumeMetrics() throws IOException {
     Configuration conf = new HdfsConfiguration();
-    conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
-        "org.apache.hadoop.hdfs.server.datanode.ProfilingFileIoEvents");
+    conf.setBoolean(DFSConfigKeys
+        .DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, true);
     SimulatedFSDataset.setFactory(conf);
     return new MiniDFSCluster.Builder(conf)
         .numDataNodes(NUM_DATANODES)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4046794a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 3bac7b9..51ef5d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -100,7 +100,9 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY);
     configurationPropsToSkipCompare
-        .add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY);
+        .add(DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY);
+    configurationPropsToSkipCompare.add(DFSConfigKeys
+        .DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY);
 
     // Allocate
     xmlPropsToSkipCompare = new HashSet<String>();

