[EAGLE-461] Convert MR history app with new app framework https://issues.apache.org/jira/browse/EAGLE-461
Author: Hao Chen <h...@apache.org> Closes #380 from haoch/EAGLE-461. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0bde482b Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0bde482b Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0bde482b Branch: refs/heads/master Commit: 0bde482be40f208ba944d32ece23533714c87133 Parents: b52405f Author: Hao Chen <h...@apache.org> Authored: Wed Aug 24 15:48:58 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Wed Aug 24 15:48:58 2016 +0800 ---------------------------------------------------------------------- eagle-jpm/eagle-jpm-mr-history/pom.xml | 5 + .../jpm/mr/history/MRHistoryJobApplication.java | 75 +++ .../MRHistoryJobApplicationProvider.java | 26 + .../jpm/mr/history/MRHistoryJobConfig.java | 208 +++++++ .../eagle/jpm/mr/history/MRHistoryJobMain.java | 70 +-- .../jpm/mr/history/common/JHFConfigManager.java | 182 ------ .../crawler/DefaultJHFInputStreamCallback.java | 8 +- .../history/crawler/JHFCrawlerDriverImpl.java | 6 +- .../mr/history/crawler/JobHistoryDAOImpl.java | 2 +- .../HistoryJobEntityCreationListener.java | 6 +- .../HistoryJobEntityLifecycleListener.java | 5 +- .../jpm/mr/history/parser/ImportException.java | 2 - .../mr/history/parser/JHFEventReaderBase.java | 302 +++++----- .../mr/history/parser/JHFMRVer1EventReader.java | 30 +- .../jpm/mr/history/parser/JHFMRVer1Parser.java | 319 ++++++----- .../parser/JHFMRVer1PerLineListener.java | 14 +- .../mr/history/parser/JHFMRVer2EventReader.java | 553 +++++++++++++------ .../jpm/mr/history/parser/JHFMRVer2Parser.java | 55 +- .../jpm/mr/history/parser/JHFParserBase.java | 3 +- .../jpm/mr/history/parser/JHFParserFactory.java | 44 +- .../parser/JHFWriteNotCompletedException.java | 6 +- ...JobConfigurationCreationServiceListener.java | 24 +- .../JobEntityCreationEagleServiceListener.java | 45 +- .../parser/JobEntityCreationPublisher.java 
| 9 +- .../parser/JobEntityLifecycleAggregator.java | 37 +- .../mr/history/parser/MRErrorClassifier.java | 17 +- .../jpm/mr/history/parser/RecordTypes.java | 5 +- .../parser/TaskAttemptCounterListener.java | 54 +- .../mr/history/parser/TaskFailureListener.java | 142 ++--- .../jpm/mr/history/storm/JobHistorySpout.java | 122 ++-- .../mr/history/zkres/JobHistoryZKStateLCM.java | 8 + .../history/zkres/JobHistoryZKStateManager.java | 90 +-- ....history.MRHistoryJobApplicationProvider.xml | 154 ++++++ ...org.apache.eagle.app.spi.ApplicationProvider | 16 + .../src/main/resources/application.conf | 20 +- .../MRHistoryJobApplicationProviderTest.java | 33 ++ .../mr/history/MRHistoryJobApplicationTest.java | 27 + 37 files changed, 1633 insertions(+), 1091 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml index bfd4cf2..1ffda6a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/pom.xml +++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml @@ -104,6 +104,11 @@ <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-base</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java new file mode 100644 index 0000000..08607a1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.mr.history; + +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; +import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; +import org.apache.eagle.jpm.util.Constants; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import com.typesafe.config.Config; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +public class MRHistoryJobApplication extends StormApplication { + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + //1. 
trigger init conf + MRHistoryJobConfig appConfig = MRHistoryJobConfig.getInstance(config); + com.typesafe.config.Config jhfAppConf = appConfig.getConfig(); + + //2. init JobHistoryContentFilter + final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile(); + String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(","); + List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length); + for (String confKeyPattern : confKeyPatternsSplit) { + confKeyPatterns.add(confKeyPattern.trim()); + } + confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB); + confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB); + confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB); + confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB); + + String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey"); + builder.setJobNameKey(jobNameKey); + + for (String key : confKeyPatterns) { + builder.includeJobKeyPatterns(Pattern.compile(key)); + } + JobHistoryContentFilter filter = builder.build(); + //3. init topology + TopologyBuilder topologyBuilder = new TopologyBuilder(); + String spoutName = "mrHistoryJobExecutor"; + int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName); + int tasks = jhfAppConf.getInt("envContextConfig.tasks." 
+ spoutName); + if (parallelism > tasks) { + parallelism = tasks; + } + topologyBuilder.setSpout( + spoutName, + new JobHistorySpout(filter, appConfig), + parallelism + ).setNumTasks(tasks); + return topologyBuilder.createTopology(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java new file mode 100644 index 0000000..9aa1c61 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.jpm.mr.history; + +import org.apache.eagle.app.spi.AbstractApplicationProvider; + +public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider<MRHistoryJobApplication> { + @Override + public MRHistoryJobApplication getApplication() { + return new MRHistoryJobApplication(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java new file mode 100644 index 0000000..ae86904 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history; + +import org.apache.eagle.common.config.ConfigOptionParser; +import org.apache.eagle.jpm.util.DefaultJobIdPartitioner; +import org.apache.eagle.jpm.util.JobIdPartitioner; + +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class MRHistoryJobConfig implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobConfig.class); + + private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf"; + + public String getEnv() { + return env; + } + + private String env; + + public ZKStateConfig getZkStateConfig() { + return zkStateConfig; + } + + private ZKStateConfig zkStateConfig; + + public JobHistoryEndpointConfig getJobHistoryEndpointConfig() { + return jobHistoryEndpointConfig; + } + + private JobHistoryEndpointConfig jobHistoryEndpointConfig; + + public ControlConfig getControlConfig() { + return controlConfig; + } + + private ControlConfig controlConfig; + + public JobExtractorConfig getJobExtractorConfig() { + return jobExtractorConfig; + } + + private JobExtractorConfig jobExtractorConfig; + + public EagleServiceConfig getEagleServiceConfig() { + return eagleServiceConfig; + } + + private EagleServiceConfig eagleServiceConfig; + + public Config getConfig() { + return config; + } + + private Config config; + + public static class ZKStateConfig implements Serializable { + public String zkQuorum; + public String zkRoot; + public int zkSessionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; + public String zkPort; + } + + public static class JobHistoryEndpointConfig implements Serializable { + public String nnEndpoint; + public String basePath; + public boolean pathContainsJobTrackerName; + public String jobTrackerName; + public String principal; + public String keyTab; + } + + public static class ControlConfig implements Serializable { + public boolean dryRun; + 
public Class<? extends JobIdPartitioner> partitionerCls; + public boolean zeroBasedMonth; + public String timeZone; + } + + public static class JobExtractorConfig implements Serializable { + public String site; + public String mrVersion; + public int readTimeoutSeconds; + } + + public static class EagleServiceConfig implements Serializable { + public String eagleServiceHost; + public int eagleServicePort; + public String username; + public String password; + } + + private static MRHistoryJobConfig manager = new MRHistoryJobConfig(); + + /** + * As this is singleton object and constructed while this class is being initialized, + * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError. + * And this is unrecoverable and hard to troubleshooting. + */ + private MRHistoryJobConfig() { + this.zkStateConfig = new ZKStateConfig(); + this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig(); + this.controlConfig = new ControlConfig(); + this.jobExtractorConfig = new JobExtractorConfig(); + this.eagleServiceConfig = new EagleServiceConfig(); + } + + public static MRHistoryJobConfig getInstance(String[] args) { + manager.init(args); + return manager; + } + + public static MRHistoryJobConfig getInstance(Config config) { + manager.init(config); + return manager; + } + + /** + * read configuration file and load hbase config etc. + */ + private void init(String[] args) { + // TODO: Probably we can remove the properties file path check in future + try { + LOG.info("Loading from configuration file"); + init(new ConfigOptionParser().load(args)); + } catch (Exception e) { + LOG.error("failed to load config"); + } + } + + /** + * read configuration file and load hbase config etc. 
+ */ + private void init(Config config) { + this.config = config; + this.env = config.getString("envContextConfig.env"); + //parse eagle job extractor + this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site"); + this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion"); + this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds"); + //parse eagle zk + this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum"); + this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort"); + this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs"); + this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes"); + this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval"); + this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot"); + + //parse job history endpoint + this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath"); + this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName"); + this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint"); + this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName"); + this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal"); + this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab"); + + //parse control config + this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun"); + try { + this.controlConfig.partitionerCls = (Class<? 
extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls")); + assert this.controlConfig.partitionerCls != null; + } catch (Exception e) { + LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e); + this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class; + } finally { + LOG.info("Loaded partitioner class: {}", this.controlConfig.partitionerCls); + } + this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth"); + this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone"); + + // parse eagle service endpoint + this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host"); + String port = config.getString("eagleProps.eagleService.port"); + this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port)); + this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username"); + this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password"); + + LOG.info("Successfully initialized MRHistoryJobConfig"); + LOG.info("env: " + this.env); + LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum); + LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort); + LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost); + LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java index 9f030a7..bef72cc 100644 --- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java @@ -18,74 +18,8 @@ package org.apache.eagle.jpm.mr.history; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.topology.TopologyBuilder; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; -import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; -import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; -import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; -import org.apache.eagle.jpm.util.Constants; - -import java.util.List; -import java.util.regex.Pattern; - public class MRHistoryJobMain { public static void main(String []args) { - try { - //1. trigger init conf - JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args); - com.typesafe.config.Config jhfAppConf = jhfConfigManager.getConfig(); - - //2. init JobHistoryContentFilter - JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile(); - List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys.jobConfigKey"); - confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB); - confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB); - confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB); - confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB); - - String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey"); - builder.setJobNameKey(jobNameKey); - - for (String key : confKeyPatterns) { - builder.includeJobKeyPatterns(Pattern.compile(key)); - } - JobHistoryContentFilter filter = builder.build(); - - //3. 
init topology - TopologyBuilder topologyBuilder = new TopologyBuilder(); - String topologyName = "mrHistoryJobTopology"; - if (jhfAppConf.hasPath("envContextConfig.topologyName")) { - topologyName = jhfAppConf.getString("envContextConfig.topologyName"); - } - String spoutName = "mrHistoryJobExecutor"; - int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName); - int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName); - if (parallelism > tasks) { - parallelism = tasks; - } - topologyBuilder.setSpout( - spoutName, - new JobHistorySpout(filter, jhfConfigManager), - parallelism - ).setNumTasks(tasks); - - Config config = new backtype.storm.Config(); - config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers")); - config.put(Config.TOPOLOGY_DEBUG, true); - if (!jhfConfigManager.getEnv().equals("local")) { - //cluster mode - //parse conf here - StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology()); - } else { - //local mode - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topologyName, config, topologyBuilder.createTopology()); - } - } catch (Exception e) { - e.printStackTrace(); - } + new MRHistoryJobApplication().run(args); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java deleted file mode 100644 index c99891b..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.common; - -import com.typesafe.config.Config; -import org.apache.eagle.common.config.ConfigOptionParser; -import org.apache.eagle.jpm.util.DefaultJobIdPartitioner; -import org.apache.eagle.jpm.util.JobIdPartitioner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; - -public class JHFConfigManager implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(JHFConfigManager.class); - - private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf"; - - public String getEnv() { - return env; - } - private String env; - - public ZKStateConfig getZkStateConfig() { return zkStateConfig; } - private ZKStateConfig zkStateConfig; - - public JobHistoryEndpointConfig getJobHistoryEndpointConfig() { return jobHistoryEndpointConfig; } - private JobHistoryEndpointConfig jobHistoryEndpointConfig; - - public ControlConfig getControlConfig() { return controlConfig; } - private ControlConfig controlConfig; - - public JobExtractorConfig getJobExtractorConfig() { return jobExtractorConfig; } - private JobExtractorConfig jobExtractorConfig; - - public EagleServiceConfig getEagleServiceConfig() { - return 
eagleServiceConfig; - } - private EagleServiceConfig eagleServiceConfig; - - public Config getConfig() { - return config; - } - private Config config; - - public static class ZKStateConfig implements Serializable { - public String zkQuorum; - public String zkRoot; - public int zkSessionTimeoutMs; - public int zkRetryTimes; - public int zkRetryInterval; - public String zkPort; - } - - public static class JobHistoryEndpointConfig implements Serializable { - public String nnEndpoint; - public String basePath; - public boolean pathContainsJobTrackerName; - public String jobTrackerName; - public String principal; - public String keyTab; - } - - public static class ControlConfig implements Serializable { - public boolean dryRun; - public Class<? extends JobIdPartitioner> partitionerCls; - public boolean zeroBasedMonth; - public String timeZone; - } - - public static class JobExtractorConfig implements Serializable { - public String site; - public String mrVersion; - public int readTimeoutSeconds; - } - - public static class EagleServiceConfig implements Serializable { - public String eagleServiceHost; - public int eagleServicePort; - public String username; - public String password; - } - - private static JHFConfigManager manager = new JHFConfigManager(); - - /** - * As this is singleton object and constructed while this class is being initialized, - * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError. - * And this is unrecoverable and hard to troubleshooting. 
- */ - private JHFConfigManager() { - this.zkStateConfig = new ZKStateConfig(); - this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig(); - this.controlConfig = new ControlConfig(); - this.jobExtractorConfig = new JobExtractorConfig(); - this.eagleServiceConfig = new EagleServiceConfig(); - } - - public static JHFConfigManager getInstance(String []args) { - manager.init(args); - return manager; - } - - /** - * read configuration file and load hbase config etc - */ - private void init(String[] args) { - // TODO: Probably we can remove the properties file path check in future - try { - LOG.info("Loading from configuration file"); - this.config = new ConfigOptionParser().load(args); - } catch (Exception e) { - LOG.error("failed to load config"); - } - - this.env = config.getString("envContextConfig.env"); - - //parse eagle job extractor - this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site"); - this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion"); - this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds"); - //parse eagle zk - this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum"); - this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort"); - this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs"); - this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes"); - this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval"); - this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot"); - - //parse job history endpoint - this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath"); - this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName"); - this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint"); - 
this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName"); - this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal"); - this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab"); - - //parse control config - this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun"); - try { - this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls")); - assert this.controlConfig.partitionerCls != null; - } catch (Exception e) { - LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e); - this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class; - } finally { - LOG.info("Loaded partitioner class: {}",this.controlConfig.partitionerCls); - } - this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth"); - this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone"); - - // parse eagle service endpoint - this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host"); - String port = config.getString("eagleProps.eagleService.port"); - this.eagleServiceConfig.eagleServicePort = (port == null ? 
8080 : Integer.parseInt(port)); - this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username"); - this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password"); - - LOG.info("Successfully initialized JHFConfigManager"); - LOG.info("env: " + this.env); - LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum); - LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort); - LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost); - LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index 6f85149..aeb35fd 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.history.crawler; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.parser.JHFParserBase; import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory; import org.slf4j.Logger; @@ -33,16 +33,16 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { private JobHistoryContentFilter m_filter; - private JHFConfigManager m_configManager; + private MRHistoryJobConfig m_configManager; - public 
DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) { + public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) { this.m_filter = filter; this.m_configManager = configManager; } @Override public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception { - final JHFConfigManager.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig(); + final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig(); @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { put("site", jobExtractorConfig.site); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index d3e1816..52bd8ea 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -19,7 +19,7 @@ package org.apache.eagle.jpm.mr.history.crawler; import org.apache.commons.lang3.tuple.Pair; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.util.JobIdFilter; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM; import org.slf4j.Logger; @@ -61,8 +61,8 @@ public class JHFCrawlerDriverImpl 
implements JHFCrawlerDriver { private int m_partitionId; private TimeZone m_timeZone; - public JHFCrawlerDriverImpl(JHFConfigManager.JobHistoryEndpointConfig jobHistoryConfig, - JHFConfigManager.ControlConfig controlConfig, JHFInputStreamCallback reader, + public JHFCrawlerDriverImpl(MRHistoryJobConfig.JobHistoryEndpointConfig jobHistoryConfig, + MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader, JobHistoryZKStateLCM zkStateLCM, JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { this.m_zeroBasedMonth = controlConfig.zeroBasedMonth; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java index 3b303fd..cfd5994 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java @@ -33,7 +33,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import org.apache.eagle.jpm.util.HDFSUtil; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.JobHistoryEndpointConfig; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig; public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java 
---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java index bdaedd4..892c2ea 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java @@ -22,16 +22,18 @@ package org.apache.eagle.jpm.mr.history.parser; import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; /** - * generalizing this listener would decouple entity creation and entity handling, also will help unit testing - * @author yonzhang + * generalizing this listener would decouple entity creation and entity handling, also will help unit testing. * + * @author yonzhang */ public interface HistoryJobEntityCreationListener { /** * job entity created event + * * @param entity */ void jobEntityCreated(JobBaseAPIEntity entity) throws Exception; + /** * for streaming processing, flush would help commit the last several entities */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java index ae6b5c9..a803c6d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java @@ -22,13 +22,12 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; public interface HistoryJobEntityLifecycleListener extends HistoryJobEntityCreationListener { /** - * job entity created event - * @param entity + * job entity created event. */ void jobEntityCreated(JobBaseAPIEntity entity) throws Exception; /** - * Job finished + * Job finished. */ void jobFinish(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java index d454c31..652eaf8 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java @@ -18,8 +18,6 @@ package org.apache.eagle.jpm.mr.history.parser; -/** - */ public class ImportException extends RuntimeException { private static final long serialVersionUID = -706778307046285820L; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index 9992690..82e305a 100644 --- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,14 +18,13 @@ package org.apache.eagle.jpm.mr.history.parser; -import org.apache.eagle.jpm.mr.historyentity.JobConfig; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.eagle.jpm.mr.historyentity.*; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.JobNameNormalization; import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.jpm.util.jobcounter.JobCounters; -import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.jobhistory.EventType; import org.slf4j.Logger; @@ -75,10 +74,18 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl private long sumReduceTaskDuration; public Constants.JobType fetchJobType(Configuration config) { - if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; } - if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { return Constants.JobType.HIVE; } - if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { return Constants.JobType.PIG; } - if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {return Constants.JobType.SCOOBI; 
} + if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { + return Constants.JobType.CASCADING; + } + if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { + return Constants.JobType.HIVE; + } + if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { + return Constants.JobType.PIG; + } + if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) { + return Constants.JobType.SCOOBI; + } return Constants.JobType.NOTAVALIABLE; } @@ -86,6 +93,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl * baseTags stores the basic tag name values which might be used for persisting various entities * baseTags includes: cluster, datacenter and jobName * baseTags are used for all job/task related entities + * * @param baseTags */ public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { @@ -120,7 +128,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl this.sumReduceTaskDuration = 0l; } - public void register(HistoryJobEntityLifecycleListener lifecycleListener){ + public void register(HistoryJobEntityLifecycleListener lifecycleListener) { this.jobEntityLifecycleListeners.add(lifecycleListener); } @@ -132,7 +140,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } try { flush(); - } catch(Exception ex) { + } catch (Exception ex) { throw new IOException(ex); } } @@ -146,8 +154,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } /** - * @param id - */ + * @param id + */ private void setJobID(String id) { this.m_jobId = id; } @@ -157,128 +165,128 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception { - String id = values.get(Keys.JOBID); - - if (m_jobId == null) { - setJobID(id); - } else if (!m_jobId.equals(id)) { 
- String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'"; - LOG.error(msg); - throw new ImportException(msg); - } - - if (values.get(Keys.SUBMIT_TIME) != null) { // job submitted - m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME))); - m_user = values.get(Keys.USER); - m_queueName = values.get(Keys.JOB_QUEUE); - m_jobName = values.get(Keys.JOBNAME); - - // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate. - String jobDefId = null; - if(configuration != null ) { - jobDefId = configuration.get(m_filter.getJobNameKey()); - } - - if(jobDefId == null) { - m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName); - } else { - LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId); - m_jobDefId = jobDefId; - } - - LOG.info("JobDefId of " + id + ": " + m_jobDefId); - - m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); - entityCreated(m_jobSubmitEventEntity); - } else if(values.get(Keys.LAUNCH_TIME) != null) { // job launched - m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME))); - m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp(); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); - 
m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); - m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS)); - m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES)); - entityCreated(m_jobLaunchEventEntity); - } else if(values.get(Keys.FINISH_TIME) != null) { // job finished - m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME))); - m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); - entityCreated(m_jobFinishEventEntity); - - // populate jobExecutionEntity entity - m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName); - m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType); - - m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS)); - m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp()); - m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp()); - m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - 
m_jobExecutionEntity.getStartTime()); - m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); - m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp()); - if (values.get(Keys.FAILED_MAPS) != null) { - // for Artemis - m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS))); - } - if (values.get(Keys.FAILED_REDUCES) != null) { - // for Artemis - m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES))); - } - m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS))); - m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES))); - m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps); - m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces); - if (values.get(Keys.COUNTERS) != null || totalCounters != null) { - JobCounters jobCounters = parseCounters(totalCounters); - m_jobExecutionEntity.setJobCounters(jobCounters); - if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) { - Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER); - if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) { - m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue()); - } - - if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) { - m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue()); - } - - if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) { - m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue()); - } - } - - if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) { - m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); - 
m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); - } - } - m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps); - if (m_numTotalReduces == 0) { - m_jobExecutionEntity.setMaxReduceTaskDuration(0); - m_jobExecutionEntity.setAvgReduceTaskDuration(0); - } else { - m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces); - } - entityCreated(m_jobExecutionEntity); - } + String id = values.get(Keys.JOBID); + + if (m_jobId == null) { + setJobID(id); + } else if (!m_jobId.equals(id)) { + String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'"; + LOG.error(msg); + throw new ImportException(msg); + } + + if (values.get(Keys.SUBMIT_TIME) != null) { // job submitted + m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME))); + m_user = values.get(Keys.USER); + m_queueName = values.get(Keys.JOB_QUEUE); + m_jobName = values.get(Keys.JOBNAME); + + // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate. 
+ String jobDefId = null; + if (configuration != null) { + jobDefId = configuration.get(m_filter.getJobNameKey()); + } + + if (jobDefId == null) { + m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName); + } else { + LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId); + m_jobDefId = jobDefId; + } + + LOG.info("JobDefId of " + id + ": " + m_jobDefId); + + m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name()); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); + entityCreated(m_jobSubmitEventEntity); + } else if (values.get(Keys.LAUNCH_TIME) != null) { // job launched + m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME))); + m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp(); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name()); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); + m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS)); + m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES)); + entityCreated(m_jobLaunchEventEntity); + } else if (values.get(Keys.FINISH_TIME) != null) { // job finished + 
m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME))); + m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS)); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); + entityCreated(m_jobFinishEventEntity); + + // populate jobExecutionEntity entity + m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName); + m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType); + + m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS)); + m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp()); + m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp()); + m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime()); + m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp()); + m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp()); + if (values.get(Keys.FAILED_MAPS) != null) { + // for Artemis + m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS))); + } + if (values.get(Keys.FAILED_REDUCES) != null) { + // for Artemis + m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES))); + } 
+ m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS))); + m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES))); + m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps); + m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces); + if (values.get(Keys.COUNTERS) != null || totalCounters != null) { + JobCounters jobCounters = parseCounters(totalCounters); + m_jobExecutionEntity.setJobCounters(jobCounters); + if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) { + Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER); + if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) { + m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue()); + } + + if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) { + m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue()); + } + + if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) { + m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue()); + } + } + + if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) { + m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); + m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps()); + } + } + m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps); + if (m_numTotalReduces == 0) { + m_jobExecutionEntity.setMaxReduceTaskDuration(0); + m_jobExecutionEntity.setAvgReduceTaskDuration(0); + } else { + m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces); + } + entityCreated(m_jobExecutionEntity); + 
} } private void entityCreated(JobBaseAPIEntity entity) throws Exception { - for (HistoryJobEntityLifecycleListener lifecycleListener: this.jobEntityLifecycleListeners) { + for (HistoryJobEntityLifecycleListener lifecycleListener : this.jobEntityLifecycleListeners) { lifecycleListener.jobEntityCreated(entity); } @@ -295,11 +303,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl protected abstract JobCounters parseCounters(Object value) throws IOException; /** - * for one task ID, it has several sequential task events, i.e. - * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end - * @param values - * @throws IOException - */ + * for one task ID, it has several sequential task events, i.e. + * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end + * + * @param values + * @throws IOException + */ @SuppressWarnings("serial") protected void handleTask(RecordTypes recType, EventType eventType, final Map<Keys, String> values, Object counters) throws Exception { String taskAttemptID = values.get(Keys.TASK_ATTEMPT_ID); @@ -308,7 +317,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl final String taskType = values.get(Keys.TASK_TYPE); final String taskID = values.get(Keys.TASKID); - Map<String, String> taskBaseTags = new HashMap<String, String>(){{ + Map<String, String> taskBaseTags = new HashMap<String, String>() {{ put(MRJobTagName.TASK_TYPE.toString(), taskType); put(MRJobTagName.USER.toString(), m_user); //put(MRJobTagName.JOB_NAME.toString(), _jobName); @@ -402,11 +411,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl Map<String, String> prop = new TreeMap<>(); if (m_filter.acceptJobConfFile()) { - Iterator<Map.Entry<String, String> > iter = configuration.iterator(); + Iterator<Map.Entry<String, String>> iter = 
configuration.iterator(); while (iter.hasNext()) { String key = iter.next().getKey(); - if (included(key) && !excluded(key)) + if (included(key) && !excluded(key)) { prop.put(key, configuration.get(key)); + } } } @@ -442,15 +452,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl break; } } - if (!matched) + if (!matched) { return false; + } } return true; } private boolean included(String key) { - if (m_filter.getJobConfKeyInclusionPatterns() == null) + if (m_filter.getJobConfKeyInclusionPatterns() == null) { return true; + } for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) { Matcher m = p.matcher(key); if (m.matches()) { @@ -462,13 +474,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } private boolean excluded(String key) { - if (m_filter.getJobConfKeyExclusionPatterns() == null) + if (m_filter.getJobConfKeyExclusionPatterns() == null) { return false; + } for (Pattern p : m_filter.getJobConfKeyExclusionPatterns()) { Matcher m = p.matcher(key); - if (m.matches()) + if (m.matches()) { return true; + } } return false; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java index 654f63f..6932dad 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java @@ -19,9 +19,9 @@ package org.apache.eagle.jpm.mr.history.parser; import 
org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.jobcounter.JobCounters; -import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.CounterGroup; @@ -37,8 +37,8 @@ import java.util.Map; /** * Listener holds all informations related to one whole job history file, so it's stateful and does not support multithreading. - * @author yonzhang * + * @author yonzhang */ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener { private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class); @@ -47,6 +47,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer * baseTags stores the basic tag name values which might be used for persisting various entities * baseTags includes: cluster, datacenter and jobName * baseTags are used for all job/task related entities + * * @param baseTags */ public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) { @@ -55,7 +56,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer @Override public void handle(RecordTypes recType, Map<Keys, String> values) - throws Exception { + throws Exception { switch (recType) { case Job: handleJob(null, values, values.get(Keys.COUNTERS)); @@ -76,11 +77,12 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer ; } } - + private void ensureRackHostnameAfterAttemptFinish(Map<Keys, String> values) { // only care about attempt finish - if (values.get(Keys.FINISH_TIME) == null) + if (values.get(Keys.FINISH_TIME) == null) { return; + } String hostname = null; String rack = null; // we get 
rack/hostname based on task's status @@ -92,24 +94,24 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer hostname = tmp[tmp.length - 1]; rack = tmp[tmp.length - 2]; m_host2RackMapping.put(hostname, rack); - } else if(values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) { + } else if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) { hostname = values.get(Keys.HOSTNAME); // make every effort to get RACK information hostname = (hostname == null) ? "" : hostname; rack = m_host2RackMapping.get(hostname); } - + values.put(Keys.HOSTNAME, hostname); values.put(Keys.RACK, rack); } - + @Override protected JobCounters parseCounters(Object value) throws IOException { JobCounters jc = new JobCounters(); Map<String, Map<String, Long>> groups = new HashMap<String, Map<String, Long>>(); - Counters counters = new Counters(); + Counters counters = new Counters(); try { - CountersStrings.parseEscapedCompactString((String)value, counters); + CountersStrings.parseEscapedCompactString((String) value, counters); } catch (Exception ex) { logger.error("can not parse job history", ex); throw new IOException(ex); @@ -118,7 +120,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer while (it.hasNext()) { CounterGroup cg = it.next(); - // hardcoded to exclude business level counters + // hardcoded to exclude business level counters if (!cg.getName().equals("org.apache.hadoop.mapreduce.FileSystemCounter") && !cg.getName().equals("org.apache.hadoop.mapreduce.TaskCounter") && !cg.getName().equals("org.apache.hadoop.mapreduce.JobCounter") @@ -128,7 +130,9 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer && !cg.getName().equals("org.apache.hadoop.mapred.Task$Counter") // for artemis && 
!cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") - ) continue; + ) { + continue; + } groups.put(cg.getName(), new HashMap<String, Long>()); Map<String, Long> counterValues = groups.get(cg.getName()); @@ -143,7 +147,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer jc.setCounters(groups); return jc; } - + public JobExecutionAPIEntity jobExecution() { return m_jobExecutionEntity; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java index bb08ef0..ab59a41 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java @@ -44,16 +44,18 @@ public class JHFMRVer1Parser implements JHFParserBase { static final String MAX_COUNTER_COUNT = "10000"; private JHFMRVer1EventReader m_reader; - public JHFMRVer1Parser(JHFMRVer1EventReader reader){ + + public JHFMRVer1Parser(JHFMRVer1EventReader reader) { this.m_reader = reader; } /** - * Parses history file and invokes Listener.handle() for - * each line of history. It can be used for looking through history - * files for specific items without having to keep whole history in memory. - * @throws IOException - */ + * Parses history file and invokes Listener.handle() for + * each line of history. 
It can be used for looking through history + * files for specific items without having to keep whole history in memory. + * + * @throws IOException + */ @Override public void parse(InputStream in) throws Exception, ParseException { // set enough counter number as user may build more counters @@ -68,7 +70,7 @@ public class JHFMRVer1Parser implements JHFParserBase { // Check if the file is empty if (line == null) { - return; + return; } // Get the information required for further processing @@ -80,17 +82,17 @@ public class JHFMRVer1Parser implements JHFParserBase { do { buf.append(line); if (!line.trim().endsWith(lineDelim) || line.trim().endsWith(escapedLineDelim)) { - buf.append("\n"); - continue; + buf.append("\n"); + continue; } parseLine(buf.toString(), m_reader, isEscaped); buf = new StringBuffer(); - } while ((line = reader.readLine())!= null); + } while ((line = reader.readLine()) != null); // flush to tell listener that we have finished parsing logger.info("finish parsing job history file and close"); m_reader.close(); - } catch(Exception ex) { + } catch (Exception ex) { logger.error("can not parse correctly ", ex); throw ex; } finally { @@ -104,17 +106,17 @@ public class JHFMRVer1Parser implements JHFParserBase { // extract the record type int idx = line.indexOf(' '); String recType = line.substring(0, idx); - String data = line.substring(idx+1, line.length()); + String data = line.substring(idx + 1, line.length()); Matcher matcher = pattern.matcher(data); - Map<Keys,String> parseBuffer = new HashMap<Keys, String>(); + Map<Keys, String> parseBuffer = new HashMap<Keys, String>(); - while(matcher.find()) { + while (matcher.find()) { String tuple = matcher.group(0); - String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '='); - String value = parts[1].substring(1, parts[1].length() -1); + String[] parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '='); + String value = parts[1].substring(1, parts[1].length() - 1); if (isEscaped) { - value 
= StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape); + value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape); } parseBuffer.put(Keys.valueOf(parts[0]), value); } @@ -131,141 +133,154 @@ public class JHFMRVer1Parser implements JHFParserBase { } parseBuffer.clear(); - } - - /** - * Manages job-history's meta information such as version etc. - * Helps in logging version information to the job-history and recover - * version information from the history. - */ - static class MetaInfoManager implements JHFMRVer1PerLineListener { - private long version = 0L; - private KeyValuePair pairs = new KeyValuePair(); - - public void close() { - } - // Extract the version of the history that was used to write the history - public MetaInfoManager(String line) throws Exception, ParseException { - if (null != line) { - // Parse the line - parseLine(line, this, false); - } - } - - // Get the line delimiter - char getLineDelim() { - if (version == 0) { - return '"'; - } else { - return LINE_DELIMITER_CHAR; - } - } - - // Checks if the values are escaped or not - boolean isValueEscaped() { - // Note that the values are not escaped in version 0 - return version != 0; - } - - public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException { + } + + /** + * Manages job-history's meta information such as version etc. + * Helps in logging version information to the job-history and recover + * version information from the history. 
+ */ + static class MetaInfoManager implements JHFMRVer1PerLineListener { + private long version = 0L; + private KeyValuePair pairs = new KeyValuePair(); + + public void close() { + } + + // Extract the version of the history that was used to write the history + public MetaInfoManager(String line) throws Exception, ParseException { + if (null != line) { + // Parse the line + parseLine(line, this, false); + } + } + + // Get the line delimiter + char getLineDelim() { + if (version == 0) { + return '"'; + } else { + return LINE_DELIMITER_CHAR; + } + } + + // Checks if the values are escaped or not + boolean isValueEscaped() { + // Note that the values are not escaped in version 0 + return version != 0; + } + + public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException { // Check if the record is of type META - if (RecordTypes.Meta == recType) { - pairs.handle(values); - version = pairs.getLong(Keys.VERSION); // defaults to 0 - } - } - } - - /** - * Base class contais utility stuff to manage types key value pairs with enums. - */ - static class KeyValuePair { - private Map<Keys, String> values = new HashMap<Keys, String>(); - - /** - * Get 'String' value for given key. Most of the places use Strings as - * values so the default get' method returns 'String'. This method never returns - * null to ease on GUIs. if no value is found it returns empty string "" - * @param k - * @return if null it returns empty string - "" - */ - public String get(Keys k) { - String s = values.get(k); - return s == null ? "" : s; - } - /** - * Convert value from history to int and return. - * if no value is found it returns 0. - * @param k key - */ - public int getInt(Keys k) { - String s = values.get(k); - if (null != s){ - return Integer.parseInt(s); - } - return 0; - } - /** - * Convert value from history to int and return. - * if no value is found it returns 0. 
- * @param k - */ - public long getLong(Keys k) { - String s = values.get(k); - if (null != s){ - return Long.parseLong(s); - } - return 0; - } - /** - * Set value for the key. - * @param k - * @param s - */ - public void set(Keys k, String s) { - values.put(k, s); - } - /** - * Adds all values in the Map argument to its own values. - * @param m - */ - public void set(Map<Keys, String> m) { - values.putAll(m); - } - /** - * Reads values back from the history, input is same Map as passed to Listener by parseHistory(). - * @param values - */ - public synchronized void handle(Map<Keys, String> values) { - set(values); - } - /** - * Returns Map containing all key-values. - */ - public Map<Keys, String> getValues() { - return values; - } - } - - /** - * Job history files contain key="value" pairs, where keys belong to this enum. - * It acts as a global namespace for all keys. - */ - public static enum Keys { - JOBTRACKERID, - START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, - LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, - FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, - ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, - SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, - TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS, - VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK, - - UBERISED,SPLIT_LOCATIONS,FAILED_DUE_TO_ATTEMPT,MAP_FINISH_TIME,PORT,RACK_NAME, - - //For Artemis - WORKFLOW_ID,WORKFLOW_NAME,WORKFLOW_NODE_NAME,WORKFLOW_ADJACENCIES,WORKFLOW_TAGS, - SHUFFLE_PORT,LOCALITY,AVATAAR,FAIL_REASON - } + if (RecordTypes.Meta == recType) { + pairs.handle(values); + version = pairs.getLong(Keys.VERSION); // defaults to 0 + } + } + } + + /** + * Base class contains utility stuff to manage types key value pairs with enums.
+ */ + static class KeyValuePair { + private Map<Keys, String> values = new HashMap<Keys, String>(); + + /** + * Get 'String' value for given key. Most of the places use Strings as + * values so the default 'get' method returns 'String'. This method never returns + * null to ease on GUIs. if no value is found it returns empty string "" + * + * @param k + * @return if null it returns empty string - "" + */ + public String get(Keys k) { + String s = values.get(k); + return s == null ? "" : s; + } + + /** + * Convert value from history to int and return. + * if no value is found it returns 0. + * + * @param k key + */ + public int getInt(Keys k) { + String s = values.get(k); + if (null != s) { + return Integer.parseInt(s); + } + return 0; + } + + /** + * Convert value from history to long and return. + * if no value is found it returns 0. + * + * @param k + */ + public long getLong(Keys k) { + String s = values.get(k); + if (null != s) { + return Long.parseLong(s); + } + return 0; + } + + /** + * Set value for the key. + * + * @param k + * @param s + */ + public void set(Keys k, String s) { + values.put(k, s); + } + + /** + * Adds all values in the Map argument to its own values. + * + * @param m + */ + public void set(Map<Keys, String> m) { + values.putAll(m); + } + + /** + * Reads values back from the history, input is same Map as passed to Listener by parseHistory(). + * + * @param values + */ + public synchronized void handle(Map<Keys, String> values) { + set(values); + } + + /** + * Returns Map containing all key-values. + */ + public Map<Keys, String> getValues() { + return values; + } + } + + /** + * Job history files contain key="value" pairs, where keys belong to this enum. + * It acts as a global namespace for all keys.
+ */ + public static enum Keys { + JOBTRACKERID, + START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, + LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, + FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, + ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, + SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, + TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS, + VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK, + + UBERISED, SPLIT_LOCATIONS, FAILED_DUE_TO_ATTEMPT, MAP_FINISH_TIME, PORT, RACK_NAME, + + //For Artemis + WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES, WORKFLOW_TAGS, + SHUFFLE_PORT, LOCALITY, AVATAAR, FAIL_REASON + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java index 1c096fc..5d48d5d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java @@ -24,15 +24,15 @@ import java.io.IOException; import java.util.Map; /** - * Callback interface for reading back log events from JobHistory. This interface - * should be implemented and passed to JobHistory.parseHistory() - * + * Callback interface for reading back log events from JobHistory. 
This interface + * should be implemented and passed to JobHistory.parseHistory() */ -public interface JHFMRVer1PerLineListener{ +public interface JHFMRVer1PerLineListener { /** - * Callback method for history parser. - * @param recType type of record, which is the first entry in the line. - * @param values a map of key-value pairs as thry appear in history. + * Callback method for history parser. + * + * @param recType type of record, which is the first entry in the line. + * @param values a map of key-value pairs as they appear in history. * @throws IOException */ void handle(RecordTypes recType, Map<Keys, String> values) throws Exception;