Repository: eagle
Updated Branches:
  refs/heads/master 4ed406abc -> 077e27b23


[EAGLE-889] Add a RESTful API for Hadoop running queue to query top N users or 
jobs

https://issues.apache.org/jira/browse/EAGLE-889

Sample 1: query the top 10 users/jobs by memory usage under queue1 on site1 at 
time 1487156721356
http://localhost:9090/rest/queue/memory?top=10&queue=queue1&currentTime=1487156721356&site=site1

Sample 2: query the current queue hierarchy
http://localhost:9090/rest/entities?query=QueueMappingService[site=%22sandbox%22]{*}&pageSize=10000

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #805 from qingwen220/EAGLE-889.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/077e27b2
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/077e27b2
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/077e27b2

Branch: refs/heads/master
Commit: 077e27b236c4b31454a45bb66a7372bfcd01e13a
Parents: 4ed406a
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Thu Feb 16 15:41:20 2017 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Thu Feb 16 15:41:20 2017 +0800

----------------------------------------------------------------------
 .../queue/common/HadoopClusterConstants.java    |   1 +
 .../crawler/SchedulerInfoParseListener.java     |  25 ++-
 .../model/HadoopQueueEntityRepository.java      |   2 +
 .../scheduler/QueueStructureAPIEntity.java      |  69 +++++++++
 .../storm/HadoopQueueMetricPersistBolt.java     |  15 +-
 ...doop.queue.HadoopQueueRunningAppProvider.xml |   2 +-
 .../jpm/mr/running/parser/MRJobParser.java      |   2 +
 eagle-jpm/eagle-jpm-service/pom.xml             |   5 +
 .../eagle/service/jpm/RunningQueueResource.java | 154 +++++++++++++++++++
 .../eagle/service/jpm/RunningQueueResponse.java |  50 ++++++
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |   2 +-
 ....eagle.topology.TopologyCheckAppProvider.xml |   2 +-
 12 files changed, 316 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 9a08f05..1d64f87 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -80,6 +80,7 @@ public class HadoopClusterConstants {
     }
 
     public static final String RUNNING_QUEUE_SERVICE_NAME = 
"RunningQueueService";
+    public static final String QUEUE_MAPPING_SERVICE_NAME = 
"QueueMappingService";
 
     // tag constants
     public static final String TAG_PARENT_QUEUE = "parentQueue";

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index b0452c9..67cc5c9 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -22,7 +22,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
 import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 
 import backtype.storm.spout.SpoutOutputCollector;
@@ -41,7 +43,7 @@ public class SchedulerInfoParseListener {
     //private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
     //private int MAX_CACHE_COUNT = 1000;
 
-    private final List<RunningQueueAPIEntity> runningQueueAPIEntities = new 
ArrayList<>();
+    private final List<TaggedLogAPIEntity> runningQueueAPIEntities = new 
ArrayList<>();
     private final List<GenericMetricEntity> metricEntities = new ArrayList<>();
 
     private String site;
@@ -56,6 +58,7 @@ public class SchedulerInfoParseListener {
         Map<String, String> tags = buildMetricTags(null, null);
         createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, 
currentTimestamp, scheduler.getCapacity());
         createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, 
currentTimestamp, scheduler.getUsedCapacity());
+
         for (Queue queue : scheduler.getQueues().getQueue()) {
             createQueues(queue, currentTimestamp, scheduler, null);
         }
@@ -69,7 +72,7 @@ public class SchedulerInfoParseListener {
 
         LOG.info("Flushing {} RunningQueueEntities in memory", 
runningQueueAPIEntities.size());
         messageId = new 
HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, 
HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
-        List<RunningQueueAPIEntity> entities = new 
ArrayList<>(runningQueueAPIEntities);
+        List<TaggedLogAPIEntity> entities = new 
ArrayList<>(runningQueueAPIEntities);
         collector.emit(new 
ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), 
messageId);
 
         runningQueueAPIEntities.clear();
@@ -97,7 +100,7 @@ public class SchedulerInfoParseListener {
         this.metricEntities.add(e);
     }
 
-    private void createQueues(Queue queue, long currentTimestamp, 
SchedulerInfo scheduler, String parentQueueName) throws Exception {
+    private List<String> createQueues(Queue queue, long currentTimestamp, 
SchedulerInfo scheduler, String parentQueueName) throws Exception {
         RunningQueueAPIEntity _entity = new RunningQueueAPIEntity();
         Map<String, String> _tags = buildMetricTags(queue.getQueueName(), 
parentQueueName);
         _entity.setTags(_tags);
@@ -123,7 +126,6 @@ public class SchedulerInfoParseListener {
         UserWrappers users = new UserWrappers();
         users.setUsers(userList);
         _entity.setUsers(users);
-
         runningQueueAPIEntities.add(_entity);
 
         createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, _tags, 
currentTimestamp, queue.getNumPendingApplications());
@@ -145,11 +147,24 @@ public class SchedulerInfoParseListener {
             }
         }
 
+        List<String> subQueues = new ArrayList<>();
+        List<String> allSubQueues = new ArrayList<>();
         if (queue.getQueues() != null && queue.getQueues().getQueue() != null) 
{
             for (Queue subQueue : queue.getQueues().getQueue()) {
-                createQueues(subQueue, currentTimestamp, scheduler, 
queue.getQueueName());
+                subQueues.add(subQueue.getQueueName());
+                allSubQueues.add(subQueue.getQueueName());
+                List<String> queues = createQueues(subQueue, currentTimestamp, 
scheduler, queue.getQueueName());
+                allSubQueues.addAll(queues);
             }
         }
+        QueueStructureAPIEntity queueStructureAPIEntity = new 
QueueStructureAPIEntity();
+        queueStructureAPIEntity.setTags(_tags);
+        queueStructureAPIEntity.setSubQueues(subQueues);
+        queueStructureAPIEntity.setAllSubQueues(allSubQueues);
+        queueStructureAPIEntity.setLastUpdateTime(currentTimestamp);
+        runningQueueAPIEntities.add(queueStructureAPIEntity);
+
+        return allSubQueues;
     }
 
     private UserWrapper wrapUser(User user) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
index f598779..40d6e53 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -17,11 +17,13 @@
  */
 package org.apache.eagle.hadoop.queue.model;
 
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.log.entity.repo.EntityRepository;
 
 public class HadoopQueueEntityRepository extends EntityRepository {
     public HadoopQueueEntityRepository() {
         this.registerEntity(RunningQueueAPIEntity.class);
+        this.registerEntity(QueueStructureAPIEntity.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
new file mode 100644
index 0000000..72f67bc
--- /dev/null
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("queue_map")
+@ColumnFamily("f")
+@Prefix("queueMap")
+@Service(HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME)
+@TimeSeries(false)
+@Partition( {"site"})
+public class QueueStructureAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private List<String> subQueues;
+    @Column("b")
+    private List<String> allSubQueues;
+    @Column("c")
+    private long lastUpdateTime;
+
+    public List<String> getSubQueues() {
+        return subQueues;
+    }
+
+    public void setSubQueues(List<String> subQueues) {
+        this.subQueues = subQueues;
+        valueChanged("subQueues");
+    }
+
+    public List<String> getAllSubQueues() {
+        return allSubQueues;
+    }
+
+    public void setAllSubQueues(List<String> allSubQueues) {
+        this.allSubQueues = allSubQueues;
+        valueChanged("allSubQueues");
+    }
+
+    public long getLastUpdateTime() {
+        return lastUpdateTime;
+    }
+
+    public void setLastUpdateTime(long lastUpdateTime) {
+        this.lastUpdateTime = lastUpdateTime;
+        valueChanged("lastUpdateTime");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 1bafc13..9eb7008 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -30,6 +30,7 @@ import 
org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import 
org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -72,10 +73,14 @@ public class HadoopQueueMetricPersistBolt extends 
BaseRichBolt {
             List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) 
data;
             writeMetrics(metrics);
         } else if 
(dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
-            List<RunningQueueAPIEntity> entities = 
(List<RunningQueueAPIEntity>) data;
-            for (RunningQueueAPIEntity queue : entities) {
-                if (queue.getUsers() != null && 
!queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
-                    collector.emit(new 
Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), 
parseLeafQueueInfo(queue)));
+            List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) 
data;
+            for (TaggedLogAPIEntity entity : entities) {
+                if (entity instanceof RunningQueueAPIEntity) {
+                    RunningQueueAPIEntity queue = (RunningQueueAPIEntity) 
entity;
+                    if (queue.getUsers() != null && 
!queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
+                        collector.emit(new 
Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE),
+                                parseLeafQueueInfo(queue)));
+                    }
                 }
             }
             writeEntities(entities);
@@ -99,7 +104,7 @@ public class HadoopQueueMetricPersistBolt extends 
BaseRichBolt {
         }
     }
 
-    private void writeEntities(List<RunningQueueAPIEntity> entities) {
+    private void writeEntities(List<TaggedLogAPIEntity> entities) {
         try {
             GenericServiceAPIResponseEntity response = client.create(entities);
             if (!response.isSuccess()) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
 
b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 4cf745c..5fb041d 100644
--- 
a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -57,7 +57,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>dataSinkConfig.topic</displayName>
-            <value>hadoop_leaf_queue</value>
+            <value>yarn_queue</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 6b33d31..525ffc2 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -135,6 +135,8 @@ public class MRJobParser implements Runnable {
         JobExecutionAPIEntity jobExecutionAPIEntity = 
mrJobEntityMap.get(mrJobId);
         
jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString());
         
jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString());
+        // set an estimated job finished time because it's hard to get the 
specific one
+        jobExecutionAPIEntity.setEndTime(System.currentTimeMillis());
         mrJobConfigs.remove(mrJobId);
         if (mrJobConfigs.size() == 0) {
             this.parserStatus = ParserStatus.APP_FINISHED;

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/pom.xml 
b/eagle-jpm/eagle-jpm-service/pom.xml
index d6807bd..197740e 100644
--- a/eagle-jpm/eagle-jpm-service/pom.xml
+++ b/eagle-jpm/eagle-jpm-service/pom.xml
@@ -43,5 +43,10 @@
             <artifactId>eagle-jpm-entity</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-hadoop-queue</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
new file mode 100644
index 0000000..2632423
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
+import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.*;
+
+import static 
org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME;
+import static 
org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME;
+import static 
org.apache.eagle.jpm.util.Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_QUEUE;
+import static org.apache.eagle.jpm.util.MRJobTagName.USER;
+
+@Path("queue")
+public class RunningQueueResource {
+
+    @GET
+    @Path("memory")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RunningQueueResponse getTopByQueue(@QueryParam("site") String site,
+                                              @QueryParam("queue") String 
queue,
+                                              @QueryParam("currentTime") long 
currentTime,
+                                              @QueryParam("top") int top) {
+        RunningQueueResponse result = new RunningQueueResponse();
+        try {
+            if (site == null || queue == null || currentTime == 0L || top == 
0) {
+                throw new Exception("Invalid query parameters: site == null || 
queue == null || currentTime == 0L || top == 0");
+            }
+            Tuple2<String, String> queryTimeRange = 
getQueryTimeRange(currentTime);
+            Set<String> queueSet = getSubQueueSet(site, queue);
+            List<JobExecutionAPIEntity> runningJobs = getRunningJobs(site, 
currentTime, queryTimeRange.f0(), queryTimeRange.f1());
+            List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> 
jobs = getJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1());
+            Set<String> jobIds = new HashSet<>();
+            jobs.forEach(job -> 
jobIds.add(job.getTags().get(JOB_ID.toString())));
+
+            Map<String, Long> userUsage = new HashMap<>();
+            Map<String, Long> jobUsage = new HashMap<>();
+            for (JobExecutionAPIEntity job : runningJobs) {
+                String jobId = job.getTags().get(JOB_ID.toString());
+                String jobQueue = job.getTags().get(JOB_QUEUE.toString());
+                String user = job.getTags().get(USER.toString());
+
+                if (jobIds.contains(jobId) && queueSet.contains(jobQueue)) {
+                    if (userUsage.containsKey(user)) {
+                        userUsage.put(user, userUsage.get(user) + 
job.getAllocatedMB());
+                    } else {
+                        userUsage.put(user, job.getAllocatedMB());
+                    }
+                    jobUsage.put(jobId, job.getAllocatedMB());
+                }
+            }
+            result.setJobs(getTopRecords(top, jobUsage));
+            result.setUsers(getTopRecords(top, userUsage));
+        } catch (Exception e) {
+            result.setErrMessage(e.getMessage());
+        }
+        return result;
+    }
+
+    private List<JobExecutionAPIEntity> getRunningJobs(String site, long 
currentTime, String startTime, String endTime) throws Exception {
+        GenericEntityServiceResource resource = new 
GenericEntityServiceResource();
+        String query = String.format("%s[@site=\"%s\" and @startTime<=%s and 
(@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, 
@allocatedMB}",
+                JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, 
currentTime);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> 
runningJobResponse = resource.search(query, startTime, endTime, 
Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+        if (!runningJobResponse.isSuccess() || runningJobResponse.getObj() == 
null) {
+            throw new IOException(runningJobResponse.getException());
+        }
+
+        return runningJobResponse.getObj();
+    }
+
+    private List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> 
getJobs(String site,
+                                                                               
       long currentTime,
+                                                                               
       String startTime,
+                                                                               
       String endTime) throws Exception {
+        GenericEntityServiceResource resource = new 
GenericEntityServiceResource();
+        String query = String.format("%s[@site=\"%s\" and @startTime<=%s and 
@endTime>%s]{@jobId}", JPA_JOB_EXECUTION_SERVICE_NAME, site, currentTime, 
currentTime);
+
+        
GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity>
 response =
+                resource.search(query, startTime, endTime, Integer.MAX_VALUE, 
null, false, false, 0L, 0, false, 0, null, false);
+
+        if (!response.isSuccess() || response.getObj() == null) {
+            throw new IOException(response.getException());
+        }
+
+        return response.getObj();
+    }
+
+    private Set<String> getSubQueueSet(String site, String parentQueue) throws 
IOException {
+        GenericEntityServiceResource resource = new 
GenericEntityServiceResource();
+
+        String query = String.format("%s[@site=\"%s\" and @queue=\"%s\"]{*}", 
QUEUE_MAPPING_SERVICE_NAME, site, parentQueue);
+        GenericServiceAPIResponseEntity<QueueStructureAPIEntity> 
responseEntity = resource.search(query, null, null, Integer.MAX_VALUE, null, 
false, false, 0L, 0, false, 0, null, false);
+
+        if (!responseEntity.isSuccess() || responseEntity.getObj() == null) {
+            throw new IOException(responseEntity.getException());
+        }
+
+        Set<String> subQueues = new HashSet<>();
+        subQueues.add(parentQueue);
+        subQueues.addAll(responseEntity.getObj().get(0).getAllSubQueues());
+
+        return subQueues;
+    }
+
+    private Tuple2<String, String> getQueryTimeRange(long currentTime) throws 
ParseException {
+        String startTime = 
DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime - 
DateTimeUtil.ONEHOUR * 12);
+        String endTime = 
DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime + 
DateTimeUtil.ONEMINUTE);
+        return new Tuple2<>(startTime, endTime);
+    }
+
+    private Map<String, Long> getTopRecords(int top, Map<String, Long> map) {
+        Map<String, Long> newMap = new LinkedHashMap<>();
+
+        List<Map.Entry<String,Long>> list = new ArrayList<>(map.entrySet());
+        Collections.sort(list, (o1, o2) -> o1.getValue() < o2.getValue() ? 1 : 
-1);
+        for (Map.Entry<String, Long> entry : list) {
+            if (newMap.size() < top) {
+                newMap.put(entry.getKey(), entry.getValue());
+            } else {
+                break;
+            }
+        }
+        return newMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
new file mode 100644
index 0000000..1281b66
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import java.util.Map;
+
+public class RunningQueueResponse {
+    private String errMessage;
+    private Map<String, Long> jobs;
+    private Map<String, Long> users;
+
+    public String getErrMessage() {
+        return errMessage;
+    }
+
+    public void setErrMessage(String errMessage) {
+        this.errMessage = errMessage;
+    }
+
+    public Map<String, Long> getJobs() {
+        return jobs;
+    }
+
+    public void setJobs(Map<String, Long> jobs) {
+        this.jobs = jobs;
+    }
+
+    public Map<String, Long> getUsers() {
+        return users;
+    }
+
+    public void setUsers(Map<String, Long> users) {
+        this.users = users;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 49694a5..1997257 100644
--- 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -115,7 +115,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>Kafka Topic for Parsed Data Sink</displayName>
-            <value>hdfs_audit_log_enriched</value>
+            <value>hdfs_audit_event</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
 
b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 87d3202..1142e1b 100644
--- 
a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ 
b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -133,7 +133,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>Topic For Kafka Data Sink</displayName>
-            <value>topology_health_check</value>
+            <value>topology_check</value>
             <description>topic For kafka data sink</description>
         </property>
         <property>

Reply via email to