MAPREDUCE-6335. Created MR job based performance test driver for the timeline service v2. Contributed by Sangjin Lee.
(cherry picked from commit b689f5d43d3f5434a30fe52f1a7e12e1fc5c71f4)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f0b1cae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f0b1cae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f0b1cae

Branch: refs/heads/YARN-2928
Commit: 8f0b1cae5e1fd467efa6cc773ff744b27f05a2b3
Parents: 8e58f94
Author: Zhijie Shen <zjs...@apache.org>
Authored: Tue Apr 28 19:46:01 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Aug 25 10:47:10 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt             |   3 +
 .../mapred/TimelineServicePerformanceV2.java     | 298 +++++++++++++++++++
 .../apache/hadoop/test/MapredTestDriver.java     |   3 +
 3 files changed, 304 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f0b1cae/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5ac0d3b..2805780 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -9,6 +9,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history
     events and counters. (Junping Du via zjshen)
 
+    MAPREDUCE-6335. Created MR job based performance test driver for the
+    timeline service v2. (Sangjin Lee via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f0b1cae/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
new file mode 100644
index 0000000..de46617
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+public class TimelineServicePerformanceV2 extends Configured implements Tool {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServicePerformanceV2.class);
+
+  static final int NUM_MAPS_DEFAULT = 1;
+
+  static final int SIMPLE_ENTITY_WRITER = 1;
+  // constants for mtype = 1
+  static final String KBS_SENT = "kbs sent";
+  static final int KBS_SENT_DEFAULT = 1;
+  static final String TEST_TIMES = "testtimes";
+  static final int TEST_TIMES_DEFAULT = 100;
+  static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID =
+      "timeline.server.performance.run.id";
+
+  static int mapperType = SIMPLE_ENTITY_WRITER;
+
+  protected static int printUsage() {
+    // TODO is there a way to handle mapper-specific options more gracefully?
+    System.err.println(
+        "Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT +
+            ")\n" +
+        "       [-mtype <mapper type in integer>]\n" +
+        "            1. simple entity write mapper\n" +
+        "       [-s <kbs>] number of KB per put (default: " +
+            KBS_SENT_DEFAULT + " KB)\n" +
+        "       [-t <times>] number of put iterations per mapper (default: " +
+            TEST_TIMES_DEFAULT + ")\n");
+    GenericOptionsParser.printGenericCommandUsage(System.err);
+    return -1;
+  }
+
+  /**
+   * Configure a job given argv.
+   */
+  public static boolean parseArgs(String[] args, Job job) throws IOException {
+    // set the defaults
+    Configuration conf = job.getConfiguration();
+    conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT);
+    conf.setInt(KBS_SENT, KBS_SENT_DEFAULT);
+    conf.setInt(TEST_TIMES, TEST_TIMES_DEFAULT);
+
+    for (int i = 0; i < args.length; i++) {
+      if (args.length == i + 1) {
+        System.out.println("ERROR: Required parameter missing from " + args[i]);
+        return printUsage() == 0;
+      }
+      try {
+        if ("-m".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            job.getConfiguration()
+                .setInt(MRJobConfig.NUM_MAPS, (Integer.parseInt(args[i])));
+          }
+        } else if ("-mtype".equals(args[i])) {
+          mapperType = Integer.parseInt(args[++i]);
+          switch (mapperType) {
+          case SIMPLE_ENTITY_WRITER:
+            job.setMapperClass(SimpleEntityWriter.class);
+            break;
+          default:
+            job.setMapperClass(SimpleEntityWriter.class);
+          }
+        } else if ("-s".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            conf.setInt(KBS_SENT, (Integer.parseInt(args[i])));
+          }
+        } else if ("-t".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            conf.setInt(TEST_TIMES, (Integer.parseInt(args[i])));
+          }
+        } else {
+          System.out.println("Unexpected argument: " + args[i]);
+          return printUsage() == 0;
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage() == 0;
+      } catch (Exception e) {
+        throw (IOException)new IOException().initCause(e);
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Timeline service performance counters
+   */
+  static enum PerfCounters {
+    TIMELINE_SERVICE_WRITE_TIME,
+    TIMELINE_SERVICE_WRITE_COUNTER,
+    TIMELINE_SERVICE_WRITE_FAILURES,
+    TIMELINE_SERVICE_WRITE_KBS,
+  }
+
+  public int run(String[] args) throws Exception {
+
+    Job job = Job.getInstance(getConf());
+    job.setJarByClass(TimelineServicePerformanceV2.class);
+    job.setMapperClass(SimpleEntityWriter.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setNumReduceTasks(0);
+    if (!parseArgs(args, job)) {
+      return -1;
+    }
+
+    // for mtype = 1
+    // use the current timestamp as the "run id" of the test: it will be used
+    // to simulate the cluster timestamp for apps
+    Configuration conf = job.getConfiguration();
+    conf.setLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID,
+        System.currentTimeMillis());
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
+    long writetime =
+        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue();
+    long writecounts =
+        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue();
+    long writesize =
+        counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue();
+    double transacrate = writecounts * 1000 / (double)writetime;
+    double iorate = writesize * 1000 / (double)writetime;
+    int numMaps = Integer.parseInt(conf.get(MRJobConfig.NUM_MAPS));
+
+    System.out.println("TRANSACTION RATE (per mapper): " + transacrate +
+        " ops/s");
+    System.out.println("IO RATE (per mapper): " + iorate + " KB/s");
+
+    System.out.println("TRANSACTION RATE (total): " + transacrate * numMaps +
+        " ops/s");
+    System.out.println("IO RATE (total): " + iorate * numMaps + " KB/s");
+
+    return ret;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res =
+        ToolRunner.run(new Configuration(), new TimelineServicePerformanceV2(),
+            args);
+    System.exit(res);
+  }
+
+  /**
+   * To ensure that the compression really gets exercised, generate a
+   * random alphanumeric fixed-length payload
+   */
+  static final char[] alphaNums = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
+      'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
+      's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
+      'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+      'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
+      '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
+
+  /**
+   * Adds simple entities with random string payload, events, metrics, and
+   * configuration.
+   */
+  public static class SimpleEntityWriter
+      extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> {
+    public void map(IntWritable key, IntWritable val, Context context)
+        throws IOException {
+
+      Configuration conf = context.getConfiguration();
+      // simulate the app id with the task id
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0);
+      ApplicationId appId = ApplicationId.newInstance(timestamp, taskId);
+
+      // create the app level timeline collector
+      Configuration tlConf = new YarnConfiguration();
+      AppLevelTimelineCollector collector =
+          new AppLevelTimelineCollector(appId);
+      collector.init(tlConf);
+      collector.start();
+
+      try {
+        // set the context
+        // flow name: job name, flow run id: timestamp, user id
+        TimelineCollectorContext tlContext =
+            collector.getTimelineEntityContext();
+        tlContext.setFlowName(context.getJobName());
+        tlContext.setFlowRunId(timestamp);
+        tlContext.setUserId(context.getUser());
+
+        final int kbs = Integer.parseInt(conf.get(KBS_SENT));
+
+        long totalTime = 0;
+        final int testtimes = Integer.parseInt(conf.get(TEST_TIMES));
+        final Random rand = new Random();
+        final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+        final char[] payLoad = new char[kbs * 1024];
+
+        for (int i = 0; i < testtimes; i++) {
+          // Generate a fixed-length random payload
+          for (int xx = 0; xx < kbs * 1024; xx++) {
+            int alphaNumIdx = rand.nextInt(alphaNums.length);
+            payLoad[xx] = alphaNums[alphaNumIdx];
+          }
+          String entId = taskAttemptId + "_" + Integer.toString(i);
+          final TimelineEntity entity = new TimelineEntity();
+          entity.setId(entId);
+          entity.setType("FOO_ATTEMPT");
+          entity.addInfo("PERF_TEST", payLoad);
+          // add an event
+          TimelineEvent event = new TimelineEvent();
+          event.setTimestamp(System.currentTimeMillis());
+          event.addInfo("foo_event", "test");
+          entity.addEvent(event);
+          // add a metric
+          TimelineMetric metric = new TimelineMetric();
+          metric.setId("foo_metric");
+          metric.setSingleData(123456789L);
+          entity.addMetric(metric);
+          // add a config
+          entity.addConfig("foo", "bar");
+
+          TimelineEntities entities = new TimelineEntities();
+          entities.addEntity(entity);
+          // use the current user for this purpose
+          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+          long startWrite = System.nanoTime();
+          try {
+            collector.putEntities(entities, ugi);
+          } catch (Exception e) {
+            context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).
+                increment(1);
+            e.printStackTrace();
+          }
+          long endWrite = System.nanoTime();
+          totalTime += (endWrite - startWrite) / 1000000L;
+        }
+        LOG.info("wrote " + testtimes + " entities (" + kbs * testtimes +
+            " kB) in " + totalTime + " ms");
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).
+            increment(totalTime);
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).
+            increment(testtimes);
+        context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).
+            increment(kbs * testtimes);
+      } finally {
+        // clean up
+        collector.close();
+      }
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f0b1cae/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
index 8fa82aa..5dcb143 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.TestMapRed;
 import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.ThreadedMapBenchmark;
+import org.apache.hadoop.mapred.TimelineServicePerformanceV2;
 import org.apache.hadoop.mapreduce.FailJob;
 import org.apache.hadoop.mapreduce.LargeSorter;
 import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
@@ -90,6 +91,8 @@ public class MapredTestDriver {
       pgd.addClass("fail", FailJob.class, "a job that always fails");
       pgd.addClass("sleep", SleepJob.class,
                    "A job that sleeps at each map and reduce task.");
+      pgd.addClass("timelineperformance", TimelineServicePerformanceV2.class,
+          "A job that launches mappers to test timeline service performance.");
       pgd.addClass("nnbench", NNBench.class,
                    "A benchmark that stresses the namenode w/ MR.");
      pgd.addClass("nnbenchWithoutMR", NNBenchWithoutMR.class,
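
For reference, a minimal sketch of how this driver might be exercised once the
patch is applied. The options map to printUsage() above (4 mappers, the simple
entity writer, 2 KB per put, 50 puts per mapper); the wrapper class
TimelinePerfSmokeTest is hypothetical and not part of this commit, and it
assumes Hadoop plus the timeline service v2 classes are on the classpath:

    // Hypothetical wrapper, not part of this commit; runs the driver
    // with explicit arguments instead of going through MapredTestDriver.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.TimelineServicePerformanceV2;
    import org.apache.hadoop.util.ToolRunner;

    public class TimelinePerfSmokeTest {
      public static void main(String[] args) throws Exception {
        // -m 4: mappers, -mtype 1: simple entity writer,
        // -s 2: KB per put, -t 50: put iterations per mapper
        String[] driverArgs = {"-m", "4", "-mtype", "1", "-s", "2", "-t", "50"};
        int exitCode = ToolRunner.run(new Configuration(),
            new TimelineServicePerformanceV2(), driverArgs);
        System.exit(exitCode);
      }
    }

Equivalently, given the "timelineperformance" registration in MapredTestDriver
above, something like "hadoop jar hadoop-mapreduce-client-jobclient-*-tests.jar
timelineperformance -m 4 -mtype 1 -s 2 -t 50" should launch the same job from
the command line. Note that the rates printed by run() come from counters
summed over all mappers, so the "total" figures are simply the per-mapper
rates scaled by the mapper count.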