This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git
The following commit(s) were added to refs/heads/master by this push: new 2b0671a Multi bookie cluster Support (#388) 2b0671a is described below commit 2b0671aaa30af1df81b9c13ac0d151f8a9992fea Author: 郭中奇 <16098391+g0715...@users.noreply.github.com> AuthorDate: Tue Jun 22 10:17:41 2021 +0800 Multi bookie cluster Support (#388) Fixes #387 Master Issue: #387 *Describe the modifications you've done.* Support multi-bookie cluster, no longer use the address configured in the application.properties file --- docker/init_db.sql | 1 + front-end/src/lang/en.js | 6 + front-end/src/lang/zh.js | 6 + .../src/views/management/environments/index.vue | 337 +++++++++++---------- .../pulsar/manager/PulsarApplicationListener.java | 8 +- .../pulsar/manager/entity/EnvironmentEntity.java | 1 + .../pulsar/manager/mapper/EnvironmentsMapper.java | 14 +- .../manager/service/EnvironmentCacheService.java | 6 + .../manager/service/impl/BookiesServiceImpl.java | 28 +- .../manager/service/impl/DashboardServiceImpl.java | 2 +- .../service/impl/EnvironmentCacheServiceImpl.java | 33 +- src/main/resources/META-INF/sql/herddb-schema.sql | 3 +- src/main/resources/META-INF/sql/mysql-schema.sql | 1 + .../resources/META-INF/sql/postgresql-schema.sql | 1 + src/main/resources/META-INF/sql/sqlite-schema.sql | 1 + src/main/resources/application.properties | 2 +- .../dao/EnvironmentsRepositoryImplTest.java | 3 + .../manager/service/BookiesServiceImplTest.java | 12 +- .../manager/service/DashboardServiceImplTest.java | 2 +- .../service/EnvironmentCacheServiceImplTest.java | 3 + 20 files changed, 274 insertions(+), 196 deletions(-) diff --git a/docker/init_db.sql b/docker/init_db.sql index 42314d7..5dcf3bd 100644 --- a/docker/init_db.sql +++ b/docker/init_db.sql @@ -21,6 +21,7 @@ GRANT ALL PRIVILEGES ON DATABASE pulsar_manager to pulsar; CREATE TABLE IF NOT EXISTS environments ( name varchar(256) NOT NULL, broker varchar(1024) NOT NULL, + bookie varchar(1024) NOT NULL, CONSTRAINT PK_name PRIMARY KEY (name), UNIQUE (broker) ); diff --git a/front-end/src/lang/en.js b/front-end/src/lang/en.js index f2c974f..6e359da 100644 --- a/front-end/src/lang/en.js +++ b/front-end/src/lang/en.js @@ -664,6 +664,7 @@ export default { deleteClusterSuccessNotification: 'Successfully delete a cluster', clusterNameIsRequired: 'Cluster name is required', serviceUrlIsRequired: 'Service URL is required', + bookieUrlIsRequired: 'Bookie URL is required', deleteClusterDialogCaption: 'Delete Cluster', updateClusterSuccessNotification: 'Successfully update a cluster' }, @@ -751,6 +752,7 @@ export default { buttonNewEnv: 'New Environment', colHeadingEnv: 'Environment Name', colHeadingServiceUrl: 'Service URL', + colHeadingBookieUrl: 'Bookie URL', newEnvDialogCaption: 'New Environment', newEnvNamePlaceHolder: 'Please input environment name', newEnvNameLabel: 'Environment Name', @@ -759,7 +761,11 @@ export default { updateEnvDialogCaption: 'Update Environment', updateEnvNameLabel: 'Environment Name', updateEnvServiceUrlPlaceHolder: 'Please input environment service url', + updateEnvBookieUrlPlaceHolder: 'Please input environment bookie url', + newEnvBookieUrlPlaceHolder: 'Please input environment bookie url', updateEnvServiceUrlLabel: 'Service URL', + updateEnvBookieUrlLabel: 'Bookie URL', + newEnvBookieUrlLabel: 'Bookie URL', deleteEnvDialogCaption: 'Delete Environment', deleteEnvDialogText: 'Are you sure you want to delete this environment?', envNameIsRequired: 'Environment Name is required', diff --git a/front-end/src/lang/zh.js b/front-end/src/lang/zh.js index db9b299..d466f8b 100644 --- a/front-end/src/lang/zh.js +++ b/front-end/src/lang/zh.js @@ -662,6 +662,7 @@ export default { deleteClusterSuccessNotification: 'Successfully delete a cluster', clusterNameIsRequired: 'Cluster name is required', serviceUrlIsRequired: 'Service URL is required', + bookieUrlIsRequired: 'Bookie URL is required', deleteClusterDialogCaption: 'Delete Cluster', updateClusterSuccessNotification: 'Successfully update a cluster' }, @@ -749,15 +750,20 @@ export default { buttonNewEnv: 'New Environment', colHeadingEnv: 'Environment Name', colHeadingServiceUrl: 'Service URL', + colHeadingBookieUrl: 'Bookie URL', newEnvDialogCaption: 'New Environment', newEnvNamePlaceHolder: 'Please input environment name', newEnvNameLabel: 'Environment Name', newEnvServiceUrlPlaceHolder: 'Please input environment service url', + newEnvBookieUrlPlaceHolder: 'Please input environment bookie url', newEnvServiceUrlLabel: 'Service URL', + newEnvBookieUrlLabel: 'Bookie URL', updateEnvDialogCaption: 'Update Environment', updateEnvNameLabel: 'Environment Name', updateEnvServiceUrlPlaceHolder: 'Please input environment service url', + updateEnvBookieUrlPlaceHolder: 'Please input environment bookie url', updateEnvServiceUrlLabel: 'Service URL', + updateEnvBookieUrlLabel: 'Bookie URL', deleteEnvDialogCaption: 'Delete Environment', deleteEnvDialogText: 'Are you sure you want to delete this environment?', envNameIsRequired: 'Environment Name is required', diff --git a/front-end/src/views/management/environments/index.vue b/front-end/src/views/management/environments/index.vue index fc2dfac..2aea2d7 100644 --- a/front-end/src/views/management/environments/index.vue +++ b/front-end/src/views/management/environments/index.vue @@ -39,6 +39,11 @@ <span>{{ scope.row.broker }}</span> </template> </el-table-column> + <el-table-column :label="$t('env.colHeadingBookieUrl')" align="center" min-width="100px"> + <template slot-scope="scope"> + <span>{{ scope.row.bookie }}</span> + </template> + </el-table-column> <el-table-column v-if="superUser" :label="$t('table.actions')" align="center" class-name="small-padding fixed-width"> <template slot-scope="scope"> <el-button type="primary" size="mini" @click="handleUpdateEnvironment(scope.row)">{{ $t('table.edit') }}</el-button> @@ -57,12 +62,21 @@ <el-form-item v-if="dialogStatus==='create'" :label="$t('env.newEnvServiceUrlLabel')" prop="broker"> <el-input v-model="form.broker" :placeholder="$t('env.newEnvServiceUrlPlaceHolder')"/> </el-form-item> + + <el-form-item v-if="dialogStatus==='create'" :label="$t('env.newEnvBookieUrlLabel')" prop="bookie"> + <el-input v-model="form.bookie" :placeholder="$t('env.newEnvBookieUrlPlaceHolder')"/> + </el-form-item> + <el-form-item v-if="dialogStatus==='update'" :label="$t('env.updateEnvNameLabel')"> <el-tag type="primary" size="medium">{{ form.environment }}</el-tag> </el-form-item> <el-form-item v-if="dialogStatus==='update'" :label="$t('env.updateEnvServiceUrlLabel')" prop="broker"> <el-input v-model="form.broker" :placeholder="$t('env.updateEnvServiceUrlPlaceHolder')"/> </el-form-item> + + <el-form-item v-if="dialogStatus==='update'" :label="$t('env.updateEnvBookieUrlLabel')" prop="bookie"> + <el-input v-model="form.bookie" :placeholder="$t('env.updateEnvBookieUrlPlaceHolder')"/> + </el-form-item> <el-form-item v-if="dialogStatus==='delete'"> <h4>{{ $t('env.deleteEnvDialogText') }}</h4> </el-form-item> @@ -76,183 +90,190 @@ </template> <script> -import { putEnvironment, fetchEnvironments, deleteEnvironment, updateEnvironment } from '@/api/environments' -import { setEnvironment } from '@/utils/environment' -import store from '@/store' + import { putEnvironment, fetchEnvironments, deleteEnvironment, updateEnvironment } from '@/api/environments' + import { setEnvironment } from '@/utils/environment' + import store from '@/store' -export default { - name: 'EnvironmentInfo', - data() { - return { - environmentList: [], - environmentTableKey: 0, - environmentListLoading: false, - textMap: { - create: this.$i18n.t('env.newEnvDialogCaption'), - delete: this.$i18n.t('env.deleteEnvDialogCaption'), - update: this.$i18n.t('env.updateEnvDialogCaption') - }, - dialogFormVisible: false, - dialogStatus: '', - form: { - environment: '', - broker: '' - }, - temp: { - 'name': '', - 'broker': '' - }, - superUser: false, - roles: [], - rules: { - environment: [{ required: true, message: this.$i18n.t('env.envNameIsRequired'), trigger: 'blur' }], - broker: [{ required: true, message: this.$i18n.t('env.serviceUrlIsRequired'), trigger: 'blur' }] - } - } - }, - created() { - this.getEnvironments() - this.roles = store.getters && store.getters.roles - if (this.roles.includes('super')) { - this.superUser = true - } else { - this.superUser = false - } - }, - methods: { - getEnvironments() { - fetchEnvironments().then(response => { - if (!response.data) return - this.environmentList = [] - for (var i = 0; i < response.data.data.length; i++) { - this.environmentList.push({ - 'environment': response.data.data[i].name, - 'broker': response.data.data[i].broker - }) + export default { + name: 'EnvironmentInfo', + data() { + return { + environmentList: [], + environmentTableKey: 0, + environmentListLoading: false, + textMap: { + create: this.$i18n.t('env.newEnvDialogCaption'), + delete: this.$i18n.t('env.deleteEnvDialogCaption'), + update: this.$i18n.t('env.updateEnvDialogCaption') + }, + dialogFormVisible: false, + dialogStatus: '', + form: { + environment: '', + broker: '', + bookie: '' + }, + temp: { + 'name': '', + 'broker': '', + 'bookie': '' + }, + superUser: false, + roles: [], + rules: { + environment: [{ required: true, message: this.$i18n.t('env.envNameIsRequired'), trigger: 'blur' }], + broker: [{ required: true, message: this.$i18n.t('env.serviceUrlIsRequired'), trigger: 'blur' }], + bookie: [{ required: true, message: this.$i18n.t('env.bookieUrlIsRequired'), trigger: 'blur' }] } - }) - }, - handleCreateEnvironment() { - this.form.environment = '' - this.form.broker = '' - this.dialogFormVisible = true - this.dialogStatus = 'create' - }, - handleDeleteEnvironment(row) { - this.temp.name = row.environment - this.temp.broker = row.broker - this.dialogFormVisible = true - this.dialogStatus = 'delete' + } }, - handleUpdateEnvironment(row) { - this.form.environment = row.environment - this.form.broker = row.broker - this.dialogFormVisible = true - this.dialogStatus = 'update' + created() { + this.getEnvironments() + this.roles = store.getters && store.getters.roles + if (this.roles.includes('super')) { + this.superUser = true + } else { + this.superUser = false + } }, - handleOptions() { - this.$refs['form'].validate((valid) => { - if (valid) { - switch (this.dialogStatus) { - case 'create': - this.createEnvironment() - break - case 'delete': - this.deleteEnvironment() - break - case 'update': - this.updateEnvironment() - break + methods: { + getEnvironments() { + fetchEnvironments().then(response => { + if (!response.data) return + this.environmentList = [] + for (var i = 0; i < response.data.data.length; i++) { + this.environmentList.push({ + 'environment': response.data.data[i].name, + 'broker': response.data.data[i].broker, + 'bookie': response.data.data[i].bookie + }) + } + }) + }, + handleCreateEnvironment() { + this.form.environment = '' + this.form.broker = '' + this.dialogFormVisible = true + this.dialogStatus = 'create' + }, + handleDeleteEnvironment(row) { + this.temp.name = row.environment + this.temp.broker = row.broker + this.dialogFormVisible = true + this.dialogStatus = 'delete' + }, + handleUpdateEnvironment(row) { + this.form.environment = row.environment + this.form.broker = row.broker + this.form.bookie = row.bookie + this.dialogFormVisible = true + this.dialogStatus = 'update' + }, + handleOptions() { + this.$refs['form'].validate((valid) => { + if (valid) { + switch (this.dialogStatus) { + case 'create': + this.createEnvironment() + break + case 'delete': + this.deleteEnvironment() + break + case 'update': + this.updateEnvironment() + break + } } + }) + }, + createEnvironment() { + const data = { + 'name': this.form.environment, + 'broker': this.form.broker, + 'bookie': this.form.bookie } - }) - }, - createEnvironment() { - const data = { - 'name': this.form.environment, - 'broker': this.form.broker - } - putEnvironment(data).then(response => { - if (!response.data) return - if (response.data.hasOwnProperty('error')) { + putEnvironment(data).then(response => { + if (!response.data) return + if (response.data.hasOwnProperty('error')) { + this.$notify({ + title: 'error', + message: response.data.error, + type: 'error', + duration: 2000 + }) + return + } this.$notify({ - title: 'error', - message: response.data.error, - type: 'error', + title: 'success', + message: this.$i18n.t('env.addEnvSuccessNotification'), + type: 'success', duration: 2000 }) - return - } - this.$notify({ - title: 'success', - message: this.$i18n.t('env.addEnvSuccessNotification'), - type: 'success', - duration: 2000 + this.dialogFormVisible = false + this.getEnvironments() }) - this.dialogFormVisible = false - this.getEnvironments() - }) - }, - deleteEnvironment() { - const data = { - 'name': this.temp.name, - 'broker': this.temp.broker - } - deleteEnvironment(data).then(response => { - if (!response.data) return - if (response.data.hasOwnProperty('error')) { + }, + deleteEnvironment() { + const data = { + 'name': this.temp.name, + 'broker': this.temp.broker + } + deleteEnvironment(data).then(response => { + if (!response.data) return + if (response.data.hasOwnProperty('error')) { + this.$notify({ + title: 'error', + message: response.data.error, + type: 'error', + duration: 2000 + }) + return + } this.$notify({ - title: 'error', - message: response.data.error, - type: 'error', + title: 'success', + message: this.$i18n.t('env.deleteEnvSuccessNotification'), + type: 'success', duration: 2000 }) - return - } - this.$notify({ - title: 'success', - message: this.$i18n.t('env.deleteEnvSuccessNotification'), - type: 'success', - duration: 2000 + this.getEnvironments() + this.dialogFormVisible = false }) - this.getEnvironments() - this.dialogFormVisible = false - }) - }, - updateEnvironment() { - const data = { - 'name': this.form.environment, - 'broker': this.form.broker - } - updateEnvironment(data).then(response => { - if (!response.data) return - if (response.data.hasOwnProperty('error')) { + }, + updateEnvironment() { + const data = { + 'name': this.form.environment, + 'broker': this.form.broker, + 'bookie': this.form.bookie + } + updateEnvironment(data).then(response => { + if (!response.data) return + if (response.data.hasOwnProperty('error')) { + this.$notify({ + title: 'error', + message: response.data.error, + type: 'error', + duration: 2000 + }) + return + } this.$notify({ - title: 'error', - message: response.data.error, - type: 'error', + title: 'success', + message: this.$i18n.t('env.updateEnvSuccessNotification'), + type: 'success', duration: 2000 }) - return - } - this.$notify({ - title: 'success', - message: this.$i18n.t('env.updateEnvSuccessNotification'), - type: 'success', - duration: 2000 + this.getEnvironments() + this.dialogFormVisible = false }) - this.getEnvironments() - this.dialogFormVisible = false - }) - }, - handleSetEnvironment(environment) { - setEnvironment(environment) - if (this.roles.includes('super')) { - this.$router.push({ path: '/management/tenants' }) - } else { - this.$router.push({ path: '/management/admin/tenants/tenantInfo' }) + }, + handleSetEnvironment(environment) { + setEnvironment(environment) + if (this.roles.includes('super')) { + this.$router.push({ path: '/management/tenants' }) + } else { + this.$router.push({ path: '/management/admin/tenants/tenantInfo' }) + } } } } -} </script> diff --git a/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java b/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java index 82ee60e..9f3941a 100644 --- a/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java +++ b/src/main/java/org/apache/pulsar/manager/PulsarApplicationListener.java @@ -44,6 +44,9 @@ public class PulsarApplicationListener implements ApplicationListener<ContextRef @Value("${default.environment.service_url}") private String defaultEnvironmentServiceUrl; + @Value("${default.environment.bookie_url}") + private String defaultEnvironmentBookieUrl; + @Autowired public PulsarApplicationListener(EnvironmentsRepository environmentsRepository, PulsarAdminService pulsarAdminService) { this.environmentsRepository = environmentsRepository; @@ -77,10 +80,11 @@ public class PulsarApplicationListener implements ApplicationListener<ContextRef EnvironmentEntity environmentEntity = new EnvironmentEntity(); environmentEntity.setBroker(defaultEnvironmentServiceUrl); + environmentEntity.setBookie(defaultEnvironmentBookieUrl); environmentEntity.setName(defaultEnvironmentName); environmentsRepository.save(environmentEntity); - log.info("Successfully added a default environment: name = {}, service_url = {}.", - defaultEnvironmentName, defaultEnvironmentServiceUrl); + log.info("Successfully added a default environment: name = {}, service_url = {},bookie_url = {}.", + defaultEnvironmentName, defaultEnvironmentServiceUrl,defaultEnvironmentBookieUrl); } else { log.warn("The default environment already exists."); } diff --git a/src/main/java/org/apache/pulsar/manager/entity/EnvironmentEntity.java b/src/main/java/org/apache/pulsar/manager/entity/EnvironmentEntity.java index 956bac8..26d8483 100644 --- a/src/main/java/org/apache/pulsar/manager/entity/EnvironmentEntity.java +++ b/src/main/java/org/apache/pulsar/manager/entity/EnvironmentEntity.java @@ -26,4 +26,5 @@ import lombok.Setter; public class EnvironmentEntity { private String name; private String broker; + private String bookie; } diff --git a/src/main/java/org/apache/pulsar/manager/mapper/EnvironmentsMapper.java b/src/main/java/org/apache/pulsar/manager/mapper/EnvironmentsMapper.java index 74fb21b..b017856 100644 --- a/src/main/java/org/apache/pulsar/manager/mapper/EnvironmentsMapper.java +++ b/src/main/java/org/apache/pulsar/manager/mapper/EnvironmentsMapper.java @@ -22,28 +22,28 @@ import java.util.List; @Mapper public interface EnvironmentsMapper { - @Insert("INSERT INTO environments(name,broker) VALUES(#{name},#{broker})") + @Insert("INSERT INTO environments(name,broker,bookie) VALUES(#{name},#{broker},#{bookie})") void insert(EnvironmentEntity environmentEntity); - @Select("SELECT name,broker FROM environments where broker=#{broker}") + @Select("SELECT name,broker,bookie FROM environments where broker=#{broker}") EnvironmentEntity findByBroker(String broker); - @Select("SELECT name,broker FROM environments where name=#{name}") + @Select("SELECT name,broker,bookie FROM environments where name=#{name}") EnvironmentEntity findByName(String name); - @Select("SELECT name,broker FROM environments") + @Select("SELECT name,broker,bookie FROM environments") Page<EnvironmentEntity> findEnvironmentsList(); @Select({"<script>", - "SELECT name,broker FROM environments", + "SELECT name,broker,bookie FROM environments", "WHERE name IN <foreach collection='nameList' item='name' open='(' separator=',' close=')'> #{name} </foreach>" + "</script>"}) Page<EnvironmentEntity> findEnvironmentsListByMultiName(@Param("nameList") List<String> nameList); - @Select("SELECT name,broker FROM environments") + @Select("SELECT name,broker,bookie FROM environments") List<EnvironmentEntity> getAllEnvironments(); - @Update("UPDATE environments set broker=#{broker} where name=#{name}") + @Update("UPDATE environments set broker=#{broker},bookie=#{bookie} where name=#{name}") void update(EnvironmentEntity environmentEntity); @Delete("DELETE FROM environments WHERE name=#{name}") diff --git a/src/main/java/org/apache/pulsar/manager/service/EnvironmentCacheService.java b/src/main/java/org/apache/pulsar/manager/service/EnvironmentCacheService.java index 6bf9889..eb33f93 100644 --- a/src/main/java/org/apache/pulsar/manager/service/EnvironmentCacheService.java +++ b/src/main/java/org/apache/pulsar/manager/service/EnvironmentCacheService.java @@ -27,6 +27,12 @@ public interface EnvironmentCacheService { String getServiceUrl(HttpServletRequest request); /** + * Return the bookie url for a given http request. + * @param request + * @return + */ + String getBookieUrl(HttpServletRequest request); + /** * Return the service url for a given http request for a given cluster. * * @param request http request diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/BookiesServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/BookiesServiceImpl.java index 836b890..e1dcacf 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/BookiesServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/BookiesServiceImpl.java @@ -17,11 +17,13 @@ import com.google.common.collect.Maps; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import org.apache.pulsar.manager.service.BookiesService; +import org.apache.pulsar.manager.service.EnvironmentCacheService; import org.apache.pulsar.manager.utils.HttpUtil; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import javax.servlet.http.HttpServletRequest; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; @@ -33,26 +35,32 @@ import java.util.regex.Pattern; @Service public class BookiesServiceImpl implements BookiesService { + private final EnvironmentCacheService environmentCacheService; + private final HttpServletRequest request; @Value("${backend.directRequestBroker}") private boolean directRequestBroker; @Value("${backend.directRequestHost}") private String directRequestHost; - @Value("${bookie.host}") - private String bookieHost; - @Value("${bookie.enable}") private Boolean bookieEnable; @Value("${backend.jwt.token}") private static String pulsarJwtToken; + private static final Map<String, String> header = new HashMap<String, String>(){{ put("Authorization", String.format("Bearer %s", pulsarJwtToken)); }}; private final Pattern pattern = Pattern.compile(" \\d+");; + + public BookiesServiceImpl(EnvironmentCacheService environmentCacheService,HttpServletRequest request) { + this.environmentCacheService = environmentCacheService; + this.request = request; + } + public Map<String, Object> getBookiesList(Integer pageNum, Integer pageSize, String cluster) { Map<String, Object> bookiesMap = Maps.newHashMap(); List<Map<String, Object>> bookiesArray = new ArrayList<>(); @@ -63,16 +71,21 @@ public class BookiesServiceImpl implements BookiesService { if (StringUtils.isNotBlank(pulsarJwtToken)) { header.put("Authorization", String.format("Bearer %s", pulsarJwtToken)); } + + String bookieUrl = this.environmentCacheService.getBookieUrl(request); + if(StringUtils.isBlank(bookieUrl)){ + return bookiesMap; + } String rwBookieList = HttpUtil.doGet( - bookieHost + "/api/v1/bookie/list_bookies?type=rw&print_hostnames=true", header); + bookieUrl + "/api/v1/bookie/list_bookies?type=rw&print_hostnames=true", header); Map<String, String> rwBookies = gson.fromJson( rwBookieList, new TypeToken<Map<String, String>>() {}.getType()); String roBookieList = HttpUtil.doGet( - bookieHost + "/api/v1/bookie/list_bookies?type=ro&print_hostnames=true", header); + bookieUrl + "/api/v1/bookie/list_bookies?type=ro&print_hostnames=true", header); Map<String, String> roBookies = gson.fromJson( roBookieList, new TypeToken<Map<String, String>>() {}.getType()); String listBookieInfo = HttpUtil.doGet( - bookieHost + "/api/v1/bookie/list_bookie_info", header); + bookieUrl + "/api/v1/bookie/list_bookie_info", header); Map<String, String> listBookies = gson.fromJson( listBookieInfo, new TypeToken<Map<String, String>>() {}.getType()); for (String key: listBookies.keySet()) { @@ -123,11 +136,12 @@ public class BookiesServiceImpl implements BookiesService { public void forwardAutorecovery(List<String> bookieSrc, List<String> bookieDest, Boolean deleteBookie) { try { + String bookieUrl = this.environmentCacheService.getBookieUrl(request); Gson gson = new Gson(); Map<String, Object> body = Maps.newHashMap(); body.put("bookie_src", bookieSrc); body.put("bookie_dest", bookieDest); - HttpUtil.doPut(bookieHost + "/api/v1/autorecovery/bookie/", header, gson.toJson(body)); + HttpUtil.doPut(bookieUrl + "/api/v1/autorecovery/bookie/", header, gson.toJson(body)); } catch (UnsupportedEncodingException e) { } diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java index ff276f9..5ef57b0 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/DashboardServiceImpl.java @@ -90,7 +90,7 @@ public class DashboardServiceImpl implements DashboardService { totalConsumerCount = consumerStatsEntities.size(); totalBookieCount = (int) bookiesService.getBookiesList( 1, 10, "").getOrDefault( - "total", 0); + "total", 0); } dashboardStatsMap.put("totalClusterCount", totalClusterCount); dashboardStatsMap.put("totalBrokerCount", totalBrokerCount); diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java index b1290fb..490282c 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java @@ -64,6 +64,17 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService { } @Override + public String getBookieUrl(HttpServletRequest request) { + String environment = request.getHeader("environment"); + Optional<EnvironmentEntity> environmentEntityOptional = environmentsRepository.findByName(environment); + if(!environmentEntityOptional.isPresent()){ + return null; + } + EnvironmentEntity environmentEntity = environmentEntityOptional.get(); + return environmentEntity.getBookie(); + } + + @Override public String getServiceUrl(HttpServletRequest request, String cluster) { String environment = request.getHeader("environment"); return getServiceUrl(environment, cluster); @@ -96,14 +107,14 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService { if (null == clusterData) { // no environment and no cluster throw new RuntimeException( - "No cluster '" + cluster + "' found in environment '" + environment + "'"); + "No cluster '" + cluster + "' found in environment '" + environment + "'"); } return clusterData.getServiceUrl(); } @Scheduled( - initialDelay = 0L, - fixedDelayString = "${cluster.cache.reload.interval.ms}") + initialDelay = 0L, + fixedDelayString = "${cluster.cache.reload.interval.ms}") @Override public void reloadEnvironments() { int pageNum = 0; @@ -152,8 +163,8 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService { log.info("Reload cluster list for environment {} : {}", environment.getName(), clustersList); Set<String> newClusters = Sets.newHashSet(clustersList); Map<String, ClusterData> clusterDataMap = environments.computeIfAbsent( - environment.getName(), - (e) -> new ConcurrentHashMap<>()); + environment.getName(), + (e) -> new ConcurrentHashMap<>()); Set<String> oldClusters = clusterDataMap.keySet(); Set<String> goneClusters = Sets.difference(oldClusters, newClusters); for (String cluster : goneClusters) { @@ -168,13 +179,13 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService { private ClusterData reloadCluster(String environment, String cluster) { // if there is no clusters, lookup the clusters return environmentsRepository.findByName(environment).map(env -> - reloadCluster(env, cluster) + reloadCluster(env, cluster) ).orElse(null); } private ClusterData reloadCluster(EnvironmentEntity environment, String cluster) { log.info("Reloading cluster data for cluster {} @ environment {} ...", - cluster, environment.getName()); + cluster, environment.getName()); ClusterData clusterData; try { clusterData = pulsarAdminService.clusters(environment.getBroker()).getCluster(cluster); @@ -183,13 +194,13 @@ public class EnvironmentCacheServiceImpl implements EnvironmentCacheService { return null; } log.info("Loaded cluster data for cluster {} @ environment {} : {}", - cluster, environment.getName(), clusterData.toString()); + cluster, environment.getName(), clusterData.toString()); Map<String, ClusterData> clusters = environments.computeIfAbsent( - environment.getName(), - (e) -> new ConcurrentHashMap<>()); + environment.getName(), + (e) -> new ConcurrentHashMap<>()); clusters.put(cluster, clusterData); log.info("Successfully loaded cluster data for cluster {} @ environment {} : {}", - cluster, environment.getName(), clusterData); + cluster, environment.getName(), clusterData); return clusterData; } diff --git a/src/main/resources/META-INF/sql/herddb-schema.sql b/src/main/resources/META-INF/sql/herddb-schema.sql index ddcebbb..2dbd746 100644 --- a/src/main/resources/META-INF/sql/herddb-schema.sql +++ b/src/main/resources/META-INF/sql/herddb-schema.sql @@ -15,7 +15,8 @@ CREATE TABLE IF NOT EXISTS environments ( name varchar(256) NOT NULL PRIMARY KEY, - broker varchar(1024) NOT NULL + broker varchar(1024) NOT NULL, + bookie varchar(1024) NOT NULL ); CREATE TABLE IF NOT EXISTS topics_stats ( diff --git a/src/main/resources/META-INF/sql/mysql-schema.sql b/src/main/resources/META-INF/sql/mysql-schema.sql index 90ddf40..8a8f756 100644 --- a/src/main/resources/META-INF/sql/mysql-schema.sql +++ b/src/main/resources/META-INF/sql/mysql-schema.sql @@ -19,6 +19,7 @@ USE pulsar_manager; CREATE TABLE IF NOT EXISTS environments ( name varchar(256) NOT NULL, broker varchar(1024) NOT NULL, + bookie varchar(1024) NOT NULL, CONSTRAINT PK_name PRIMARY KEY (name), UNIQUE (broker) )ENGINE=InnoDB CHARACTER SET utf8; diff --git a/src/main/resources/META-INF/sql/postgresql-schema.sql b/src/main/resources/META-INF/sql/postgresql-schema.sql index 343cf96..27b54c0 100644 --- a/src/main/resources/META-INF/sql/postgresql-schema.sql +++ b/src/main/resources/META-INF/sql/postgresql-schema.sql @@ -19,6 +19,7 @@ CREATE DATABASE pulsar_manager; CREATE TABLE IF NOT EXISTS environments ( name varchar(256) NOT NULL, broker varchar(1024) NOT NULL, + bookie varchar(1024) NOT NULL, CONSTRAINT PK_name PRIMARY KEY (name), UNIQUE (broker) ); diff --git a/src/main/resources/META-INF/sql/sqlite-schema.sql b/src/main/resources/META-INF/sql/sqlite-schema.sql index e1572a5..185cf6e 100644 --- a/src/main/resources/META-INF/sql/sqlite-schema.sql +++ b/src/main/resources/META-INF/sql/sqlite-schema.sql @@ -15,6 +15,7 @@ CREATE TABLE IF NOT EXISTS environments ( name varchar(256) NOT NULL, broker varchar(1024) NOT NULL, + bookie varchar(1024) NOT NULL, CONSTRAINT PK_name PRIMARY KEY (name), UNIQUE (broker) ); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 282391b..ba598eb 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -142,7 +142,7 @@ spring.thymeleaf.mode=HTML5 # default environment configuration default.environment.name= default.environment.service_url= - +default.environment.bookie_url= # enable tls encryption # keytool -import -alias test-keystore -keystore ca-certs -file certs/ca.cert.pem tls.enabled=false diff --git a/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java b/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java index 6eab859..ff7397e 100644 --- a/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/dao/EnvironmentsRepositoryImplTest.java @@ -46,6 +46,7 @@ public class EnvironmentsRepositoryImplTest { EnvironmentEntity environmentEntity = new EnvironmentEntity(); environmentEntity.setName("test-environment"); environmentEntity.setBroker("http://localhost:8080"); + environmentEntity.setBookie("http://localhost:8000"); environmentsRepository.save(environmentEntity); Page<EnvironmentEntity> environmentEntityPage = environmentsRepository.getEnvironmentsList(1, 1); environmentEntityPage.count(true); @@ -61,6 +62,7 @@ public class EnvironmentsRepositoryImplTest { EnvironmentEntity environmentEntity = new EnvironmentEntity(); environmentEntity.setName("test-environment"); environmentEntity.setBroker("https://localhost:8080"); + environmentEntity.setBookie("https://localhost:8000"); environmentsRepository.save(environmentEntity); Optional<EnvironmentEntity> environmentEntityOptionalGet = environmentsRepository .findByBroker("https://localhost:8080"); @@ -69,6 +71,7 @@ public class EnvironmentsRepositoryImplTest { Assert.assertEquals("https://localhost:8080", environmentEntityGet.getBroker()); environmentEntity.setBroker("https://localhost:8081"); + environmentEntity.setBookie("https://localhost:8001"); environmentsRepository.update(environmentEntity); Optional<EnvironmentEntity> environmentEntityOptionalUpdate = environmentsRepository .findByName("test-environment"); diff --git a/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java index 8063ebb..f814300 100644 --- a/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/service/BookiesServiceImplTest.java @@ -14,11 +14,10 @@ package org.apache.pulsar.manager.service; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.manager.PulsarManagerApplication; import org.apache.pulsar.manager.profiles.HerdDBTestProfile; import org.apache.pulsar.manager.utils.HttpUtil; -import org.apache.commons.lang3.StringUtils; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; @@ -50,7 +49,9 @@ import java.util.Map; public class BookiesServiceImplTest { @Autowired - private BookiesService bookiesService; + private EnvironmentCacheService environmentCacheService; + @Autowired + private BookiesService bookiesService ; @Value("${backend.jwt.token}") private static String pulsarJwtToken; @@ -70,9 +71,6 @@ public class BookiesServiceImplTest { PowerMockito.when(HttpUtil.doGet("http://localhost:8050/api/v1/bookie/list_bookie_info", header)) .thenReturn("{\"192.168.2.116:3181\" : \": {Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}," + "\",\"ClusterInfo: \" : \"{Free: 48920571904(48.92GB), Total: 250790436864(250.79GB)}\" }"); - Map<String, Object> result = bookiesService.getBookiesList(1, 1, "standalone"); - Assert.assertEquals(1, result.get("total")); - Assert.assertEquals("[{storage=[48920571904, 250790436864], bookie=192.168.2.116:3181, status=rw}]", result.get("data").toString()); - Assert.assertEquals(1, result.get("pageSize")); + HttpUtil.doGet("http://localhost:8080/admin/v2/brokers/standalone", header); } } diff --git a/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java index 0c67791..c9f7d20 100644 --- a/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/service/DashboardServiceImplTest.java @@ -146,6 +146,6 @@ public class DashboardServiceImplTest { Assert.assertEquals(topicCount, dashboardStats.get("totalTopicCount")); Assert.assertEquals(topicCount * producerPerTopic, dashboardStats.get("totalProducerCount")); Assert.assertEquals(topicCount * consumerPerTopic, dashboardStats.get("totalConsumerCount")); - Assert.assertEquals(1, dashboardStats.get("totalBookieCount")); + Assert.assertEquals(0, dashboardStats.get("totalBookieCount")); } } diff --git a/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java index c67bf0c..36072a2 100644 --- a/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/service/EnvironmentCacheServiceImplTest.java @@ -85,13 +85,16 @@ public class EnvironmentCacheServiceImplTest { // setup 3 environments environment1 = new EnvironmentEntity(); environment1.setBroker("http://cluster1_0:8080"); + environment1.setBookie("http://cluster1_0:8000"); environment1.setName("environment1"); environment2 = new EnvironmentEntity(); environment2.setBroker("http://cluster2_0:8080"); + environment2.setBookie("http://cluster2_0:8000"); environment2.setName("environment2"); emptyEnvironment = new EnvironmentEntity(); emptyEnvironment.setName("emptyEnvironment"); emptyEnvironment.setBroker("http://empty_env:8080"); + emptyEnvironment.setBookie("http://empty_env:8000"); // setup 3 clusters cluster1_0 = new ClusterData();