This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git
The following commit(s) were added to refs/heads/master by this push: new 0bd2f8a add dashboard service for aggregated information collection (#326) 0bd2f8a is described below commit 0bd2f8a04b09c536f7001fecae0d0e926a46f8bd Author: Jiechuan Chen <654815...@qq.com> AuthorDate: Thu Oct 1 09:13:03 2020 +0800 add dashboard service for aggregated information collection (#326) Fixes #282 ### Motivation This is how I understand it should work: Access control is done by environmentController when users retrieve the environment list from the 'environment' front-end page, on which the aggregated data is displayed. We can then retrieve the total number of tenants and corresponding namespaces from the given environments, as well as a list of topicStat entries, from which we can further retrieve the number of clusters, brokers, producers and consumers. 1. environment -> tenant 2. tenant -> namespace 3. environment -> topicStat 4. topicStat -> cluster 5. topicStat -> broker 6. topicStat -> producer 7. topicStatId -> consumer The number of bookies is currently not persisted in the database, so the BookiesService is called to retrieve that data. *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* ### Modifications As #282 suggested, added some new SQL statements to the corresponding mappers and a new service to collect aggregated data for the dashboard. ### Verifying this change - [ yes ] Make sure that the change passes the `./gradlew build` checks. 
--- .../manager/controller/DashboardController.java | 57 ++++++++ .../manager/dao/ConsumersStatsRepositoryImpl.java | 7 + .../manager/dao/NamespacesRepositoryImpl.java | 6 + .../pulsar/manager/dao/TenantsRepositoryImpl.java | 6 + .../manager/dao/TopicsStatsRepositoryImpl.java | 5 + .../manager/entity/ConsumersStatsRepository.java | 5 + .../manager/entity/NamespacesRepository.java | 2 + .../pulsar/manager/entity/TenantsRepository.java | 2 + .../manager/entity/TopicsStatsRepository.java | 3 + .../pulsar/manager/mapper/ConsumerStatsMapper.java | 14 ++ .../pulsar/manager/mapper/NamespacesMapper.java | 6 + .../pulsar/manager/mapper/TenantsMapper.java | 8 ++ .../pulsar/manager/mapper/TopicsStatsMapper.java | 15 +- .../pulsar/manager/service/DashboardService.java | 21 +++ .../manager/service/impl/DashboardServiceImpl.java | 105 ++++++++++++++ .../manager/service/DashboardServiceImplTest.java | 151 +++++++++++++++++++++ 16 files changed, 412 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/pulsar/manager/controller/DashboardController.java b/src/main/java/org/apache/pulsar/manager/controller/DashboardController.java new file mode 100644 index 0000000..33617c2 --- /dev/null +++ b/src/main/java/org/apache/pulsar/manager/controller/DashboardController.java @@ -0,0 +1,57 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.manager.controller; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.pulsar.manager.service.DashboardService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Map; + +/** + * Dashboard rest api + */ +@RestController +@RequestMapping(value = "/pulsar-manager") +@Api(description = "Support dashboard query.") +@Validated +public class DashboardController { + + private final DashboardService dashboardService; + + @Autowired + public DashboardController( + DashboardService dashboardService) { + this.dashboardService = dashboardService; + } + + @ApiOperation(value = "Get the dashboard stats") + @ApiResponses({ + @ApiResponse(code = 200, message = "ok"), + @ApiResponse(code = 500, message = "Internal server error") + }) + @RequestMapping(value = "/dashboard", method = RequestMethod.GET) + public ResponseEntity<Map<String, Object>> getDashboardStats( + @RequestBody List<String> environmentList) { + Map<String, Object> result = dashboardService.getDashboardStats(environmentList); + return ResponseEntity.ok(result); + } +} diff --git a/src/main/java/org/apache/pulsar/manager/dao/ConsumersStatsRepositoryImpl.java b/src/main/java/org/apache/pulsar/manager/dao/ConsumersStatsRepositoryImpl.java index 64c17c2..237daa4 100644 --- a/src/main/java/org/apache/pulsar/manager/dao/ConsumersStatsRepositoryImpl.java +++ b/src/main/java/org/apache/pulsar/manager/dao/ConsumersStatsRepositoryImpl.java @@ -15,12 +15,15 @@ package org.apache.pulsar.manager.dao; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; +import org.apache.ibatis.annotations.Param; import 
org.apache.pulsar.manager.entity.ConsumerStatsEntity; import org.apache.pulsar.manager.entity.ConsumersStatsRepository; import org.apache.pulsar.manager.mapper.ConsumerStatsMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository public class ConsumersStatsRepositoryImpl implements ConsumersStatsRepository { @@ -55,6 +58,10 @@ public class ConsumersStatsRepositoryImpl implements ConsumersStatsRepository { return consumerStatsMapper.findByReplicationStatsId(replicationStatsId, timestamp); } + public List<ConsumerStatsEntity> findByMultiTopicStatsId(List<Long> topicStatsIdList, long timestamp) { + return consumerStatsMapper.findByMultiTopicStatsId(topicStatsIdList, timestamp); + } + public void remove(long timestamp, long timeInterval) { consumerStatsMapper.delete(timestamp - timeInterval); } diff --git a/src/main/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImpl.java b/src/main/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImpl.java index ca8f30c..61c6824 100644 --- a/src/main/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImpl.java +++ b/src/main/java/org/apache/pulsar/manager/dao/NamespacesRepositoryImpl.java @@ -93,6 +93,12 @@ public class NamespacesRepositoryImpl implements NamespacesRepository { } @Override + public List<NamespaceEntity> findByMultiTenant(List<String> tenantList) { + List<NamespaceEntity> namespaceEntities = namespacesMapper.findAllByMultiTenant(tenantList); + return namespaceEntities; + } + + @Override public void remove(String tenant, String namespace) { namespacesMapper.deleteByTenantNamespace(tenant, namespace); } diff --git a/src/main/java/org/apache/pulsar/manager/dao/TenantsRepositoryImpl.java b/src/main/java/org/apache/pulsar/manager/dao/TenantsRepositoryImpl.java index 9838439..40a5576 100644 --- a/src/main/java/org/apache/pulsar/manager/dao/TenantsRepositoryImpl.java +++ 
b/src/main/java/org/apache/pulsar/manager/dao/TenantsRepositoryImpl.java @@ -62,6 +62,12 @@ public class TenantsRepositoryImpl implements TenantsRepository { } @Override + public List<TenantEntity> findByMultiEnvironmentName(List<String> environmentNameList) { + List<TenantEntity> tenantEntities = tenantsMapper.findAllByMultiEnvironmentName(environmentNameList); + return tenantEntities; + } + + @Override public long save(TenantEntity tenantsEntity) { tenantsMapper.insert(tenantsEntity); return tenantsEntity.getTenantId(); diff --git a/src/main/java/org/apache/pulsar/manager/dao/TopicsStatsRepositoryImpl.java b/src/main/java/org/apache/pulsar/manager/dao/TopicsStatsRepositoryImpl.java index 824fffc..dcc0f8f 100644 --- a/src/main/java/org/apache/pulsar/manager/dao/TopicsStatsRepositoryImpl.java +++ b/src/main/java/org/apache/pulsar/manager/dao/TopicsStatsRepositoryImpl.java @@ -91,6 +91,11 @@ public class TopicsStatsRepositoryImpl implements TopicsStatsRepository { return topicsStatsMapper.findByMultiNamespace(environment, tenant, namespaceList, timestamp); } + public List<TopicStatsEntity> findByMultiEnvironment(List<String> environmentList, + long timestamp) { + return topicsStatsMapper.findByMultiEnvironment(environmentList, timestamp); + } + public void remove(long timestamp, long timeInterval) { topicsStatsMapper.delete(timestamp - timeInterval); } diff --git a/src/main/java/org/apache/pulsar/manager/entity/ConsumersStatsRepository.java b/src/main/java/org/apache/pulsar/manager/entity/ConsumersStatsRepository.java index 20c6d65..efead94 100644 --- a/src/main/java/org/apache/pulsar/manager/entity/ConsumersStatsRepository.java +++ b/src/main/java/org/apache/pulsar/manager/entity/ConsumersStatsRepository.java @@ -14,8 +14,11 @@ package org.apache.pulsar.manager.entity; import com.github.pagehelper.Page; +import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository public interface 
ConsumersStatsRepository { @@ -30,5 +33,7 @@ public interface ConsumersStatsRepository { Page<ConsumerStatsEntity> findByReplicationStatsId(Integer pageNum, Integer pageSize, long replicationStatsId, long timestamp); + List<ConsumerStatsEntity> findByMultiTopicStatsId(List<Long> topicStatsIdList, long timestamp); + void remove(long timestamp, long timeInterval); } diff --git a/src/main/java/org/apache/pulsar/manager/entity/NamespacesRepository.java b/src/main/java/org/apache/pulsar/manager/entity/NamespacesRepository.java index 9093328..8bcc6a4 100644 --- a/src/main/java/org/apache/pulsar/manager/entity/NamespacesRepository.java +++ b/src/main/java/org/apache/pulsar/manager/entity/NamespacesRepository.java @@ -42,6 +42,8 @@ public interface NamespacesRepository { List<NamespaceEntity> findByTenant(String tenant); + List<NamespaceEntity> findByMultiTenant(List<String> tenantList); + void remove(String tenant, String namespace); } diff --git a/src/main/java/org/apache/pulsar/manager/entity/TenantsRepository.java b/src/main/java/org/apache/pulsar/manager/entity/TenantsRepository.java index 9283cd6..ced4f24 100644 --- a/src/main/java/org/apache/pulsar/manager/entity/TenantsRepository.java +++ b/src/main/java/org/apache/pulsar/manager/entity/TenantsRepository.java @@ -34,6 +34,8 @@ public interface TenantsRepository { List<TenantEntity> findByMultiId(List<Long> tenantIdList); + List<TenantEntity> findByMultiEnvironmentName(List<String> environmentNameList); + void remove(String tenant); } diff --git a/src/main/java/org/apache/pulsar/manager/entity/TopicsStatsRepository.java b/src/main/java/org/apache/pulsar/manager/entity/TopicsStatsRepository.java index 1c2c677..e4de9ba 100644 --- a/src/main/java/org/apache/pulsar/manager/entity/TopicsStatsRepository.java +++ b/src/main/java/org/apache/pulsar/manager/entity/TopicsStatsRepository.java @@ -61,5 +61,8 @@ public interface TopicsStatsRepository { List<String> namespaceList, long timestamp); + List<TopicStatsEntity> 
findByMultiEnvironment(List<String> environmentList, + long timestamp); + void remove(long timestamp, long timeInterval); } diff --git a/src/main/java/org/apache/pulsar/manager/mapper/ConsumerStatsMapper.java b/src/main/java/org/apache/pulsar/manager/mapper/ConsumerStatsMapper.java index b1be1ea..7990cb3 100644 --- a/src/main/java/org/apache/pulsar/manager/mapper/ConsumerStatsMapper.java +++ b/src/main/java/org/apache/pulsar/manager/mapper/ConsumerStatsMapper.java @@ -17,6 +17,8 @@ import com.github.pagehelper.Page; import org.apache.pulsar.manager.entity.ConsumerStatsEntity; import org.apache.ibatis.annotations.*; +import java.util.List; + @Mapper public interface ConsumerStatsMapper { @@ -56,6 +58,18 @@ public interface ConsumerStatsMapper { Page<ConsumerStatsEntity> findByReplicationStatsId(@Param("replicationStatsId") long replicationStatsId, @Param("timestamp") long timestamp); + @Select({"<script>", + "SELECT consumer_stats_id as consumerStatsId,consumer as consumer,topic_stats_id as topicStatsId," + + "replication_stats_id as replicationStatsId,subscription_stats_id as subscriptionStatsId,address as address," + + "available_permits as availablePermits,connected_since as connectedSince,msg_rate_out as msgRateOut," + + "msg_throughput_out as msgThroughputOut,msg_rate_redeliver as msgRateRedeliver," + + "client_version as clientVersion,time_stamp ,metadata as metadata FROM consumers_stats " + + "where time_stamp=#{timestamp} and " + + "topic_stats_id IN <foreach collection='topicStatsIdList' item='topicStatsId' open='(' separator=',' close=')'> #{topicStatsId} </foreach>" + + "</script>"}) + List<ConsumerStatsEntity> findByMultiTopicStatsId(@Param("topicStatsIdList") List<Long> topicStatsIdList, + @Param("timestamp") long timestamp); + @Delete("DELETE FROM consumers_stats WHERE time_stamp < #{refTime}") void delete(@Param("refTime") long refTime); } diff --git a/src/main/java/org/apache/pulsar/manager/mapper/NamespacesMapper.java 
b/src/main/java/org/apache/pulsar/manager/mapper/NamespacesMapper.java index 06b1b96..d802b0f 100644 --- a/src/main/java/org/apache/pulsar/manager/mapper/NamespacesMapper.java +++ b/src/main/java/org/apache/pulsar/manager/mapper/NamespacesMapper.java @@ -67,6 +67,12 @@ public interface NamespacesMapper { "FROM namespaces WHERE tenant=#{tenant}") Page<NamespaceEntity> findAllByTenant(String tenant); + @Select({"<script>", + "SELECT tenant, namespace, namespace_id as namespaceId FROM namespaces", + "WHERE tenant IN <foreach collection='tenantList' item='tenant' open='(' separator=',' close=')'> #{tenant} </foreach>" + + "</script>"}) + List<NamespaceEntity> findAllByMultiTenant(@Param("tenantList") List<String> tenantList); + @Select("SELECT tenant,namespace,namespace_id as namespaceId FROM namespaces") Page<NamespaceEntity> getNamespacesList(); diff --git a/src/main/java/org/apache/pulsar/manager/mapper/TenantsMapper.java b/src/main/java/org/apache/pulsar/manager/mapper/TenantsMapper.java index 37fc2d1..82b288a 100644 --- a/src/main/java/org/apache/pulsar/manager/mapper/TenantsMapper.java +++ b/src/main/java/org/apache/pulsar/manager/mapper/TenantsMapper.java @@ -66,6 +66,14 @@ public interface TenantsMapper { "</script>"}) List<TenantEntity> findAllByMultiId(@Param("tenantIdList") List<Long> tenantIdList); + @Select({"<script>", + "SELECT tenant, tenant_id as tenantId, admin_roles as adminRoles,allowed_clusters as allowedClusters," + + "environment_name as environmentName " + + " FROM tenants ", + "WHERE environment_name IN <foreach collection='environmentNameList' item='environmentName' open='(' separator=',' close=')'> #{environmentName} </foreach>" + + "</script>"}) + List<TenantEntity> findAllByMultiEnvironmentName(@Param("environmentNameList") List<String> environmentNameList); + @Delete("DELETE FROM tenants WHERE tenant = #{tenant}") void delete(String tenant); diff --git a/src/main/java/org/apache/pulsar/manager/mapper/TopicsStatsMapper.java 
b/src/main/java/org/apache/pulsar/manager/mapper/TopicsStatsMapper.java index bca083b..ea231d6 100644 --- a/src/main/java/org/apache/pulsar/manager/mapper/TopicsStatsMapper.java +++ b/src/main/java/org/apache/pulsar/manager/mapper/TopicsStatsMapper.java @@ -108,7 +108,6 @@ public interface TopicsStatsMapper { @Param("tenantList") List<String> tenantList, @Param("timestamp") long timestamp); - @Select({"<script>", "SELECT environment, tenant, namespace," + "sum(producer_count) as producerCount," @@ -129,6 +128,20 @@ public interface TopicsStatsMapper { @Param("namespaceList") List<String> namespaceList, @Param("timestamp") long timestamp); + @Select({"<script>", + "SELECT topic_stats_id as topicStatsId,environment as environment,cluster as cluster,broker as broker," + + "tenant as tenant,namespace as namespace,bundle as bundle,persistent as persistent," + + "topic as topic,producer_count as producerCount,subscription_count as subscriptionCount," + + "msg_rate_in as msgRateIn,msg_throughput_in as msgThroughputIn,msg_rate_out as msgRateOut," + + "msg_throughput_out as msgThroughputOut,average_msg_size as averageMsgSize,storage_size as storageSize," + + "time_stamp FROM topics_stats " + + "WHERE time_stamp=#{timestamp} and " + + "environment IN <foreach collection='environmentList' item='environment' open='(' separator=',' close=')'> #{environment} </foreach>" + + "</script>"}) + List<TopicStatsEntity> findByMultiEnvironment( + @Param("environmentList") List<String> environmentList, + @Param("timestamp") long timestamp); + @Delete("DELETE FROM topics_stats WHERE time_stamp < #{refTime}") void delete(@Param("refTime") long refTime); } diff --git a/src/main/java/org/apache/pulsar/manager/service/DashboardService.java b/src/main/java/org/apache/pulsar/manager/service/DashboardService.java new file mode 100644 index 0000000..195dce3 --- /dev/null +++ b/src/main/java/org/apache/pulsar/manager/service/DashboardService.java @@ -0,0 +1,21 @@ +/** + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.manager.service; + +import java.util.List; +import java.util.Map; + +public interface DashboardService { + Map<String, Object> getDashboardStats(List<String> environmentList); +} diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java new file mode 100644 index 0000000..ff276f9 --- /dev/null +++ b/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java @@ -0,0 +1,105 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pulsar.manager.service.impl; + +import com.github.pagehelper.Page; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.pulsar.manager.entity.*; +import org.apache.pulsar.manager.service.BookiesService; +import org.apache.pulsar.manager.service.DashboardService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; + +@Service +public class DashboardServiceImpl implements DashboardService { + + private final TopicsStatsRepository topicsStatsRepository; + private final ConsumersStatsRepository consumersStatsRepository; + private final TenantsRepository tenantsRepository; + private final NamespacesRepository namespacesRepository; + private final BookiesService bookiesService; + + @Autowired + public DashboardServiceImpl( + TopicsStatsRepository topicsStatsRepository, + ConsumersStatsRepository consumersStatsRepository, + TenantsRepository tenantsRepository, + NamespacesRepository namespacesRepository, + BookiesService bookiesService) { + this.topicsStatsRepository = topicsStatsRepository; + this.consumersStatsRepository = consumersStatsRepository; + this.tenantsRepository = tenantsRepository; + this.namespacesRepository = namespacesRepository; + this.bookiesService = bookiesService; + } + + public Map<String, Object> getDashboardStats(List<String> environmentList) { + int totalClusterCount = 0; + int totalBrokerCount = 0; + int totalTenantCount; + int totalNamespaceCount; + long totalTopicCount = 0; + long totalProducerCount = 0L; + long totalConsumerCount = 0L; + int totalBookieCount = 0; + Map<String, Object> dashboardStatsMap = Maps.newHashMap(); + + List<TenantEntity> tenantEntities= tenantsRepository.findByMultiEnvironmentName(environmentList); + totalTenantCount = tenantEntities.size(); + List<String> tenantList = new LinkedList<>(); + for (TenantEntity tenantEntity: tenantEntities) { + 
tenantList.add(tenantEntity.getTenant()); + } + List<NamespaceEntity> namespaceEntities = namespacesRepository.findByMultiTenant(tenantList); + totalNamespaceCount = namespaceEntities.size(); + + Optional<TopicStatsEntity> topicStatsEntity = topicsStatsRepository.findMaxTime(); + if (topicStatsEntity.isPresent()) { + Set<String> clusterSet = Sets.newHashSet(); + Set<String> brokerSet = Sets.newHashSet(); + TopicStatsEntity topicStats = topicStatsEntity.get(); + long timestamp = topicStats.getTime_stamp(); + List<TopicStatsEntity> topicStatsEntities = topicsStatsRepository.findByMultiEnvironment( + environmentList, timestamp); + totalTopicCount = topicStatsEntities.size(); + List<Long> topicStatsIdList = new LinkedList<>(); + for (TopicStatsEntity statsEntity : topicStatsEntities) { + clusterSet.add(statsEntity.getCluster()); + brokerSet.add(statsEntity.getBroker()); + topicStatsIdList.add(statsEntity.getTopicStatsId()); + totalProducerCount += statsEntity.getProducerCount(); + } + totalClusterCount = clusterSet.size(); + totalBrokerCount = brokerSet.size(); + List<ConsumerStatsEntity> consumerStatsEntities = consumersStatsRepository.findByMultiTopicStatsId( + topicStatsIdList, timestamp); + totalConsumerCount = consumerStatsEntities.size(); + totalBookieCount = (int) bookiesService.getBookiesList( + 1, 10, "").getOrDefault( + "total", 0); + } + dashboardStatsMap.put("totalClusterCount", totalClusterCount); + dashboardStatsMap.put("totalBrokerCount", totalBrokerCount); + dashboardStatsMap.put("totalTenantCount", totalTenantCount); + dashboardStatsMap.put("totalNamespaceCount", totalNamespaceCount); + dashboardStatsMap.put("totalTopicCount", totalTopicCount); + dashboardStatsMap.put("totalProducerCount", totalProducerCount); + dashboardStatsMap.put("totalConsumerCount", totalConsumerCount); + dashboardStatsMap.put("totalBookieCount", totalBookieCount); + return dashboardStatsMap; + } +} diff --git 
a/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java new file mode 100644 index 0000000..0c67791 --- /dev/null +++ b/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java @@ -0,0 +1,151 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.manager.service; + +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.manager.PulsarManagerApplication; +import org.apache.pulsar.manager.entity.*; +import org.apache.pulsar.manager.profiles.HerdDBTestProfile; +import org.apache.pulsar.manager.utils.HttpUtil; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import 
java.util.Arrays; +import java.util.List; +import java.util.Map; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(SpringRunner.class) +@PowerMockIgnore( {"javax.*", "sun.*", "com.sun.*", "org.xml.*", "org.w3c.*"}) +@PrepareForTest(HttpUtil.class) +@TestPropertySource(locations= "classpath:test-bookie.properties") +@SpringBootTest( + classes = { + PulsarManagerApplication.class, + HerdDBTestProfile.class + } +) +@ActiveProfiles("test") +public class DashboardServiceImplTest { + + @Autowired + DashboardService dashboardService; + + @Autowired + TopicsStatsRepository topicsStatsRepository; + + @Autowired + ConsumersStatsRepository consumersStatsRepository; + + @Autowired + TenantsRepository tenantsRepository; + + @Autowired + NamespacesRepository namespacesRepository; + + @Value("${backend.jwt.token}") + private static String pulsarJwtToken; + + @Test + public void getDashboardStatsTest() { + List<String> environmentList = Arrays.asList("environment0", "environment1"); + List<String> clusterList = Arrays.asList("cluster0"); + List<String> brokerList = Arrays.asList("broker1", "broker2"); + List<String> tenantList = Arrays.asList("tenant0", "tenant1", "tenant2"); + List<String> namespaceList = Arrays.asList("namespace0", "namespace1", "namespace2", "namespace3"); + List<Long> timestampList = Arrays.asList(System.currentTimeMillis() / 1000L, System.currentTimeMillis() / 1000L - 10); + int producerPerTopic = 1; + int consumerPerTopic = 1; + + PowerMockito.mockStatic(HttpUtil.class); + Map<String, String> header = Maps.newHashMap(); + header.put("Content-Type", "application/json"); + if (StringUtils.isNotBlank(pulsarJwtToken)) { + header.put("Authorization", String.format("Bearer %s", pulsarJwtToken)); + } + PowerMockito.when(HttpUtil.doGet("http://localhost:8050/api/v1/bookie/list_bookies?type=rw&print_hostnames=true", header)) + .thenReturn("{\"192.168.2.116:3181\" : \"192.168.2.116\"}"); + 
PowerMockito.when(HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header)) + .thenReturn("{ }"); + PowerMockito.when(HttpUtil.doGet("http://localhost:8050/api/v1/bookie/list_bookie_info", header)) + .thenReturn("{\"192.168.2.116:3181\" : \": {Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}," + + "\",\"ClusterInfo: \" : \"{Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}\" }"); + + long topicStatsId = 0L; + for (String tenant: tenantList) { + TenantEntity tenantEntity = new TenantEntity(); + tenantEntity.setEnvironmentName(environmentList.get(0)); + tenantEntity.setTenant(tenant); + tenantsRepository.save(tenantEntity); + for (String namespace: namespaceList) { + NamespaceEntity namespaceEntity = new NamespaceEntity(); + namespaceEntity.setTenant(tenant); + namespaceEntity.setNamespace(namespace); + namespacesRepository.save(namespaceEntity); + } + } + for (String environment: environmentList) { + for (Long timestamp : timestampList) { + for (String cluster : clusterList) { + for (String broker : brokerList) { + TopicStatsEntity topicStatsEntity = new TopicStatsEntity(); + topicStatsEntity.setEnvironment(environment); + topicStatsEntity.setPersistent("persistent"); + topicStatsEntity.setBundle("neutral"); + topicStatsEntity.setCluster(cluster); + topicStatsEntity.setBroker(broker); + topicStatsEntity.setTenant(tenantList.get(0)); + topicStatsEntity.setNamespace(namespaceList.get(0)); + topicStatsEntity.setTopic("neutral"); + topicStatsEntity.setProducerCount(producerPerTopic); + topicStatsEntity.setTime_stamp(timestamp); + topicsStatsRepository.save(topicStatsEntity); + topicStatsId++; + for (int i = 0; i < consumerPerTopic; i++) { + ConsumerStatsEntity consumerStatsEntity = new ConsumerStatsEntity(); + consumerStatsEntity.setConsumer("neutral"); + consumerStatsEntity.setTopicStatsId(topicStatsId); + consumerStatsEntity.setTime_stamp(timestamp); + consumersStatsRepository.save(consumerStatsEntity); + } + } + } + } + } + + 
long topicCount = clusterList.size() * brokerList.size(); + Map<String, Object> dashboardStats = dashboardService.getDashboardStats(Arrays.asList(environmentList.get(0))); + Assert.assertEquals(clusterList.size(), dashboardStats.get("totalClusterCount")); + Assert.assertEquals(brokerList.size(), dashboardStats.get("totalBrokerCount")); + Assert.assertEquals(tenantList.size(), dashboardStats.get("totalTenantCount")); + Assert.assertEquals(tenantList.size() * namespaceList.size(), dashboardStats.get("totalNamespaceCount")); + Assert.assertEquals(topicCount, dashboardStats.get("totalTopicCount")); + Assert.assertEquals(topicCount * producerPerTopic, dashboardStats.get("totalProducerCount")); + Assert.assertEquals(topicCount * consumerPerTopic, dashboardStats.get("totalConsumerCount")); + Assert.assertEquals(1, dashboardStats.get("totalBookieCount")); + } +}