This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new e6c1673 [TUBEMQ-536] broker - register special topic to master (#408)
e6c1673 is described below
commit e6c16731b0ac4487cae9af1a247ea4537696abe3
Author: EMsnap <[email protected]>
AuthorDate: Wed Feb 24 17:36:21 2021 +0800
[TUBEMQ-536] broker - register special topic to master (#408)
* [TUBEMQ-536] broker - register special topic to master
* [TUBEMQ-536] broker - register special topic to master
---
.../org/apache/tubemq/corebase/TBaseConstants.java | 2 ++
.../bdbstore/bdbentitys/BdbBrokerConfEntity.java | 8 ++++++
.../web/handler/WebBrokerDefConfHandler.java | 30 +++++++++++++++++++++-
3 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
index 5238323..7c515a6 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TBaseConstants.java
@@ -64,6 +64,8 @@ public class TBaseConstants {
public static final long CFG_DEFAULT_AUTH_TIMESTAMP_VALID_INTERVAL = 20000;
+ public static final String OFFSET_TOPIC = "offsetTopic";
+
public static final int META_MB_UNIT_SIZE = (1024 * 1024);
public static final int META_MESSAGE_SIZE_ADJUST = (512 * 1024);
public static final int META_MAX_MESSAGE_HEADER_SIZE = (10 * 1024);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 318e2ca..9ef9c2d 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -371,6 +371,14 @@ public class BdbBrokerConfEntity implements Serializable {
return numPartitions;
}
+ public boolean getAcceptPublish() {
+ return acceptPublish;
+ }
+
+ public boolean getAcceptSubscribe() {
+ return acceptSubscribe;
+ }
+
public void setDftNumPartitions(int numPartitions) {
this.numPartitions = numPartitions;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index d967af3..59d39ca 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -18,6 +18,7 @@
package org.apache.tubemq.server.master.web.handler;
import static java.lang.Math.abs;
+import static org.apache.tubemq.corebase.TBaseConstants.OFFSET_TOPIC;
import com.google.common.collect.Maps;
import java.text.SimpleDateFormat;
@@ -310,7 +311,6 @@ public class WebBrokerDefConfHandler extends AbstractWebHandler {
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_TLS_PORT)
.append(TokenConstants.EQ).append(brokerTlsPort).toString();
- strBuffer.delete(0, strBuffer.length());
BdbBrokerConfEntity brokerConfEntity =
new BdbBrokerConfEntity(brokerId, brokerIp, brokerPort,
numPartitions, unflushThreshold, unflushInterval,
@@ -318,6 +318,10 @@ public class WebBrokerDefConfHandler extends AbstractWebHandler {
acceptSubscribe, attributes, true, false,
createUser,
createDate, modifyUser, modifyDate);
brokerConfManager.confAddBrokerDefaultConfig(brokerConfEntity);
+ strBuffer.delete(0, strBuffer.length());
+
+ // for every broker configured, create an offset topic to store offsets
+ inAddOffsetTopic(brokerConfEntity, brokerId, attributes,
strBuffer);
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
} catch (Exception e) {
strBuffer.delete(0, strBuffer.length());
@@ -327,6 +331,30 @@ public class WebBrokerDefConfHandler extends AbstractWebHandler {
return strBuffer;
}
+
+ private void inAddOffsetTopic(BdbBrokerConfEntity brokerConf,
+ int brokerId, String attributes, StringBuilder topicNameStrBuilder)
throws Exception {
+
+ // reuse stringBuilder to create topic name
+ String topicName =
topicNameStrBuilder.append(OFFSET_TOPIC).append(brokerId).toString();
+
+ BdbTopicConfEntity bdbTopicConfEntity = new
BdbTopicConfEntity(brokerConf.getBrokerId(),
+ brokerConf.getBrokerIp(), brokerConf.getBrokerPort(), topicName,
+ brokerConf.getDftNumPartitions(),
brokerConf.getDftUnflushThreshold(),
+ brokerConf.getDftUnflushInterval(), brokerConf.getDftDeleteWhen(),
+ brokerConf.getDftDeletePolicy(), brokerConf.getAcceptPublish(),
+ brokerConf.getAcceptSubscribe(), brokerConf.getNumTopicStores(),
+ attributes, brokerConf.getRecordCreateUser(),
brokerConf.getRecordCreateDate(),
+ brokerConf.getRecordModifyUser(),
brokerConf.getRecordModifyDate());
+
+ boolean result =
brokerConfManager.confAddTopicConfig(bdbTopicConfEntity);
+ if (result) { // if it succeeds in adding topic config
+ brokerConfManager.updateBrokerConfChanged(brokerConf.getBrokerId(),
+ true, false);
+ }
+ topicNameStrBuilder.delete(0, topicNameStrBuilder.length());
+ }
+
/**
* Add default config to brokers in batch
*