Repository: hadoop
Updated Branches:
  refs/heads/trunk 80a68d605 -> 2ac87df57
MAPREDUCE-6376. Add avro binary support for jhist files. Contributed by Ray Chiang


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ac87df5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ac87df5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ac87df5

Branch: refs/heads/trunk
Commit: 2ac87df578accb6e612f70ded76271cb5082ee10
Parents: 80a68d6
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jul 1 16:00:03 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jul 1 16:00:03 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt | 3 ++
 .../jobhistory/JobHistoryEventHandler.java | 19 +++++++++++--
 .../hadoop/mapreduce/jobhistory/TestEvents.java | 3 +-
 .../mapreduce/v2/jobhistory/JHAdminConfig.java | 7 +++++
 .../mapreduce/jobhistory/EventReader.java | 12 ++++----
 .../mapreduce/jobhistory/EventWriter.java | 29 ++++++++++++++++----
 .../src/main/resources/mapred-default.xml | 9 ++++++
 7 files changed, 69 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0baecf8..409f074 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -372,6 +372,9 @@ Release 2.8.0 - UNRELEASED OPTIMIZATIONS + MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via + jlowe) + BUG FIXES MAPREDUCE-6314. TestPipeApplication fails on trunk. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 35556a6..0457cc5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -105,7 +105,8 @@ public class JobHistoryEventHandler extends AbstractService private int numUnflushedCompletionEvents = 0; private boolean isTimerActive; - + private EventWriter.WriteMode jhistMode = + EventWriter.WriteMode.JSON; protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>(); @@ -260,6 +261,20 @@ public class JobHistoryEventHandler extends AbstractService LOG.info("Emitting job history data to the timeline server is not enabled"); } + // Flag for setting + String jhistFormat = conf.get(JHAdminConfig.MR_HS_JHIST_FORMAT, + JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT); + if (jhistFormat.equals("json")) { + jhistMode = EventWriter.WriteMode.JSON; + } else if (jhistFormat.equals("binary")) { + jhistMode = EventWriter.WriteMode.BINARY; + } else { + LOG.warn("Unrecognized value '" + jhistFormat + "' for property " + + JHAdminConfig.MR_HS_JHIST_FORMAT + ". Valid values are " + + "'json' or 'binary'. 
Falling back to default value '" + + JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'."); + } + super.serviceInit(conf); } @@ -418,7 +433,7 @@ public class JobHistoryEventHandler extends AbstractService protected EventWriter createEventWriter(Path historyFilePath) throws IOException { FSDataOutputStream out = stagingDirFS.create(historyFilePath, true); - return new EventWriter(out); + return new EventWriter(out, this.jhistMode); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index 597f7a0..7612ceb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -190,7 +190,8 @@ public class TestEvents { ByteArrayOutputStream output = new ByteArrayOutputStream(); FSDataOutputStream fsOutput = new FSDataOutputStream(output, new FileSystem.Statistics("scheme")); - EventWriter writer = new EventWriter(fsOutput); + EventWriter writer = new EventWriter(fsOutput, + EventWriter.WriteMode.JSON); writer.write(getJobPriorityChangedEvent()); writer.write(getJobStatusChangedEvent()); writer.write(getTaskUpdatedEvent()); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java index a97c2ca..86dfad3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java @@ -221,4 +221,11 @@ public class JHAdminConfig { + "jobname.limit"; public static final int DEFAULT_MR_HS_JOBNAME_LIMIT = 50; + /** + * Settings for .jhist file format. 
+ */ + public static final String MR_HS_JHIST_FORMAT = + MR_HISTORY_PREFIX + "jhist.format"; + public static final String DEFAULT_MR_HS_JHIST_FORMAT = + "json"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java index e08a929..9898c2d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java @@ -66,16 +66,18 @@ public class EventReader implements Closeable { public EventReader(DataInputStream in) throws IOException { this.in = in; this.version = in.readLine(); - - if (!EventWriter.VERSION.equals(version)) { - throw new IOException("Incompatible event log version: "+version); - } Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class); Schema.Parser parser = new Schema.Parser(); this.schema = parser.parse(in.readLine()); this.reader = new SpecificDatumReader(schema, myschema); - this.decoder = DecoderFactory.get().jsonDecoder(schema, in); + if (EventWriter.VERSION.equals(version)) { + this.decoder = DecoderFactory.get().jsonDecoder(schema, in); + } else if (EventWriter.VERSION_BINARY.equals(version)) { + this.decoder = DecoderFactory.get().binaryDecoder(in, null); + } else { + throw new IOException("Incompatible event log version: " + version); + } } 
/** http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java index a548dfe..29489a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java @@ -43,20 +43,37 @@ import org.apache.hadoop.mapreduce.Counters; */ class EventWriter { static final String VERSION = "Avro-Json"; + static final String VERSION_BINARY = "Avro-Binary"; private FSDataOutputStream out; private DatumWriter<Event> writer = new SpecificDatumWriter<Event>(Event.class); private Encoder encoder; private static final Log LOG = LogFactory.getLog(EventWriter.class); - - EventWriter(FSDataOutputStream out) throws IOException { + public enum WriteMode { JSON, BINARY } + private final WriteMode writeMode; + private final boolean jsonOutput; // Cache value while we have 2 modes + + EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException { this.out = out; - out.writeBytes(VERSION); + this.writeMode = mode; + if (this.writeMode==WriteMode.JSON) { + this.jsonOutput = true; + out.writeBytes(VERSION); + } else if (this.writeMode==WriteMode.BINARY) { + this.jsonOutput = false; + out.writeBytes(VERSION_BINARY); + } else { + throw new IOException("Unknown mode: " + mode); + } out.writeBytes("\n"); 
out.writeBytes(Event.SCHEMA$.toString()); out.writeBytes("\n"); - this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out); + if (!this.jsonOutput) { + this.encoder = EncoderFactory.get().binaryEncoder(out, null); + } else { + this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out); + } } synchronized void write(HistoryEvent event) throws IOException { @@ -65,7 +82,9 @@ class EventWriter { wrapper.setEvent(event.getDatum()); writer.write(wrapper, encoder); encoder.flush(); - out.writeBytes("\n"); + if (this.jsonOutput) { + out.writeBytes("\n"); + } } void flush() throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ac87df5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index ba63c02..ddcd2df 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1714,6 +1714,15 @@ </property> <property> + <description> + File format the AM will use when generating the .jhist file. Valid + values are "json" for text output and "binary" for faster parsing. + </description> + <name>mapreduce.jobhistory.jhist.format</name> + <value>json</value> +</property> + +<property> <name>mapreduce.job.heap.memory-mb.ratio</name> <value>0.8</value> <description>The ratio of heap-size to container-size. If no -Xmx is