This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 477505c  HDFS-14824. [Dynamometer] Dynamometer in org.apache.hadoop.tools does not output the benchmark results. (#1685)
477505c is described below

commit 477505ccfc480f2605a7b65de95ea6f6ff5ce090
Author: Takanobu Asanuma <tasan...@apache.org>
AuthorDate: Sat Nov 2 01:32:32 2019 +0900

    HDFS-14824. [Dynamometer] Dynamometer in org.apache.hadoop.tools does not output the benchmark results. (#1685)
---
 .../apache/hadoop/tools/dynamometer/Client.java    |   7 ++
 .../tools/dynamometer/TestDynamometerInfra.java    |   7 +-
 .../workloadgenerator/CreateFileMapper.java        |   3 +-
 .../workloadgenerator/WorkloadDriver.java          |  27 ++---
 .../workloadgenerator/WorkloadMapper.java          |  31 +++---
 .../workloadgenerator/audit/AuditReplayMapper.java |  39 ++++++--
 .../audit/AuditReplayReducer.java                  |  44 ++++++++
 .../workloadgenerator/audit/AuditReplayThread.java |  18 ++++
 .../workloadgenerator/audit/CountTimeWritable.java |  82 +++++++++++++++
 .../workloadgenerator/audit/UserCommandKey.java    | 111 +++++++++++++++++++++
 .../workloadgenerator/TestWorkloadGenerator.java   |  42 ++++++--
 .../src/site/markdown/Dynamometer.md               |   3 +
 12 files changed, 366 insertions(+), 48 deletions(-)

diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java
index 42c1410..36f90b5 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java
@@ -166,6 +166,7 @@ public class Client extends Configured implements Tool {
   public static final String WORKLOAD_REPLAY_ENABLE_ARG =
       "workload_replay_enable";
   public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path";
+  public static final String WORKLOAD_OUTPUT_PATH_ARG = "workload_output_path";
   public static final String WORKLOAD_THREADS_PER_MAPPER_ARG =
       "workload_threads_per_mapper";
   public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay";
@@ -231,6 +232,8 @@ public class Client extends Configured implements Tool {
   private volatile Job workloadJob;
   // The input path for the workload job.
   private String workloadInputPath = "";
+  // The output path for the workload job metric results.
+  private String workloadOutputPath = "";
   // The number of threads to use per mapper for the workload job.
   private int workloadThreadsPerMapper;
   // The startup delay for the workload job.
@@ -347,6 +350,8 @@ public class Client extends Configured implements Tool {
         + "audit logs against the HDFS cluster which is started.");
     opts.addOption(WORKLOAD_INPUT_PATH_ARG, true,
         "Location of the audit traces to replay (Required for workload)");
+    opts.addOption(WORKLOAD_OUTPUT_PATH_ARG, true,
+        "Location of the metrics output (Required for workload)");
     opts.addOption(WORKLOAD_THREADS_PER_MAPPER_ARG, true, "Number of threads "
         + "per mapper to use to replay the workload. (default "
         + AuditReplayMapper.NUM_THREADS_DEFAULT + ")");
@@ -476,6 +481,7 @@ public class Client extends Configured implements Tool {
       }
       launchWorkloadJob = true;
       workloadInputPath = commandLine.getOptionValue(WORKLOAD_INPUT_PATH_ARG);
+      workloadOutputPath = commandLine.getOptionValue(WORKLOAD_OUTPUT_PATH_ARG);
       workloadThreadsPerMapper = Integer
           .parseInt(commandLine.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG,
               String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT)));
@@ -1032,6 +1038,7 @@ public class Client extends Configured implements Tool {
           + workloadStartDelayMs;
       Configuration workloadConf = new Configuration(getConf());
       workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
+      workloadConf.set(AuditReplayMapper.OUTPUT_PATH_KEY, workloadOutputPath);
       workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY,
           workloadThreadsPerMapper);
       workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY,
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java
index b008095..056b7de 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java
@@ -122,7 +122,7 @@ public class TestDynamometerInfra {
   private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path";
   private static final String HADOOP_BIN_VERSION_KEY =
       "dyno.hadoop.bin.version";
-  private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.2";
+  private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.3";
   private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740";
   private static final String VERSION_FILENAME = "VERSION";
 
@@ -132,6 +132,8 @@ public class TestDynamometerInfra {
   private static final String NAMENODE_NODELABEL = "dyno_namenode";
   private static final String DATANODE_NODELABEL = "dyno_datanode";
 
+  private static final String OUTPUT_PATH = "/tmp/trace_output_direct";
+
   private static MiniDFSCluster miniDFSCluster;
   private static MiniYARNCluster miniYARNCluster;
   private static YarnClient yarnClient;
@@ -408,6 +410,7 @@ public class TestDynamometerInfra {
         return false;
       }
     }, 3000, 60000);
+    assertTrue(fs.exists(new Path(OUTPUT_PATH)));
   }
 
   private void assertClusterIsFunctional(Configuration localConf,
@@ -477,6 +480,8 @@ public class TestDynamometerInfra {
             "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
             "-" + Client.WORKLOAD_INPUT_PATH_ARG,
             fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(),
+            "-" + Client.WORKLOAD_OUTPUT_PATH_ARG,
+            fs.makeQualified(new Path(OUTPUT_PATH)).toString(),
             "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
             "-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
             "-" + AMOptions.NAMENODE_ARGS_ARG,
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java
index 24aec93..33dc81d 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java
@@ -48,7 +48,8 @@ import org.apache.hadoop.mapreduce.Mapper;
  * </ul>
  */
 public class CreateFileMapper
-    extends WorkloadMapper<NullWritable, NullWritable> {
+    extends WorkloadMapper<NullWritable, NullWritable, NullWritable,
+    NullWritable> {
 
   public static final String NUM_MAPPERS_KEY = "createfile.num-mappers";
   public static final String DURATION_MIN_KEY = "createfile.duration-min";
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java
index d34cae7..8b170c1 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java
@@ -32,9 +32,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -124,7 +122,7 @@ public class WorkloadDriver extends Configured implements Tool {
       startTimestampMs = tmpConf.getTimeDuration(tmpConfKey, 0,
           TimeUnit.MILLISECONDS) + System.currentTimeMillis();
     }
-    Class<? extends WorkloadMapper<?, ?>> mapperClass = getMapperClass(
+    Class<? extends WorkloadMapper<?, ?, ?, ?>> mapperClass = getMapperClass(
         cli.getOptionValue(MAPPER_CLASS_NAME));
     if (!mapperClass.newInstance().verifyConfigurations(getConf())) {
       System.err
@@ -140,8 +138,9 @@ public class WorkloadDriver extends Configured implements Tool {
   }
 
   public static Job getJobForSubmission(Configuration baseConf, String nnURI,
-      long startTimestampMs, Class<? extends WorkloadMapper<?, ?>> mapperClass)
-      throws IOException, InstantiationException, IllegalAccessException {
+      long startTimestampMs, Class<? extends WorkloadMapper<?, ?, ?, ?>>
+      mapperClass) throws IOException, InstantiationException,
+      IllegalAccessException {
     Configuration conf = new Configuration(baseConf);
     conf.set(NN_URI, nnURI);
     conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
@@ -153,16 +152,9 @@ public class WorkloadDriver extends Configured implements Tool {
     conf.setLong(START_TIMESTAMP_MS, startTimestampMs);
 
     Job job = Job.getInstance(conf, "Dynamometer Workload Driver");
-    job.setOutputFormatClass(NullOutputFormat.class);
     job.setJarByClass(mapperClass);
     job.setMapperClass(mapperClass);
-    job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf));
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setNumReduceTasks(0);
-    job.setMapOutputKeyClass(NullWritable.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(NullWritable.class);
+    mapperClass.newInstance().configureJob(job);
 
     return job;
   }
@@ -175,8 +167,8 @@ public class WorkloadDriver extends Configured implements Tool {
   // The cast is actually checked via isAssignableFrom but the compiler doesn't
   // recognize this
   @SuppressWarnings("unchecked")
-  private Class<? extends WorkloadMapper<?, ?>> getMapperClass(String className)
-      throws ClassNotFoundException {
+  private Class<? extends WorkloadMapper<?, ?, ?, ?>> getMapperClass(
+      String className) throws ClassNotFoundException {
     if (!className.contains(".")) {
      className = WorkloadDriver.class.getPackage().getName() + "." + className;
     }
@@ -185,13 +177,14 @@ public class WorkloadDriver extends Configured implements Tool {
       throw new IllegalArgumentException(className + " is not a subclass of "
           + WorkloadMapper.class.getCanonicalName());
     }
-    return (Class<? extends WorkloadMapper<?, ?>>) mapperClass;
+    return (Class<? extends WorkloadMapper<?, ?, ?, ?>>) mapperClass;
   }
 
   private String getMapperUsageInfo(String mapperClassName)
       throws ClassNotFoundException, InstantiationException,
       IllegalAccessException {
-    WorkloadMapper<?, ?> mapper = getMapperClass(mapperClassName).newInstance();
+    WorkloadMapper<?, ?, ?, ?> mapper = getMapperClass(mapperClassName)
+        .newInstance();
     StringBuilder builder = new StringBuilder("Usage for ");
     builder.append(mapper.getClass().getSimpleName());
     builder.append(":\n");
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java
index afe5d45..d73f596 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java
@@ -21,25 +21,18 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
 /**
  * Represents the base class for a generic workload-generating mapper. By
  * default, it will expect to use {@link VirtualInputFormat} as its
- * {@link InputFormat}. Subclasses expecting a different {@link InputFormat}
- * should override the {@link #getInputFormat(Configuration)} method.
+ * {@link InputFormat}. Subclasses requiring a reducer or expecting a different
+ * {@link InputFormat} should override the {@link #configureJob(Job)} method.
  */
-public abstract class WorkloadMapper<KEYIN, VALUEIN>
-    extends Mapper<KEYIN, VALUEIN, NullWritable, NullWritable> {
-
-  /**
-   * Return the input class to be used by this mapper.
-   * @param conf configuration.
-   * @return the {@link InputFormat} implementation for the mapper.
-   */
-  public Class<? extends InputFormat> getInputFormat(Configuration conf) {
-    return VirtualInputFormat.class;
-  }
+public abstract class WorkloadMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
+    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
   /**
    * Get the description of the behavior of this mapper.
@@ -62,4 +55,16 @@ public abstract class WorkloadMapper<KEYIN, VALUEIN>
    */
   public abstract boolean verifyConfigurations(Configuration conf);
 
+  /**
+   * Set up the input and output formats and the optional reducer.
+   */
+  public void configureJob(Job job) {
+    job.setInputFormatClass(VirtualInputFormat.class);
+
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+  }
+
 }
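
For readers following the API change above: under the new four-type-parameter signature, a subclass that needs a reducer overrides configureJob(Job) rather than the removed getInputFormat(Configuration). Below is a minimal sketch of such a subclass; the class name ExampleCountingMapper, its Text/LongWritable key scheme, and the use of LongSumReducer are illustrative assumptions, not part of this commit.

package org.apache.hadoop.tools.dynamometer.workloadgenerator;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;

/** Hypothetical example subclass; not part of this commit. */
public class ExampleCountingMapper
    extends WorkloadMapper<NullWritable, NullWritable, Text, LongWritable> {

  @Override
  public void map(NullWritable key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    // Emit one count per generated (virtual) input record.
    context.write(new Text("ops"), new LongWritable(1));
  }

  @Override
  public String getDescription() {
    return "Example mapper that counts generated records.";
  }

  @Override
  public List<String> getConfigDescriptions() {
    return Collections.emptyList();
  }

  @Override
  public boolean verifyConfigurations(Configuration conf) {
    return true;
  }

  @Override
  public void configureJob(Job job) {
    // Keep the default virtual input, but wire in a summing reducer,
    // mirroring the pattern AuditReplayMapper.configureJob uses below.
    job.setInputFormatClass(VirtualInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setNumReduceTasks(1);
    job.setReducerClass(LongSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // A real mapper would also set an output path via
    // TextOutputFormat.setOutputPath(job, ...).
  }
}
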
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
index 27beda1..4dad215 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
 import com.google.common.collect.Lists;
 import java.util.Optional;
 import java.util.function.Function;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadDriver;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper;
 import java.io.IOException;
@@ -35,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -73,9 +76,11 @@ import static org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditR
  * are replayed. For example, a rate factor of 2 would make the replay occur
  * twice as fast, and a rate factor of 0.5 would make it occur half as fast.
  */
-public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
+public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text,
+    UserCommandKey, CountTimeWritable> {
 
   public static final String INPUT_PATH_KEY = "auditreplay.input-path";
+  public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
   public static final String NUM_THREADS_KEY = "auditreplay.num-threads";
   public static final int NUM_THREADS_DEFAULT = 1;
   public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
@@ -171,11 +176,6 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
   private ScheduledThreadPoolExecutor progressExecutor;
 
   @Override
-  public Class<? extends InputFormat> getInputFormat(Configuration conf) {
-    return NoSplitTextInputFormat.class;
-  }
-
-  @Override
   public String getDescription() {
     return "This mapper replays audit log files.";
   }
@@ -185,6 +185,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
     return Lists.newArrayList(
         INPUT_PATH_KEY
             + " (required): Path to directory containing input files.",
+        OUTPUT_PATH_KEY + " (required): Path to destination for output files.",
         NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT
             + "): Number of threads to use per mapper for replay.",
         CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT
@@ -199,7 +200,8 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
 
   @Override
   public boolean verifyConfigurations(Configuration conf) {
-    return conf.get(INPUT_PATH_KEY) != null;
+    return conf.get(INPUT_PATH_KEY) != null
+        && conf.get(OUTPUT_PATH_KEY) != null;
   }
 
   @Override
@@ -256,7 +258,8 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
   }
 
   @Override
-  public void cleanup(Mapper.Context context) throws InterruptedException {
+  public void cleanup(Mapper.Context context)
+      throws InterruptedException, IOException {
     for (AuditReplayThread t : threads) {
       // Add in an indicator for each thread to shut down after the last real
       // command
@@ -266,6 +269,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
     for (AuditReplayThread t : threads) {
       t.join();
       t.drainCounters(context);
+      t.drainCommandLatencies(context);
       if (t.getException() != null) {
         threadException = Optional.of(t.getException());
       }
@@ -287,4 +291,21 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
       LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps);
     }
   }
+
+  @Override
+  public void configureJob(Job job) {
+    job.setMapOutputKeyClass(UserCommandKey.class);
+    job.setMapOutputValueClass(CountTimeWritable.class);
+    job.setInputFormatClass(NoSplitTextInputFormat.class);
+
+    job.setNumReduceTasks(1);
+    job.setReducerClass(AuditReplayReducer.class);
+    job.setOutputKeyClass(UserCommandKey.class);
+    job.setOutputValueClass(CountTimeWritable.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    TextOutputFormat.setOutputPath(job, new Path(
+        job.getConfiguration().get(OUTPUT_PATH_KEY)));
+    job.getConfiguration().set(TextOutputFormat.SEPARATOR, ",");
+  }
 }
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java
new file mode 100644
index 0000000..cde1630
--- /dev/null
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * AuditReplayReducer aggregates the returned latency values from
+ * {@link AuditReplayMapper} and sums them up by {@link UserCommandKey}, which
+ * combines the id of the user who ran the command with the type of the
+ * command (READ/WRITE).
+ */
+public class AuditReplayReducer extends Reducer<UserCommandKey,
+    CountTimeWritable, UserCommandKey, CountTimeWritable> {
+
+  @Override
+  protected void reduce(UserCommandKey key, Iterable<CountTimeWritable> values,
+      Context context) throws IOException, InterruptedException {
+    long countSum = 0;
+    long timeSum = 0;
+    for (CountTimeWritable v : values) {
+      countSum += v.getCount();
+      timeSum += v.getTime();
+    }
+    context.write(key, new CountTimeWritable(countSum, timeSum));
+  }
+}
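
As a worked illustration of this reduce step (hypothetical values, consistent with the CSV example in the Dynamometer.md change below): if the mappers emit (hdfs,WRITE,MKDIRS) -> (1,50) and (hdfs,WRITE,MKDIRS) -> (1,100), the reducer writes the single output line `hdfs,WRITE,MKDIRS,2,150`.
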
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java
index e63c7a3..274c5a7 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java
@@ -76,6 +76,8 @@ public class AuditReplayThread extends Thread {
   // and merge them all together at the end.
   private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
   private Map<String, Counter> individualCommandsMap = new HashMap<>();
+  private Map<UserCommandKey, CountTimeWritable> commandLatencyMap
+      = new HashMap<>();
 
   AuditReplayThread(Mapper.Context mapperContext,
       DelayQueue<AuditReplayCommand> queue,
@@ -123,6 +125,14 @@ public class AuditReplayThread extends Thread {
     }
   }
 
+  void drainCommandLatencies(Mapper.Context context)
+      throws InterruptedException, IOException {
+    for (Map.Entry<UserCommandKey, CountTimeWritable> ent
+        : commandLatencyMap.entrySet()) {
+      context.write(ent.getKey(), ent.getValue());
+    }
+  }
+
   /**
    * Add a command to this thread's processing queue.
    *
@@ -279,6 +289,14 @@ public class AuditReplayThread extends Thread {
         throw new RuntimeException("Unexpected command: " + replayCommand);
       }
       long latency = System.currentTimeMillis() - startTime;
+
+      UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(),
+          replayCommand.toString(), replayCommand.getType().toString());
+      commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable());
+      CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey);
+      latencyWritable.setCount(latencyWritable.getCount() + 1);
+      latencyWritable.setTime(latencyWritable.getTime() + latency);
+
       switch (replayCommand.getType()) {
       case WRITE:
         replayCountersMap.get(REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY)
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java
new file mode 100644
index 0000000..6b851c8
--- /dev/null
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * CountTimeWritable is a {@link Writable} used as a composite value that
+ * accumulates the count and cumulative latency of replayed commands. It is
+ * used as the output value for AuditReplayMapper and AuditReplayReducer.
+ */
+public class CountTimeWritable implements Writable {
+  private LongWritable count;
+  private LongWritable time;
+
+  public CountTimeWritable() {
+    count = new LongWritable();
+    time = new LongWritable();
+  }
+
+  public CountTimeWritable(LongWritable count, LongWritable time) {
+    this.count = count;
+    this.time = time;
+  }
+
+  public CountTimeWritable(long count, long time) {
+    this.count = new LongWritable(count);
+    this.time = new LongWritable(time);
+  }
+
+  public long getCount() {
+    return count.get();
+  }
+
+  public long getTime() {
+    return time.get();
+  }
+
+  public void setCount(long count) {
+    this.count.set(count);
+  }
+
+  public void setTime(long time) {
+    this.time.set(time);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    count.write(out);
+    time.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    count.readFields(in);
+    time.readFields(in);
+  }
+
+  @Override
+  public String toString() {
+    return getCount() + "," + getTime();
+  }
+}
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java
new file mode 100644
index 0000000..5cfe09f
--- /dev/null
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+import javax.annotation.Nonnull;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * UserCommandKey is a {@link org.apache.hadoop.io.Writable} used as a composite
+ * key combining the user id, name, and type of a replayed command. It is used
+ * as the output key for AuditReplayMapper and the key for AuditReplayReducer.
+ */
+public class UserCommandKey implements WritableComparable {
+  private Text user;
+  private Text command;
+  private Text type;
+
+  public UserCommandKey() {
+    user = new Text();
+    command = new Text();
+    type = new Text();
+  }
+
+  public UserCommandKey(Text user, Text command, Text type) {
+    this.user = user;
+    this.command = command;
+    this.type = type;
+  }
+
+  public UserCommandKey(String user, String command, String type) {
+    this.user = new Text(user);
+    this.command = new Text(command);
+    this.type = new Text(type);
+  }
+
+  public String getUser() {
+    return user.toString();
+  }
+
+  public String getCommand() {
+    return command.toString();
+  }
+
+  public String getType() {
+    return type.toString();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    user.write(out);
+    command.write(out);
+    type.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    user.readFields(in);
+    command.readFields(in);
+    type.readFields(in);
+  }
+
+  @Override
+  public int compareTo(@Nonnull Object o) {
+    return toString().compareTo(o.toString());
+  }
+
+  @Override
+  public String toString() {
+    return getUser() + "," + getType() + "," + getCommand();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UserCommandKey that = (UserCommandKey) o;
+    return getUser().equals(that.getUser()) &&
+        getCommand().equals(that.getCommand()) &&
+        getType().equals(that.getType());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getUser(), getCommand(), getType());
+  }
+}
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java
index 5b2a2e7..0162352 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.tools.dynamometer.workloadgenerator;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditCommandParser;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,9 +38,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ImpersonationProvider;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS;
 import static org.junit.Assert.assertEquals;
@@ -46,6 +53,8 @@ import static org.junit.Assert.assertTrue;
 
 /** Tests for {@link WorkloadDriver} and related classes. */
 public class TestWorkloadGenerator {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestWorkloadGenerator.class);
 
   private Configuration conf;
   private MiniDFSCluster miniCluster;
@@ -73,22 +82,27 @@ public class TestWorkloadGenerator {
   }
 
   @Test
-  public void testAuditWorkloadDirectParser() throws Exception {
+  public void testAuditWorkloadDirectParserWithOutput() throws Exception {
     String workloadInputPath = TestWorkloadGenerator.class.getClassLoader()
         .getResource("audit_trace_direct").toString();
+    String auditOutputPath = "/tmp/trace_output_direct";
     conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
+    conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath);
     conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60 * 1000);
-    testAuditWorkload();
+    testAuditWorkloadWithOutput(auditOutputPath);
   }
 
   @Test
-  public void testAuditWorkloadHiveParser() throws Exception {
-    String workloadInputPath = TestWorkloadGenerator.class.getClassLoader()
-        .getResource("audit_trace_hive").toString();
+  public void testAuditWorkloadHiveParserWithOutput() throws Exception {
+    String workloadInputPath =
+        TestWorkloadGenerator.class.getClassLoader()
+            .getResource("audit_trace_hive").toString();
+    String auditOutputPath = "/tmp/trace_output_hive";
     conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
+    conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath);
     conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY,
         AuditLogHiveTableParser.class, AuditCommandParser.class);
-    testAuditWorkload();
+    testAuditWorkloadWithOutput(auditOutputPath);
   }
 
   /**
@@ -114,7 +128,8 @@ public class TestWorkloadGenerator {
     }
   }
 
-  private void testAuditWorkload() throws Exception {
+  private void testAuditWorkloadWithOutput(String auditOutputPath)
+      throws Exception {
     long workloadStartTime = System.currentTimeMillis() + 10000;
     Job workloadJob = WorkloadDriver.getJobForSubmission(conf,
         dfs.getUri().toString(), workloadStartTime, AuditReplayMapper.class);
@@ -132,5 +147,18 @@ public class TestWorkloadGenerator {
     assertTrue(
         dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory());
     assertFalse(dfs.exists(new Path("/denied")));
+
+    assertTrue(dfs.exists(new Path(auditOutputPath)));
+    try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath,
+        "part-r-00000"))) {
+      String auditOutput = IOUtils.toString(auditOutputFile,
+          StandardCharsets.UTF_8);
+      LOG.info(auditOutput);
+      assertTrue(auditOutput.matches(
+          ".*(hdfs,WRITE,[A-Z]+,[13]+,[0-9]+\\n){3}.*"));
+      // Matches three lines of the format "hdfs,WRITE,name,count,time"
+      // Using [13] for the count group because each operation is run either
+      // 1 or 3 times but the output order isn't guaranteed
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md b/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md
index fee569a..e6b3136 100644
--- a/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md
+++ b/hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md
@@ -232,6 +232,9 @@ within that file. A best effort is made to faithfully replay the audit log event
 originally occurred (optionally, this can be adjusted by specifying `auditreplay.rate-factor` which is a multiplicative
 factor towards the rate of replay, e.g. use 2.0 to replay the events at twice the original speed).
 
+The AuditReplayMapper will output the benchmark results to a file `part-r-00000` in the output directory in CSV format.
+Each line is in the format `user,type,operation,numops,cumulativelatency`, e.g. `hdfs,WRITE,MKDIRS,2,150`.
+
 ### Integrated Workload Launch
 
 To have the infrastructure application client launch the workload automatically, parameters for the workload job

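As a consumer-side illustration of the CSV format documented above (a hedged sketch, not part of this commit: the class name, the local-file argument, and the derived average-latency column are all assumptions), the benchmark output can be post-processed like so:

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical post-processor for the workload job's CSV output. */
public class BenchmarkOutputReader {
  public static void main(String[] args) throws Exception {
    // args[0]: a local copy of part-r-00000 from the workload output path.
    try (BufferedReader in = Files.newBufferedReader(
        Paths.get(args[0]), StandardCharsets.UTF_8)) {
      String line;
      while ((line = in.readLine()) != null) {
        // Each line: user,type,operation,numops,cumulativelatency
        String[] f = line.split(",");
        long numOps = Long.parseLong(f[3]);
        long totalLatencyMs = Long.parseLong(f[4]);
        System.out.printf("%s %s %s: %d ops, avg %.1f ms%n",
            f[0], f[1], f[2], numOps, totalLatencyMs / (double) numOps);
      }
    }
  }
}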

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
