Repository: incubator-eagle Updated Branches: refs/heads/master 0474d5916 -> 21187b55c (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 3daae37..ca4a94f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -18,19 +18,20 @@ package org.apache.eagle.jpm.mr.history.storm; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.*; -import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; +import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity; import org.apache.eagle.jpm.util.JobIdFilter; import org.apache.eagle.jpm.util.JobIdFilterByPartition; import org.apache.eagle.jpm.util.JobIdPartitioner; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,28 +44,30 @@ import java.util.Map; /** * Zookeeper znode structure * -zkRoot - * - partitions - * - 0 (20150101) - * - 1 (20150101) - * - 2 (20150101) - * - ... ... - * - N-1 (20150102) - * - jobs - * - 20150101 - * - job1 - * - job2 - * - job3 - * - 20150102 - * - job1 - * - job2 - * - job3 - * + * - partitions + * - 0 (20150101) + * - 1 (20150101) + * - 2 (20150101) + * - ... ... + * - N-1 (20150102) + * - jobs + * - 20150101 + * - job1 + * - job2 + * - job3 + * - 20150102 + * - job1 + * - job2 + * - job3 + * <p> * Spout can have multiple instances, which is supported by storm parallelism primitive. - * + * </p> + * <p> * Under znode partitions, N child znodes (name is 0 based integer) would be created with each znode mapped to one spout instance. All jobs will be partitioned into N * partitions by applying JobPartitioner class to each job Id. The value of each partition znode is the date when the last job in this partition * is successfully processed. - * + * </p> + * <p> * processing steps * 1) In constructor, * 2) In open(), calculate jobPartitionId for current spout (which should be exactly same to spout taskId within TopologyContext) @@ -74,10 +77,9 @@ import java.util.Map; * 7) process job files (job history file and job configuration xml file) * 8) add job Id to current date slot say for example 20150102 after this job is successfully processed * 9) clean up all slots with date less than currentProcessDate - 2 days. (2 days should be configurable) - * + * </p> * Note: * if one spout instance crashes and is brought up again, open() method would be invoked again, we need think of this scenario. - * */ public class JobHistorySpout extends BaseRichSpout { @@ -90,20 +92,18 @@ public class JobHistorySpout extends BaseRichSpout { private JobHistoryContentFilter contentFilter; private JobHistorySpoutCollectorInterceptor interceptor; private JHFInputStreamCallback callback; - private JHFConfigManager configManager; - private JobHistoryLCM m_jhfLCM; - private final static int MAX_RETRY_TIMES = 3; + private MRHistoryJobConfig configManager; + private JobHistoryLCM jhfLCM; + private static final int MAX_RETRY_TIMES = 3; - public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) { + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager) { this(filter, configManager, new JobHistorySpoutCollectorInterceptor()); } /** - * mostly this constructor signature is for unit test purpose as you can put customized interceptor here - * @param filter - * @param adaptor + * mostly this constructor signature is for unit test purpose as you can put customized interceptor here. */ - public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager, JobHistorySpoutCollectorInterceptor adaptor) { + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, JobHistorySpoutCollectorInterceptor adaptor) { this.contentFilter = filter; this.configManager = configManager; this.interceptor = adaptor; @@ -131,15 +131,15 @@ public class JobHistorySpout extends BaseRichSpout { partitionId = calculatePartitionId(context); // sanity verify 0<=partitionId<=numTotalPartitions-1 if (partitionId < 0 || partitionId > numTotalPartitions) { - throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + - partitionId + " and numTotalPartitions " + numTotalPartitions); + throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + + partitionId + " and numTotalPartitions " + numTotalPartitions); } Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls; JobIdPartitioner partitioner; try { partitioner = partitionerCls.newInstance(); } catch (Exception e) { - LOG.error("failing instantiating job partitioner class " + partitionerCls,e); + LOG.error("failing instantiating job partitioner class " + partitionerCls, e); throw new IllegalStateException(e); } JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId); @@ -148,14 +148,14 @@ public class JobHistorySpout extends BaseRichSpout { interceptor.setSpoutOutputCollector(collector); try { - m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig()); + jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig()); driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(), - configManager.getControlConfig(), - callback, - zkState, - m_jhfLCM, - jobIdFilter, - partitionId); + configManager.getControlConfig(), + callback, + zkState, + jhfLCM, + jobIdFilter, + partitionId); } catch (Exception e) { LOG.error("failing creating crawler driver"); throw new IllegalStateException(e); @@ -171,7 +171,7 @@ public class JobHistorySpout extends BaseRichSpout { } catch (Exception ex) { LOG.error("fail crawling job history file and continue ...", ex); try { - m_jhfLCM.freshFileSystem(); + jhfLCM.freshFileSystem(); } catch (Exception e) { LOG.error("failed to fresh file system ", e); } @@ -179,27 +179,27 @@ public class JobHistorySpout extends BaseRichSpout { try { Thread.sleep(1000); } catch (Exception e) { - + // ignored } } } /** - * empty because framework will take care of output fields declaration + * empty because framework will take care of output fields declaration. */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } /** - * add to processedJob + * add to processedJob. */ @Override public void ack(Object jobId) { } /** - * job is not fully processed + * job is not fully processed. */ @Override public void fail(Object jobId) { @@ -227,26 +227,28 @@ public class JobHistorySpout extends BaseRichSpout { } } - if (minTimeStamp == 0l) { + if (minTimeStamp == 0L) { return; } LOG.info("update process time stamp {}", minTimeStamp); - final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); - Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", jobExtractorConfig.site); - } }; + final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + Map<String, String> baseTags = new HashMap<String, String>() { + { + put("site", jobExtractorConfig.site); + } + }; JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity(); entity.setCurrentTimeStamp(minTimeStamp); entity.setTimestamp(minTimeStamp); entity.setTags(baseTags); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); @@ -267,7 +269,7 @@ public class JobHistorySpout extends BaseRichSpout { LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex); } } - tried ++; + tried++; } client.getJerseyClient().destroy(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java index 933b347..cbde88c 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java @@ -22,12 +22,20 @@ import java.util.List; public interface JobHistoryZKStateLCM { void ensureJobPartitions(int numTotalPartitions); + String readProcessedDate(int partitionId); + List<String> readProcessedJobs(String date); + void updateProcessedDate(int partitionId, String date); + void addProcessedJob(String date, String jobId); + void truncateProcessedJob(String date); + void truncateEverything(); + long readProcessedTimeStamp(int partitionId); + void updateProcessedTimeStamp(int partitionId, long timeStamp); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java index 33d3cb2..feb896e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java @@ -18,11 +18,12 @@ package org.apache.eagle.jpm.mr.history.zkres; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +46,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { private CuratorFramework newCurator(ZKStateConfig config) throws Exception { return CuratorFrameworkFactory.newClient( - config.zkQuorum, - config.zkSessionTimeoutMs, - 15000, - new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval) + config.zkQuorum, + config.zkSessionTimeoutMs, + 15000, + new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval) ); } @@ -86,7 +87,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { if (_curator.checkExists().forPath(path) != null) { _curator.delete().forPath(path); } - } catch(Exception ex) { + } catch (Exception ex) { LOG.error("fail reading forceStartFrom znode", ex); } } @@ -102,27 +103,28 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { /** * under zkRoot, znode forceStartFrom is used to force job is crawled from that date * IF - * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD", + * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD", * THEN - * rebuild all partitions with the forceStartFrom + * rebuild all partitions with the forceStartFrom * ELSE - * IF - * partition structure is changed - * THEN - * IF - * there is valid mindate for existing partitions - * THEN - * rebuild job partitions from that valid mindate - * ELSE - * rebuild job partitions from (today - BACKOFF_DAYS) - * END - * ELSE - * do nothing - * END + * IF + * partition structure is changed + * THEN + * IF + * there is valid mindate for existing partitions + * THEN + * rebuild job partitions from that valid mindate + * ELSE + * rebuild job partitions from (today - BACKOFF_DAYS) + * END + * ELSE + * do nothing * END - * - * + * END + * <p> * forceStartFrom is deleted once its value is used, so next time when topology is restarted, program can run from where topology is stopped last time + * </p> + * . */ @Override public void ensureJobPartitions(int numTotalPartitions) { @@ -137,7 +139,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { if (forceStartFrom != null) { try { minDate = Integer.valueOf(forceStartFrom); - } catch(Exception ex) { + } catch (Exception ex) { LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom); throw new IllegalStateException(); } @@ -153,16 +155,18 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions); } } - if (!structureChanged) + if (!structureChanged) { return; // do nothing + } if (pathExists) { List<String> partitions = _curator.getChildren().forPath(path); for (String partition : partitions) { String date = new String(_curator.getData().forPath(path + "/" + partition), "UTF-8"); int tmp = Integer.valueOf(date); - if(tmp < minDate) + if (tmp < minDate) { minDate = tmp; + } } } @@ -178,7 +182,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { } finally { try { lock.release(); - } catch(Exception e) { + } catch (Exception e) { LOG.error("fail releasing lock", e); throw new RuntimeException(e); } @@ -195,9 +199,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { for (int i = 0; i < numTotalPartitions; i++) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path + "/" + i, startingDate.getBytes("UTF-8")); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path + "/" + i, startingDate.getBytes("UTF-8")); } } @@ -222,9 +226,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, date.getBytes("UTF-8")); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, date.getBytes("UTF-8")); } else { _curator.setData().forPath(path, date.getBytes("UTF-8")); } @@ -240,9 +244,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); } else { _curator.setData().forPath(path); } @@ -311,10 +315,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path); - return 0l; + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + return 0L; } else { return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8")); } @@ -330,9 +334,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); } _curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8")); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml new file mode 100644 index 0000000..5e69a16 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -0,0 +1,154 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<application> + <type>SPARK_HISTORY_JOB_APP</type> + <name>Spark History Job Monitoring</name> + <version>0.5.0-incubating</version> + <appClass>org.apache.eagle.jpm.mr.history.MRHistoryJobApplication</appClass> + <viewPath>/apps/jpm</viewPath> + <configuration> + <!-- org.apache.eagle.jpm.mr.history.MRHistoryJobConfig --> + <property> + <name>jobExtractorConfig.site</name> + <displayName>Site ID</displayName> + <value>sandbox</value> + </property> + <property> + <name>jobExtractorConfig.mrVersion</name> + <value>MRVer2</value> + </property> + <property> + <name>jobExtractorConfig.readTimeOutSeconds</name> + <displayName>zkPort</displayName> + <value>10</value> + </property> + <property> + <name>dataSourceConfig.zkQuorum</name> + <value>sandbox.hortonworks.com:2181</value> + </property> + <property> + <name>dataSourceConfig.zkPort</name> + <value>2181</value> + </property> + <property> + <name>dataSourceConfig.zkSessionTimeoutMs</name> + <value>15000</value> + </property> + <property> + <name>dataSourceConfig.zkRetryTimes</name> + <value>3</value> + </property> + <property> + <name>dataSourceConfig.zkRetryInterval</name> + <value>20000</value> + </property> + <property> + <name>dataSourceConfig.zkRoot</name> + <value>/test_mrjobhistory</value> + </property> + <property> + <name>dataSourceConfig.basePath</name> + <value>/mr-history/done</value> + </property> + <property> + <name>dataSourceConfig.jobTrackerName</name> + <value></value> + </property> + <property> + <name>dataSourceConfig.nnEndpoint</name> + <value>hdfs://sandbox.hortonworks.com:8020</value> + </property> + <property> + <name>dataSourceConfig.pathContainsJobTrackerName</name> + <value>false</value> + </property> + <property> + <name>dataSourceConfig.principal</name> + <value></value> + </property> + <property> + <name>dataSourceConfig.keytab</name> + <value></value> + </property> + <property> + <name>dataSourceConfig.dryRun</name> + <value>false</value> + </property> + <property> + <name>dataSourceConfig.partitionerCls</name> + <value>org.apache.eagle.jpm.util.DefaultJobIdPartitioner</value> + </property> + <property> + <name>dataSourceConfig.zeroBasedMonth</name> + <value>false</value> + </property> + <property> + <name>MRConfigureKeys.jobConfigKey</name> + <value>mapreduce.map.output.compress, + mapreduce.map.output.compress.codec, + mapreduce.output.fileoutputformat.compress, + mapreduce.output.fileoutputformat.compress.type, + mapreduce.output.fileoutputformat.compress.codec, + mapred.output.format.class, + dataplatform.etl.info, + mapreduce.map.memory.mb, + mapreduce.reduce.memory.mb, + mapreduce.map.java.opts, + mapreduce.reduce.java.opts</value> + </property> + <property> + <name>MRConfigureKeys.jobNameKey</name> + <value>eagle.job.name</value> + </property> + <property> + <name>envContextConfig.parallelismConfig.mrHistoryJobExecutor</name> + <value>6</value> + </property> + <property> + <name>envContextConfig.tasks.mrHistoryJobExecutor</name> + <value>6</value> + </property> + <property> + <name>eagleProps.eagleService.host</name> + <description>eagleProps.eagleService.host</description> + <value>sandbox.hortonworks.com</value> + </property> + <property> + <name>eagleProps.eagleService.port</name> + <description>eagleProps.eagleService.port</description> + <value>9099</value> + </property> + <property> + <name>eagleProps.eagleService.username</name> + <description>eagleProps.eagleService.username</description> + <value>admin</value> + </property> + <property> + <name>eagleProps.eagleService.password</name> + <description>eagleProps.eagleService.password</description> + <value>secret</value> + </property> + </configuration> + <docs> + <install> + </install> + <uninstall> + </uninstall> + </docs> +</application> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..56a30bd --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index 23a51fc..13e411f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -15,9 +15,6 @@ { "envContextConfig" : { - "env" : "local", - "topologyName" : "mr_history", - "stormConfigFile" : "storm.yaml", "parallelismConfig" : { "mrHistoryJobExecutor" : 6 }, @@ -62,21 +59,10 @@ "password": "secret" } }, - + "appId":"mr_history", + "mode":"LOCAL", "MRConfigureKeys" : { "jobNameKey" : "eagle.job.name", - "jobConfigKey" : [ - "mapreduce.map.output.compress", - "mapreduce.map.output.compress.codec", - "mapreduce.output.fileoutputformat.compress", - "mapreduce.output.fileoutputformat.compress.type", - "mapreduce.output.fileoutputformat.compress.codec", - "mapred.output.format.class", - "dataplatform.etl.info", - "mapreduce.map.memory.mb", - "mapreduce.reduce.memory.mb", - "mapreduce.map.java.opts", - "mapreduce.reduce.java.opts" - ] + "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java new file mode 100644 index 0000000..0a3a3a1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.mr.history; + +import com.google.inject.Inject; +import org.apache.eagle.app.test.AppJUnitRunner; +import org.apache.eagle.app.test.ApplicationSimulator; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(AppJUnitRunner.class) +public class MRHistoryJobApplicationProviderTest { + @Inject private ApplicationSimulator simulator; + + @Test + public void testRunAsManagedApplicationWithSimulator(){ + simulator.start(MRHistoryJobApplicationProvider.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java new file mode 100644 index 0000000..318a641 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.mr.history; + +import com.typesafe.config.ConfigFactory; +import org.junit.Test; + +public class MRHistoryJobApplicationTest { + @Test + public void testRun(){ + new MRHistoryJobApplication().run(ConfigFactory.load()); + } +}