Repository: eagle Updated Branches: refs/heads/master 4ed406abc -> 077e27b23
[EAGLE-889] Add a restful API for Hadoop running queue to query top N users or jobs https://issues.apache.org/jira/browse/EAGLE-889 Sample 1: query the top N users/jobs of memory usage under queue1 on site1 at time 1487156721356 http://localhost:9090/rest/queue/memory?top=10&queue=queue1&currentTime=1487156721356&site=site1 Sample 2: query the current queue hierarchy http://localhost:9090/rest/entities?query=QueueMappingService[site=%22sandbox%22]{*}&pageSize=10000 Author: Zhao, Qingwen <qingwz...@apache.org> Closes #805 from qingwen220/EAGLE-889. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/077e27b2 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/077e27b2 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/077e27b2 Branch: refs/heads/master Commit: 077e27b236c4b31454a45bb66a7372bfcd01e13a Parents: 4ed406a Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Thu Feb 16 15:41:20 2017 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Thu Feb 16 15:41:20 2017 +0800 ---------------------------------------------------------------------- .../queue/common/HadoopClusterConstants.java | 1 + .../crawler/SchedulerInfoParseListener.java | 25 ++- .../model/HadoopQueueEntityRepository.java | 2 + .../scheduler/QueueStructureAPIEntity.java | 69 +++++++++ .../storm/HadoopQueueMetricPersistBolt.java | 15 +- ...doop.queue.HadoopQueueRunningAppProvider.xml | 2 +- .../jpm/mr/running/parser/MRJobParser.java | 2 + eagle-jpm/eagle-jpm-service/pom.xml | 5 + .../eagle/service/jpm/RunningQueueResource.java | 154 +++++++++++++++++++ .../eagle/service/jpm/RunningQueueResponse.java | 50 ++++++ ...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 2 +- ....eagle.topology.TopologyCheckAppProvider.xml | 2 +- 12 files changed, 316 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 9a08f05..1d64f87 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java @@ -80,6 +80,7 @@ public class HadoopClusterConstants { } public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; + public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService"; // tag constants public static final String TAG_PARENT_QUEUE = "parentQueue"; http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index b0452c9..67cc5c9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -22,7 +22,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; 
import org.apache.eagle.hadoop.queue.model.scheduler.*; +import org.apache.eagle.hadoop.queue.model.scheduler.Queue; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericMetricEntity; import backtype.storm.spout.SpoutOutputCollector; @@ -41,7 +43,7 @@ public class SchedulerInfoParseListener { //private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE; //private int MAX_CACHE_COUNT = 1000; - private final List<RunningQueueAPIEntity> runningQueueAPIEntities = new ArrayList<>(); + private final List<TaggedLogAPIEntity> runningQueueAPIEntities = new ArrayList<>(); private final List<GenericMetricEntity> metricEntities = new ArrayList<>(); private String site; @@ -56,6 +58,7 @@ public class SchedulerInfoParseListener { Map<String, String> tags = buildMetricTags(null, null); createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity()); createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity()); + for (Queue queue : scheduler.getQueues().getQueue()) { createQueues(queue, currentTimestamp, scheduler, null); } @@ -69,7 +72,7 @@ public class SchedulerInfoParseListener { LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size()); messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); - List<RunningQueueAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities); + List<TaggedLogAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities); collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId); runningQueueAPIEntities.clear(); @@ -97,7 +100,7 @@ public class SchedulerInfoParseListener { this.metricEntities.add(e); } - private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo 
scheduler, String parentQueueName) throws Exception { + private List<String> createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception { RunningQueueAPIEntity _entity = new RunningQueueAPIEntity(); Map<String, String> _tags = buildMetricTags(queue.getQueueName(), parentQueueName); _entity.setTags(_tags); @@ -123,7 +126,6 @@ public class SchedulerInfoParseListener { UserWrappers users = new UserWrappers(); users.setUsers(userList); _entity.setUsers(users); - runningQueueAPIEntities.add(_entity); createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, _tags, currentTimestamp, queue.getNumPendingApplications()); @@ -145,11 +147,24 @@ public class SchedulerInfoParseListener { } } + List<String> subQueues = new ArrayList<>(); + List<String> allSubQueues = new ArrayList<>(); if (queue.getQueues() != null && queue.getQueues().getQueue() != null) { for (Queue subQueue : queue.getQueues().getQueue()) { - createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName()); + subQueues.add(subQueue.getQueueName()); + allSubQueues.add(subQueue.getQueueName()); + List<String> queues = createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName()); + allSubQueues.addAll(queues); } } + QueueStructureAPIEntity queueStructureAPIEntity = new QueueStructureAPIEntity(); + queueStructureAPIEntity.setTags(_tags); + queueStructureAPIEntity.setSubQueues(subQueues); + queueStructureAPIEntity.setAllSubQueues(allSubQueues); + queueStructureAPIEntity.setLastUpdateTime(currentTimestamp); + runningQueueAPIEntities.add(queueStructureAPIEntity); + + return allSubQueues; } private UserWrapper wrapUser(User user) { http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java index f598779..40d6e53 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java @@ -17,11 +17,13 @@ */ package org.apache.eagle.hadoop.queue.model; +import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepository; public class HadoopQueueEntityRepository extends EntityRepository { public HadoopQueueEntityRepository() { this.registerEntity(RunningQueueAPIEntity.class); + this.registerEntity(QueueStructureAPIEntity.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java new file mode 100644 index 0000000..72f67bc --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.scheduler; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +import java.util.List; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("queue_map") +@ColumnFamily("f") +@Prefix("queueMap") +@Service(HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME) +@TimeSeries(false) +@Partition( {"site"}) +public class QueueStructureAPIEntity extends TaggedLogAPIEntity { + @Column("a") + private List<String> subQueues; + @Column("b") + private List<String> allSubQueues; + @Column("c") + private long lastUpdateTime; + + public List<String> getSubQueues() { + return subQueues; + } + + public void setSubQueues(List<String> subQueues) { + this.subQueues = subQueues; + valueChanged("subQueues"); + } + + public List<String> getAllSubQueues() { + return allSubQueues; + } + + public void setAllSubQueues(List<String> allSubQueues) { + this.allSubQueues = allSubQueues; + valueChanged("allSubQueues"); + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } + + public void setLastUpdateTime(long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + valueChanged("lastUpdateTime"); + } + +} 
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 1bafc13..9eb7008 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -30,6 +30,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.IEagleServiceClient; @@ -72,10 +73,14 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data; writeMetrics(metrics); } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) { - List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data; - for (RunningQueueAPIEntity queue : entities) { - if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { - collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue))); + List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data; + for 
(TaggedLogAPIEntity entity : entities) { + if (entity instanceof RunningQueueAPIEntity) { + RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; + if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { + collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), + parseLeafQueueInfo(queue))); + } } } writeEntities(entities); @@ -99,7 +104,7 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { } } - private void writeEntities(List<RunningQueueAPIEntity> entities) { + private void writeEntities(List<TaggedLogAPIEntity> entities) { try { GenericServiceAPIResponseEntity response = client.create(entities); if (!response.isSuccess()) { http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index 4cf745c..5fb041d 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -57,7 +57,7 @@ <property> <name>dataSinkConfig.topic</name> <displayName>dataSinkConfig.topic</displayName> - <value>hadoop_leaf_queue</value> + <value>yarn_queue</value> <description>topic for kafka data sink</description> </property> <property> http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java 
---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 6b33d31..525ffc2 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -135,6 +135,8 @@ public class MRJobParser implements Runnable { JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId); jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString()); jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString()); + // set an estimated job finished time because it's hard the get the specific one + jobExecutionAPIEntity.setEndTime(System.currentTimeMillis()); mrJobConfigs.remove(mrJobId); if (mrJobConfigs.size() == 0) { this.parserStatus = ParserStatus.APP_FINISHED; http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/pom.xml b/eagle-jpm/eagle-jpm-service/pom.xml index d6807bd..197740e 100644 --- a/eagle-jpm/eagle-jpm-service/pom.xml +++ b/eagle-jpm/eagle-jpm-service/pom.xml @@ -43,5 +43,10 @@ <artifactId>eagle-jpm-entity</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-hadoop-queue</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java 
---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java new file mode 100644 index 0000000..2632423 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.service.jpm; + +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.utils.Tuple2; +import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.generic.GenericEntityServiceResource; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.text.ParseException; +import java.util.*; + +import static org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME; +import static org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME; +import static org.apache.eagle.jpm.util.Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME; +import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID; +import static org.apache.eagle.jpm.util.MRJobTagName.JOB_QUEUE; +import static org.apache.eagle.jpm.util.MRJobTagName.USER; + +@Path("queue") +public class RunningQueueResource { + + @GET + @Path("memory") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, + @QueryParam("queue") String queue, + @QueryParam("currentTime") long currentTime, + @QueryParam("top") int top) { + RunningQueueResponse result = new RunningQueueResponse(); + try { + if (site == null || queue == null || currentTime == 0L || top == 0) { + throw new Exception("Invalid query parameters: site == null || queue == null || currentTime == 0L || top == 0"); + } + Tuple2<String, String> queryTimeRange = getQueryTimeRange(currentTime); + Set<String> queueSet = getSubQueueSet(site, queue); + List<JobExecutionAPIEntity> runningJobs = getRunningJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1()); + List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> jobs = getJobs(site, 
currentTime, queryTimeRange.f0(), queryTimeRange.f1()); + Set<String> jobIds = new HashSet<>(); + jobs.forEach(job -> jobIds.add(job.getTags().get(JOB_ID.toString()))); + + Map<String, Long> userUsage = new HashMap<>(); + Map<String, Long> jobUsage = new HashMap<>(); + for (JobExecutionAPIEntity job : runningJobs) { + String jobId = job.getTags().get(JOB_ID.toString()); + String jobQueue = job.getTags().get(JOB_QUEUE.toString()); + String user = job.getTags().get(USER.toString()); + + if (jobIds.contains(jobId) && queueSet.contains(jobQueue)) { + if (userUsage.containsKey(user)) { + userUsage.put(user, userUsage.get(user) + job.getAllocatedMB()); + } else { + userUsage.put(user, job.getAllocatedMB()); + } + jobUsage.put(jobId, job.getAllocatedMB()); + } + } + result.setJobs(getTopRecords(top, jobUsage)); + result.setUsers(getTopRecords(top, userUsage)); + } catch (Exception e) { + result.setErrMessage(e.getMessage()); + } + return result; + } + + private List<JobExecutionAPIEntity> getRunningJobs(String site, long currentTime, String startTime, String endTime) throws Exception { + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + String query = String.format("%s[@site=\"%s\" and @startTime<=%s and (@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, @allocatedMB}", + JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime); + GenericServiceAPIResponseEntity<JobExecutionAPIEntity> runningJobResponse = resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); + + if (!runningJobResponse.isSuccess() || runningJobResponse.getObj() == null) { + throw new IOException(runningJobResponse.getException()); + } + + return runningJobResponse.getObj(); + } + + private List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> getJobs(String site, + long currentTime, + String startTime, + String endTime) throws Exception { + GenericEntityServiceResource 
resource = new GenericEntityServiceResource(); + String query = String.format("%s[@site=\"%s\" and @startTime<=%s and @endTime>%s]{@jobId}", JPA_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime); + + GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> response = + resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); + + if (!response.isSuccess() || response.getObj() == null) { + throw new IOException(response.getException()); + } + + return response.getObj(); + } + + private Set<String> getSubQueueSet(String site, String parentQueue) throws IOException { + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + + String query = String.format("%s[@site=\"%s\" and @queue=\"%s\"]{*}", QUEUE_MAPPING_SERVICE_NAME, site, parentQueue); + GenericServiceAPIResponseEntity<QueueStructureAPIEntity> responseEntity = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); + + if (!responseEntity.isSuccess() || responseEntity.getObj() == null) { + throw new IOException(responseEntity.getException()); + } + + Set<String> subQueues = new HashSet<>(); + subQueues.add(parentQueue); + subQueues.addAll(responseEntity.getObj().get(0).getAllSubQueues()); + + return subQueues; + } + + private Tuple2<String, String> getQueryTimeRange(long currentTime) throws ParseException { + String startTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime - DateTimeUtil.ONEHOUR * 12); + String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime + DateTimeUtil.ONEMINUTE); + return new Tuple2<>(startTime, endTime); + } + + private Map<String, Long> getTopRecords(int top, Map<String, Long> map) { + Map<String, Long> newMap = new LinkedHashMap<>(); + + List<Map.Entry<String,Long>> list = new ArrayList<>(map.entrySet()); + Collections.sort(list, (o1, o2) -> o1.getValue() < o2.getValue() ? 
1 : -1); + for (Map.Entry<String, Long> entry : list) { + if (newMap.size() < top) { + newMap.put(entry.getKey(), entry.getValue()); + } else { + break; + } + } + return newMap; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java new file mode 100644 index 0000000..1281b66 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.service.jpm; + +import java.util.Map; + +public class RunningQueueResponse { + private String errMessage; + private Map<String, Long> jobs; + private Map<String, Long> users; + + public String getErrMessage() { + return errMessage; + } + + public void setErrMessage(String errMessage) { + this.errMessage = errMessage; + } + + public Map<String, Long> getJobs() { + return jobs; + } + + public void setJobs(Map<String, Long> jobs) { + this.jobs = jobs; + } + + public Map<String, Long> getUsers() { + return users; + } + + public void setUsers(Map<String, Long> users) { + this.users = users; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml index 49694a5..1997257 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml @@ -115,7 +115,7 @@ <property> <name>dataSinkConfig.topic</name> <displayName>Kafka Topic for Parsed Data Sink</displayName> - <value>hdfs_audit_log_enriched</value> + <value>hdfs_audit_event</value> <description>topic for kafka data sink</description> </property> <property> 
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index 87d3202..1142e1b 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -133,7 +133,7 @@ <property> <name>dataSinkConfig.topic</name> <displayName>Topic For Kafka Data Sink</displayName> - <value>topology_health_check</value> + <value>topology_check</value> <description>topic For kafka data sink</description> </property> <property>