This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch TUBEMQ-570
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-570 by this push:
new a36610a [TUBEMQ-579] Add structure mapping of BDB and metadata entity
classes
a36610a is described below
commit a36610a841c94f722d72f80249d37225e0cc0d66
Author: gosonzhang <[email protected]>
AuthorDate: Wed Mar 17 19:55:37 2021 +0800
[TUBEMQ-579] Add structure mapping of BDB and metadata entity classes
---
.../org/apache/tubemq/corebase/TokenConstants.java | 7 -
.../tubemq/server/common/TServerConstants.java | 2 -
.../exception/LoadMetaException.java} | 20 +--
.../{RuleStatus.java => EnableStatus.java} | 10 +-
.../master/bdbstore/DefaultBdbStoreService.java | 4 +-
.../bdbstore/bdbentitys/BdbBlackGroupEntity.java | 45 ++++++
.../bdbstore/bdbentitys/BdbBrokerConfEntity.java | 92 +++++++++++--
.../bdbentitys/BdbClusterSettingEntity.java | 73 ++++++++++
.../bdbentitys/BdbConsumeGroupSettingEntity.java | 20 +++
.../bdbentitys/BdbConsumerGroupEntity.java | 5 +
.../bdbentitys/BdbGroupFilterCondEntity.java | 77 ++++++++++-
.../bdbentitys/BdbGroupFlowCtrlEntity.java | 60 +++++++-
.../bdbentitys/BdbTopicAuthControlEntity.java | 51 +++++++
.../bdbstore/bdbentitys/BdbTopicConfEntity.java | 83 +++++++++--
.../server/master/metastore/TStoreConstants.java | 33 +++--
.../master/metastore/dao/entity/BaseEntity.java | 4 +
.../metastore/dao/entity/BrokerConfEntity.java | 96 ++++++++++---
.../metastore/dao/entity/ClusterSettingEntity.java | 115 +++++++++++-----
.../metastore/dao/entity/GroupBlackListEntity.java | 29 +++-
.../metastore/dao/entity/GroupConfigEntity.java | 152 +++++++++++++++++++++
...eCtrlEntity.java => GroupFilterCtrlEntity.java} | 81 ++++++-----
.../metastore/dao/entity/GroupFlowCtrlEntity.java | 99 --------------
.../metastore/dao/entity/TopicAuthCtrlEntity.java | 68 ---------
.../metastore/dao/entity/TopicConfEntity.java | 136 +++++++++---------
.../metastore/dao/entity/TopicCtrlEntity.java | 95 +++++++++++++
.../metastore/dao/entity/TopicPropGroup.java | 4 +-
...roupFlowCtrlMapper.java => AbstractMapper.java} | 10 +-
.../metastore/dao/mapper/BrokerConfigMapper.java | 10 +-
.../metastore/dao/mapper/ClusterConfigMapper.java | 6 +-
.../metastore/dao/mapper/GroupBlackListMapper.java | 9 +-
...terConfigMapper.java => GroupConfigMapper.java} | 11 +-
...onfigMapper.java => GroupFilterCtrlMapper.java} | 10 +-
.../metastore/dao/mapper/TopicAuthCtrlMapper.java | 31 -----
.../metastore/dao/mapper/TopicConfigMapper.java | 8 +-
...ConsumeCtrlMapper.java => TopicCtrlMapper.java} | 11 +-
.../nodemanage/nodebroker/BrokerConfManager.java | 15 +-
.../web/handler/WebAdminGroupCtrlHandler.java | 18 +--
.../web/handler/WebAdminTopicAuthHandler.java | 4 +-
.../web/handler/WebBrokerDefConfHandler.java | 29 ++--
.../web/handler/WebBrokerTopicConfHandler.java | 37 ++---
40 files changed, 1164 insertions(+), 506 deletions(-)
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
index ab2d79e..d40e022 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
@@ -29,13 +29,6 @@ public class TokenConstants {
public static final String BLANK = " ";
- public static final String TOKEN_STORE_NUM = "storeNum";
- public static final String TOKEN_QRY_PRIORITY_ID = "qryPriorityId";
- public static final String TOKEN_DATA_UNFLUSHHOLD = "unFlushDataHold";
- public static final String TOKEN_MCACHE_MSG_CNT = "memCacheMsgCntInK";
- public static final String TOKEN_TLS_PORT = "TLSPort";
- public static final String TOKEN_MCACHE_MSG_SIZE = "memCacheMsgSizeInMB";
- public static final String TOKEN_MCACHE_FLUSH_INTVL = "memCacheFlushIntvl";
public static final String TOKEN_MSG_TYPE = "$msgType$";
public static final String TOKEN_MSG_TIME = "$msgTime$";
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
index 186c9ff..09b115f 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/TServerConstants.java
@@ -25,8 +25,6 @@ public final class TServerConstants {
public static final String TOKEN_JOB_TOPICS = "topics";
public static final String TOKEN_JOB_STORE_MGR = "messageStoreManager";
public static final String TOKEN_DEFAULT_FLOW_CONTROL =
"default_master_ctrl";
- public static final String TOKEN_DEFAULT_CLUSTER_SETTING =
"default_cluster_config";
- public static final String TOKEN_MAX_MSG_SIZE = "maxMsgSize";
public static final long DEFAULT_DATA_VERSION = 0L;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/exception/LoadMetaException.java
similarity index 64%
copy from
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
copy to
tubemq-server/src/main/java/org/apache/tubemq/server/common/exception/LoadMetaException.java
index a08ab41..1fa6f7e 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/exception/LoadMetaException.java
@@ -15,17 +15,21 @@
* limitations under the License.
*/
-package org.apache.tubemq.server.master.metastore.dao.mapper;
+package org.apache.tubemq.server.common.exception;
-import java.util.Map;
+public class LoadMetaException extends RuntimeException {
-public interface GroupBlackListMapper {
+ static final long serialVersionUID = 5286701925988728790L;
- boolean loadGroupBlackListConfig(Map<String, String> metaDataMap);
-
- boolean putGroupBlackListConfig(String key, String blackListJsonData);
-
- boolean delGroupBlackListConfig(String key);
+ public LoadMetaException(String msg) {
+ super(msg);
+ }
+ public LoadMetaException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ public LoadMetaException(Throwable cause) {
+ super(cause);
+ }
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/RuleStatus.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
similarity index 83%
rename from
tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/RuleStatus.java
rename to
tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
index 29675c9..c7b86a1 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/RuleStatus.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/common/statusdef/EnableStatus.java
@@ -18,7 +18,7 @@
package org.apache.tubemq.server.common.statusdef;
-public enum RuleStatus {
+public enum EnableStatus {
STATUS_UNDEFINE(-2, "Undefined."),
STATUS_ENABLE(1, "Enable."),
STATUS_DISABLE(0, "Disable.");
@@ -27,7 +27,7 @@ public enum RuleStatus {
private String description;
- RuleStatus(int code, String description) {
+ EnableStatus(int code, String description) {
this.code = code;
this.description = description;
}
@@ -40,13 +40,13 @@ public enum RuleStatus {
return description;
}
- public static RuleStatus valueOf(int code) {
- for (RuleStatus status : RuleStatus.values()) {
+ public static EnableStatus valueOf(int code) {
+ for (EnableStatus status : EnableStatus.values()) {
if (status.getCode() == code) {
return status;
}
}
- throw new IllegalArgumentException(String.format("unknown Rule status
code %s", code));
+ throw new IllegalArgumentException(String.format("unknown Enable
status code %s", code));
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index 1b8a1b1..b4a86cd 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.Server;
-import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.fileconfig.MasterReplicationConfig;
import org.apache.tubemq.server.master.MasterConfig;
import org.apache.tubemq.server.master.TMaster;
@@ -68,6 +67,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEnt
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
import org.apache.tubemq.server.master.utils.BdbStoreSamplePrint;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.apache.tubemq.server.master.web.model.ClusterNodeVO;
@@ -811,7 +811,7 @@ public class DefaultBdbStoreService implements
BdbStoreService, Server {
@Override
public boolean delBdbClusterConfEntity() {
try {
-
clusterDefSettingIndex.delete(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+
clusterDefSettingIndex.delete(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
} catch (Throwable e) {
logger.error("[BDB Error] delBdbClusterConfEntity Error ", e);
return false;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBlackGroupEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBlackGroupEntity.java
index abf44df..d1fda6b 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBlackGroupEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBlackGroupEntity.java
@@ -22,8 +22,11 @@ import com.sleepycat.persist.model.PrimaryKey;
import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -52,6 +55,19 @@ public class BdbBlackGroupEntity implements Serializable {
this.createDate = createDate;
}
+ public BdbBlackGroupEntity(String topicName, String groupName,
+ String attributes, String createUser,
+ Date createDate) {
+ this.recordKey = new StringBuilder(512).append(topicName)
+
.append(TokenConstants.ATTR_SEP).append(groupName).toString(); // use the argument: the consumerGroupName field is still unset here
+ this.topicName = topicName;
+ this.consumerGroupName = groupName;
+ this.attributes = attributes;
+ this.createUser = createUser;
+ this.createDate = createDate;
+ }
+
+
public String getAttributes() {
return attributes;
}
@@ -96,6 +112,35 @@ public class BdbBlackGroupEntity implements Serializable {
this.createDate = createDate;
}
+ public long getDataVerId() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID);
+ if (atrVal != null) {
+ return Long.parseLong(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setDataVerId(long dataVerId) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID,
+ String.valueOf(dataVerId));
+ }
+
+ public void setReason(String reason) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_BLK_REASON, reason);
+ }
+
+ public String getReason() {
+ return TStringUtils.getAttrValFrmAttributes(
+ this.attributes, TStoreConstants.TOKEN_BLK_REASON);
+ }
+
+
public StringBuilder toJsonString(final StringBuilder sBuilder) {
return sBuilder.append("{\"type\":\"BdbBlackGroupEntity\",")
.append("\"recordKey\":\"").append(recordKey)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 318e2ca..bb7cee2 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -27,6 +27,7 @@ import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -259,17 +260,25 @@ public class BdbBrokerConfEntity implements Serializable {
public int getNumTopicStores() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_STORE_NUM);
+ TStoreConstants.TOKEN_STORE_NUM);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
return 1;
}
+ public BdbBrokerConfEntity setNumTopicStores(int numTopicStores) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_STORE_NUM,
+ String.valueOf(numTopicStores));
+ return this;
+ }
+
public int getDftMemCacheMsgCntInK() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_CNT);
+ TStoreConstants.TOKEN_MCACHE_MSG_CNT);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -279,14 +288,14 @@ public class BdbBrokerConfEntity implements Serializable {
public void setDftMemCacheMsgCntInK(final int memCacheMsgCntInK) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_CNT,
+ TStoreConstants.TOKEN_MCACHE_MSG_CNT,
String.valueOf(memCacheMsgCntInK));
}
public int getDftMemCacheMsgSizeInMB() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_SIZE);
+ TStoreConstants.TOKEN_MCACHE_MSG_SIZE);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -296,14 +305,14 @@ public class BdbBrokerConfEntity implements Serializable {
public void setDftMemCacheMsgSizeInMB(final int memCacheMsgSizeInMB) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_SIZE,
+ TStoreConstants.TOKEN_MCACHE_MSG_SIZE,
String.valueOf(memCacheMsgSizeInMB));
}
public int getDftMemCacheFlushIntvl() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_FLUSH_INTVL);
+ TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -313,14 +322,14 @@ public class BdbBrokerConfEntity implements Serializable {
public void setDftMemCacheFlushIntvl(final int memCacheFlushIntvl) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,
+ TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL,
String.valueOf(memCacheFlushIntvl));
}
public int getDftUnFlushDataHold() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_DATA_UNFLUSHHOLD);
+ TStoreConstants.TOKEN_DATA_UNFLUSHHOLD);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -330,14 +339,14 @@ public class BdbBrokerConfEntity implements Serializable {
public void setDftUnFlushDataHold(final int unFlushDataHold) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_DATA_UNFLUSHHOLD,
+ TStoreConstants.TOKEN_DATA_UNFLUSHHOLD,
String.valueOf(unFlushDataHold));
}
public int getBrokerTLSPort() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_TLS_PORT);
+ TStoreConstants.TOKEN_TLS_PORT);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -347,10 +356,19 @@ public class BdbBrokerConfEntity implements Serializable {
public void setBrokerTLSPort(final int brokerTLSPort) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_TLS_PORT,
+ TStoreConstants.TOKEN_TLS_PORT,
String.valueOf(brokerTLSPort));
}
+ public int getRegionId() {
+ return regionId;
+ }
+
+ public BdbBrokerConfEntity setRegionId(int regionId) {
+ this.regionId = regionId;
+ return this;
+ }
+
public String getBrokerAddress() {
return brokerAddress;
}
@@ -434,6 +452,58 @@ public class BdbBrokerConfEntity implements Serializable {
TStringUtils.setAttrValToAttributes(this.attributes, attrKey,
attrVal);
}
+ public long getDataVerId() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID);
+ if (atrVal != null) {
+ return Long.parseLong(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public BdbBrokerConfEntity setDataVerId(long dataVerId) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID,
+ String.valueOf(dataVerId));
+ return this;
+ }
+
+ public int getBrokerGroupId() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_BROKER_GROUP_ID);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setBrokerGroupId(long brokerGroupId) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_BROKER_GROUP_ID,
+ String.valueOf(brokerGroupId));
+ }
+
+ public int getBrokerWebPort() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_BROKER_WEBPORT);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setBrokerWebPort(int brokerWebPort) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_BROKER_WEBPORT,
+ String.valueOf(brokerWebPort));
+ }
+
private void buildStrInfo() {
StringBuilder sBuilder = new StringBuilder(512);
this.brokerAddress = sBuilder.append(this.brokerIp)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
index 7b9b570..037b7f1 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -23,7 +23,9 @@ import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
/*
@@ -268,6 +270,77 @@ public class BdbClusterSettingEntity implements
Serializable {
return modifyDate;
}
+ public void setDefDataPath(String dataPath) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_PATH, dataPath);
+ }
+
+ public String getDefDataPath() {
+ return TStringUtils.getAttrValFrmAttributes(
+ this.attributes, TStoreConstants.TOKEN_DATA_PATH);
+ }
+
+ public void setDefDataType(int dataType) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_TYPE,
String.valueOf(dataType));
+ }
+
+ public int getDefDataType() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_TYPE);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setEnableGloFlowCtrl(Boolean enableGloFlowCtrl) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_ENABLE_FLOW_CTRL,
+ String.valueOf(enableGloFlowCtrl));
+ }
+
+ public Boolean getEnableGloFlowCtrl() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_ENABLE_FLOW_CTRL);
+ if (atrVal != null) {
+ return Boolean.parseBoolean(atrVal);
+ }
+ return null;
+ }
+
+ public void setGloFlowCtrlCnt(int flowCtrlCnt) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_FLOW_CTRL_CNT,
String.valueOf(flowCtrlCnt));
+ }
+
+ public int getGloFlowCtrlCnt() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_FLOW_CTRL_CNT);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setGloFlowCtrlInfo(String flowCtrlInfo) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_FLOW_CTRL_INFO, flowCtrlInfo);
+ }
+
+ public String getGloFlowCtrlInfo() {
+ return TStringUtils.getAttrValFrmAttributes(
+ this.attributes, TStoreConstants.TOKEN_FLOW_CTRL_INFO);
+ }
+
/**
* Serialize field to json format
*
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumeGroupSettingEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumeGroupSettingEntity.java
index f285c12..0c53327 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumeGroupSettingEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumeGroupSettingEntity.java
@@ -22,7 +22,10 @@ import com.sleepycat.persist.model.PrimaryKey;
import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -124,6 +127,23 @@ public class BdbConsumeGroupSettingEntity implements
Serializable {
this.lastBindUsedDate = new Date();
}
+ public long getDataVerId() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID);
+ if (atrVal != null) {
+ return Long.parseLong(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setDataVerId(long dataVerId) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID,
+ String.valueOf(dataVerId));
+ }
+
@Override
public String toString() {
return new ToStringBuilder(this)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumerGroupEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumerGroupEntity.java
index 879338a..1f80a1e 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumerGroupEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbConsumerGroupEntity.java
@@ -71,6 +71,11 @@ public class BdbConsumerGroupEntity implements Serializable {
this.topicName = topicName;
}
+ public void setGroupAndTopicName(String groupName, String topicName) {
+ this.consumerGroupName = groupName;
+ this.topicName = topicName;
+ }
+
public String getConsumerGroupName() {
return consumerGroupName;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFilterCondEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFilterCondEntity.java
index cd12356..0046c1f 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFilterCondEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFilterCondEntity.java
@@ -22,8 +22,11 @@ import com.sleepycat.persist.model.PrimaryKey;
import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -44,7 +47,7 @@ public class BdbGroupFilterCondEntity implements Serializable
{
}
public BdbGroupFilterCondEntity(String topicName, String consumerGroupName,
- int controlStatus, String attributes,
+ int controlStatus, String filterCondStr,
String createUser, Date createDate) {
this.recordKey =
new StringBuilder(512)
@@ -54,17 +57,42 @@ public class BdbGroupFilterCondEntity implements
Serializable {
this.topicName = topicName;
this.consumerGroupName = consumerGroupName;
this.controlStatus = controlStatus;
- this.attributes = attributes;
+ setFilterCondStr(filterCondStr);
this.createUser = createUser;
this.createDate = createDate;
}
- public String getAttributes() {
- return attributes;
+ public BdbGroupFilterCondEntity(String topicName, String consumerGroupName,
+ int controlStatus, String filterCondStr,
+ String attributes, String createUser, Date
createDate) {
+ this.recordKey =
+ new StringBuilder(512)
+ .append(topicName)
+ .append(TokenConstants.ATTR_SEP)
+ .append(consumerGroupName).toString();
+ this.topicName = topicName;
+ this.consumerGroupName = consumerGroupName;
+ this.controlStatus = controlStatus;
+ this.createUser = createUser;
+ this.createDate = createDate;
+ this.attributes = attributes;
+ setFilterCondStr(filterCondStr);
}
- public void setAttributes(String attributes) {
- this.attributes = attributes;
+ public String getFilterCondStr() {
+ if (TStringUtils.isNotBlank(attributes)
+ && attributes.contains(TokenConstants.EQ)) {
+ return TStringUtils.getAttrValFrmAttributes(
+ this.attributes, TStoreConstants.TOKEN_FILTER_COND_STR);
+ } else {
+ return attributes;
+ }
+ }
+
+ public void setFilterCondStr(String filterCondStr) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_FILTER_COND_STR, filterCondStr);
}
public String getRecordKey() {
@@ -111,6 +139,43 @@ public class BdbGroupFilterCondEntity implements
Serializable {
this.createDate = createDate;
}
+ public String getAttributes() {
+ if (TStringUtils.isNotBlank(attributes)
+ && !attributes.contains(TokenConstants.EQ)) {
+ return attributes;
+ } else {
+ return "";
+ }
+ }
+
+ public void setAttributes(String attributes) {
+ this.attributes = attributes;
+ }
+
+ public long getDataVerId() {
+ if (TStringUtils.isNotBlank(attributes)
+ && attributes.contains(TokenConstants.EQ)) {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID);
+ if (atrVal != null) {
+ return Long.parseLong(atrVal);
+ }
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setDataVerId(long dataVerId) {
+ if (TStringUtils.isNotBlank(attributes)
+ && !attributes.contains(TokenConstants.EQ)) {
+ setFilterCondStr(attributes);
+ }
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_DATA_VERSION_ID,
+ String.valueOf(dataVerId));
+ }
+
@Override
public String toString() {
return new ToStringBuilder(this)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
index 73e2cd7..3384c26 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbGroupFlowCtrlEntity.java
@@ -23,10 +23,11 @@ import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
+import org.apache.tubemq.server.common.statusdef.EnableStatus;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -103,6 +104,25 @@ public class BdbGroupFlowCtrlEntity implements
Serializable {
this.ssdTranslateId = ssdTranslateId;
}
+ //Constructor
+ public BdbGroupFlowCtrlEntity(long serialId, String groupName, String
flowCtrlInfo,
+ int statusId, int ruleCnt, int qryPriorityId,
+ String attributes, String createUser,
+ Date createDate) {
+ this.groupName = groupName;
+ this.serialId = serialId;
+ this.statusId = statusId;
+ this.flowCtrlInfo = flowCtrlInfo;
+ this.attributes = attributes;
+ this.ruleCnt = ruleCnt;
+ this.createUser = createUser;
+ this.createDate = createDate;
+ this.needSSDProc = false;
+ this.ssdTranslateId = TBaseConstants.META_VALUE_UNDEFINED;
+ this.setQryPriorityId(qryPriorityId);
+
+ }
+
public long getSsdTranslateId() {
return ssdTranslateId;
}
@@ -185,7 +205,7 @@ public class BdbGroupFlowCtrlEntity implements Serializable
{
public int getQryPriorityId() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_QRY_PRIORITY_ID);
+ TStoreConstants.TOKEN_QRY_PRIORITY_ID);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -195,10 +215,44 @@ public class BdbGroupFlowCtrlEntity implements
Serializable {
public void setQryPriorityId(int qryPriorityId) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_QRY_PRIORITY_ID,
+ TStoreConstants.TOKEN_QRY_PRIORITY_ID,
String.valueOf(qryPriorityId));
}
+ public EnableStatus getResCheckStatus() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_RES_CHECK_STATUS);
+ if (atrVal != null) {
+ return EnableStatus.valueOf(Integer.parseInt(atrVal));
+ }
+ return EnableStatus.STATUS_UNDEFINE;
+ }
+
+ public void setResCheckStatus(EnableStatus resCheckStatus) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_RES_CHECK_STATUS,
+ String.valueOf(resCheckStatus.getCode()));
+ }
+
+ public int getAllowedBrokerClientRate() {
+ String atrVal =
+ TStringUtils.getAttrValFrmAttributes(this.attributes,
+ TStoreConstants.TOKEN_BROKER_CLIENT_RATE);
+ if (atrVal != null) {
+ return Integer.parseInt(atrVal);
+ }
+ return TBaseConstants.META_VALUE_UNDEFINED;
+ }
+
+ public void setAllowedBrokerClientRate(int allowedBrokerClientRate) {
+ this.attributes =
+ TStringUtils.setAttrValToAttributes(this.attributes,
+ TStoreConstants.TOKEN_BROKER_CLIENT_RATE,
+ String.valueOf(allowedBrokerClientRate));
+ }
+
public void setModifyInfo(String modifyUser, Date modifyDate) {
this.createUser = modifyUser;
this.createDate = modifyDate;
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
index 5bc5fb1..97eb3ef 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicAuthControlEntity.java
@@ -22,7 +22,10 @@ import com.sleepycat.persist.model.PrimaryKey;
import java.io.Serializable;
import java.util.Date;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -52,6 +55,20 @@ public class BdbTopicAuthControlEntity implements
Serializable {
this.createDate = createDate;
}
+    /**
+     * Creates a record for the given topic that also carries an attribute
+     * string.
+     *
+     * @param topicName         topic the record belongs to
+     * @param enableAuthControl stored as 1 when true, 0 when false
+     * @param attributes        attribute string kept as passed in
+     * @param createUser        creator of the record
+     * @param createDate        creation time of the record
+     */
+    public BdbTopicAuthControlEntity(String topicName, boolean enableAuthControl,
+                                     String attributes, String createUser,
+                                     Date createDate) {
+        this.topicName = topicName;
+        this.enableAuthControl = enableAuthControl ? 1 : 0;
+        this.attributes = attributes;
+        this.createUser = createUser;
+        this.createDate = createDate;
+    }
+
+
public String getAttributes() {
return attributes;
}
@@ -100,6 +117,40 @@ public class BdbTopicAuthControlEntity implements
Serializable {
this.createDate = createDate;
}
+    /**
+     * Reads the value stored in the attribute string under
+     * TStoreConstants.TOKEN_DATA_VERSION_ID.
+     *
+     * @return the parsed long, or TBaseConstants.META_VALUE_UNDEFINED when
+     *         the attribute is absent
+     */
+    public long getDataVerId() {
+        final String versionText = TStringUtils.getAttrValFrmAttributes(
+                this.attributes, TStoreConstants.TOKEN_DATA_VERSION_ID);
+        if (versionText == null) {
+            return TBaseConstants.META_VALUE_UNDEFINED;
+        }
+        return Long.parseLong(versionText);
+    }
+
+    /**
+     * Writes the given version id into the attribute string under
+     * TStoreConstants.TOKEN_DATA_VERSION_ID.
+     *
+     * @param dataVerId value to store, kept as decimal text
+     */
+    public void setDataVerId(long dataVerId) {
+        final String versionText = String.valueOf(dataVerId);
+        this.attributes = TStringUtils.setAttrValToAttributes(
+                this.attributes, TStoreConstants.TOKEN_DATA_VERSION_ID, versionText);
+    }
+
+    /**
+     * Reads the value stored in the attribute string under
+     * TStoreConstants.TOKEN_MAX_MSG_SIZE.
+     *
+     * @return the parsed integer, or TBaseConstants.META_VALUE_UNDEFINED
+     *         when the attribute is absent
+     */
+    public int getMaxMsgSize() {
+        final String sizeText = TStringUtils.getAttrValFrmAttributes(
+                this.attributes, TStoreConstants.TOKEN_MAX_MSG_SIZE);
+        if (sizeText == null) {
+            return TBaseConstants.META_VALUE_UNDEFINED;
+        }
+        return Integer.parseInt(sizeText);
+    }
+
+    /**
+     * Writes the given size into the attribute string under
+     * TStoreConstants.TOKEN_MAX_MSG_SIZE.
+     *
+     * @param maxMsgSize value to store, kept as decimal text
+     */
+    public void setMaxMsgSize(int maxMsgSize) {
+        final String sizeText = String.valueOf(maxMsgSize);
+        this.attributes = TStringUtils.setAttrValToAttributes(
+                this.attributes, TStoreConstants.TOKEN_MAX_MSG_SIZE, sizeText);
+    }
+
public StringBuilder toJsonString(final StringBuilder sBuilder) {
return sBuilder.append("{\"type\":\"BdbConsumerGroupEntity\",")
.append("\"topicName\":\"").append(topicName)
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
index de9f6ef..22125e1 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
@@ -27,6 +27,7 @@ import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.utils.WebParameterUtils;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
@Entity
@@ -96,6 +97,34 @@ public class BdbTopicConfEntity implements Serializable {
this.attributes = attributes;
}
+    /**
+     * Creates a topic configuration record for one broker/topic pair.
+     * The record key is brokerId, TokenConstants.ATTR_SEP and topicName
+     * joined in that order; every other argument is copied to the field of
+     * the same name.
+     */
+    public BdbTopicConfEntity(int brokerId, String topicName,
+                              int numTopicStores, int numPartitions,
+                              int unflushThreshold, int unflushInterval,
+                              String deletePolicy, String attributes,
+                              boolean acceptPublish, boolean acceptSubscribe,
+                              String createUser, Date createDate,
+                              String modifyUser, Date modifyDate) {
+        // identity of the record
+        this.brokerId = brokerId;
+        this.topicName = topicName;
+        this.recordKey = new StringBuilder(512)
+                .append(brokerId)
+                .append(TokenConstants.ATTR_SEP)
+                .append(topicName)
+                .toString();
+        // storage settings
+        this.numTopicStores = numTopicStores;
+        this.numPartitions = numPartitions;
+        this.unflushThreshold = unflushThreshold;
+        this.unflushInterval = unflushInterval;
+        this.deletePolicy = deletePolicy;
+        this.acceptPublish = acceptPublish;
+        this.acceptSubscribe = acceptSubscribe;
+        this.attributes = attributes;
+        // audit information
+        this.createUser = createUser;
+        this.createDate = createDate;
+        this.modifyUser = modifyUser;
+        this.modifyDate = modifyDate;
+    }
+
public void setBrokerAndTopicInfo(int brokerId, String brokerIp,
int brokerPort, String topicName) {
StringBuilder sBuilder = new StringBuilder(512);
@@ -134,7 +163,7 @@ public class BdbTopicConfEntity implements Serializable {
public void setUnflushDataHold(final int unFlushDataHold) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_DATA_UNFLUSHHOLD,
+ TStoreConstants.TOKEN_DATA_UNFLUSHHOLD,
String.valueOf(unFlushDataHold));
}
@@ -287,7 +316,7 @@ public class BdbTopicConfEntity implements Serializable {
public int getUnflushDataHold() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_DATA_UNFLUSHHOLD);
+ TStoreConstants.TOKEN_DATA_UNFLUSHHOLD);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -297,7 +326,7 @@ public class BdbTopicConfEntity implements Serializable {
public int getMemCacheMsgCntInK() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_CNT);
+ TStoreConstants.TOKEN_MCACHE_MSG_CNT);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -307,14 +336,14 @@ public class BdbTopicConfEntity implements Serializable {
public void setMemCacheMsgCntInK(final int memCacheMsgCntInK) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_CNT,
+ TStoreConstants.TOKEN_MCACHE_MSG_CNT,
String.valueOf(memCacheMsgCntInK));
}
public int getMemCacheMsgSizeInMB() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_SIZE);
+ TStoreConstants.TOKEN_MCACHE_MSG_SIZE);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -324,14 +353,14 @@ public class BdbTopicConfEntity implements Serializable {
public void setMemCacheMsgSizeInMB(final int memCacheMsgSizeInMB) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_MSG_SIZE,
+ TStoreConstants.TOKEN_MCACHE_MSG_SIZE,
String.valueOf(memCacheMsgSizeInMB));
}
public int getMemCacheFlushIntvl() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_FLUSH_INTVL);
+ TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -341,14 +370,14 @@ public class BdbTopicConfEntity implements Serializable {
public void setMemCacheFlushIntvl(final int memCacheFlushIntvl) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,
+ TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL,
String.valueOf(memCacheFlushIntvl));
}
public int getMaxMsgSize() {
String atrVal =
TStringUtils.getAttrValFrmAttributes(this.attributes,
- TServerConstants.TOKEN_MAX_MSG_SIZE);
+ TStoreConstants.TOKEN_MAX_MSG_SIZE);
if (atrVal != null) {
return Integer.parseInt(atrVal);
}
@@ -358,10 +387,44 @@ public class BdbTopicConfEntity implements Serializable {
public void setMaxMsgSize(int maxMsgSize) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes,
- TServerConstants.TOKEN_MAX_MSG_SIZE,
+ TStoreConstants.TOKEN_MAX_MSG_SIZE,
String.valueOf(maxMsgSize));
}
+    /**
+     * Reads the value stored in the attribute string under
+     * TStoreConstants.TOKEN_DATA_VERSION_ID.
+     *
+     * @return the parsed long, or TBaseConstants.META_VALUE_UNDEFINED when
+     *         the attribute is absent
+     */
+    public long getDataVerId() {
+        final String versionText = TStringUtils.getAttrValFrmAttributes(
+                this.attributes, TStoreConstants.TOKEN_DATA_VERSION_ID);
+        if (versionText == null) {
+            return TBaseConstants.META_VALUE_UNDEFINED;
+        }
+        return Long.parseLong(versionText);
+    }
+
+    /**
+     * Writes the given version id into the attribute string under
+     * TStoreConstants.TOKEN_DATA_VERSION_ID.
+     *
+     * @param dataVerId value to store, kept as decimal text
+     */
+    public void setDataVerId(long dataVerId) {
+        final String versionText = String.valueOf(dataVerId);
+        this.attributes = TStringUtils.setAttrValToAttributes(
+                this.attributes, TStoreConstants.TOKEN_DATA_VERSION_ID, versionText);
+    }
+
+    /**
+     * Reads the value stored in the attribute string under
+     * TStoreConstants.TOKEN_TOPICNAME_ID.
+     *
+     * @return the parsed integer, or TBaseConstants.META_VALUE_UNDEFINED
+     *         when the attribute is absent
+     */
+    public int getTopicId() {
+        final String idText = TStringUtils.getAttrValFrmAttributes(
+                this.attributes, TStoreConstants.TOKEN_TOPICNAME_ID);
+        if (idText == null) {
+            return TBaseConstants.META_VALUE_UNDEFINED;
+        }
+        return Integer.parseInt(idText);
+    }
+
+    /**
+     * Writes the given topic id into the attribute string under
+     * TStoreConstants.TOKEN_TOPICNAME_ID.
+     *
+     * @param topicId value to store, kept as decimal text
+     */
+    public void setTopicId(int topicId) {
+        final String idText = String.valueOf(topicId);
+        this.attributes = TStringUtils.setAttrValToAttributes(
+                this.attributes, TStoreConstants.TOKEN_TOPICNAME_ID, idText);
+    }
+
public void appendAttributes(String attrKey, String attrVal) {
this.attributes =
TStringUtils.setAttrValToAttributes(this.attributes, attrKey,
attrVal);
diff --git
a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/TStoreConstants.java
similarity index 53%
copy from
tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
copy to
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/TStoreConstants.java
index ab2d79e..edfb9a7 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/TokenConstants.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/TStoreConstants.java
@@ -15,28 +15,33 @@
* limitations under the License.
*/
-package org.apache.tubemq.corebase;
+package org.apache.tubemq.server.master.metastore;
-public class TokenConstants {
- public static final String SEGMENT_SEP = "#";
- public static final String ATTR_SEP = ":";
- public static final String GROUP_SEP = "@";
- public static final String LOG_SEG_SEP = ";";
- public static final String ARRAY_SEP = ",";
- public static final String EQ = "=";
- public static final String HYPHEN = "-";
- public static final String BLANK = " ";
+public final class TStoreConstants {
+ public static final String TOKEN_DEFAULT_CLUSTER_SETTING =
"default_cluster_config";
+
+ public static final String TOKEN_DATA_VERSION_ID = "dataVerId";
+ public static final String TOKEN_DATA_TYPE = "dataType";
+ public static final String TOKEN_DATA_PATH = "dataPath";
+ public static final String TOKEN_MAX_MSG_SIZE = "maxMsgSize";
+ public static final String TOKEN_ENABLE_FLOW_CTRL = "enbFlowCtrl";
+ public static final String TOKEN_FLOW_CTRL_CNT = "flowCtrlCnt";
+ public static final String TOKEN_FLOW_CTRL_INFO = "flowCtrlInfo";
+ public static final String TOKEN_BROKER_GROUP_ID = "bGroupId";
+ public static final String TOKEN_TLS_PORT = "TLSPort";
+ public static final String TOKEN_BROKER_WEBPORT = "bWebPort";
+ public static final String TOKEN_BLK_REASON = "reason";
+ public static final String TOKEN_FILTER_COND_STR = "filterCondStr";
+ public static final String TOKEN_RES_CHECK_STATUS = "resChkStatusId";
+ public static final String TOKEN_BROKER_CLIENT_RATE =
"resBrokerClientRate";
+ public static final String TOKEN_TOPICNAME_ID = "topicId";
public static final String TOKEN_STORE_NUM = "storeNum";
public static final String TOKEN_QRY_PRIORITY_ID = "qryPriorityId";
public static final String TOKEN_DATA_UNFLUSHHOLD = "unFlushDataHold";
public static final String TOKEN_MCACHE_MSG_CNT = "memCacheMsgCntInK";
- public static final String TOKEN_TLS_PORT = "TLSPort";
public static final String TOKEN_MCACHE_MSG_SIZE = "memCacheMsgSizeInMB";
public static final String TOKEN_MCACHE_FLUSH_INTVL = "memCacheFlushIntvl";
- public static final String TOKEN_MSG_TYPE = "$msgType$";
- public static final String TOKEN_MSG_TIME = "$msgTime$";
-
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
index 666e2c1..2ffebe4 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BaseEntity.java
@@ -82,6 +82,10 @@ public class BaseEntity implements Serializable {
this.modifyDate = modifyDate;
}
+    /**
+     * Sets the data version id to the current time in milliseconds, as
+     * reported by System.currentTimeMillis().
+     */
+    public void setDataVersionId() {
+        final long currentMillis = System.currentTimeMillis();
+        setDataVersionId(currentMillis);
+    }
+
public void setDataVersionId(long dataVersionId) {
this.dataVersionId = dataVersionId;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
index e6601fc..3a0feb0 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/BrokerConfEntity.java
@@ -21,6 +21,7 @@ import java.util.Date;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.server.common.statusdef.ManageStatus;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
@@ -30,22 +31,25 @@ import
org.apache.tubemq.server.common.statusdef.ManageStatus;
*/
public class BrokerConfEntity extends BaseEntity {
+ // Primary Key
private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
private String brokerIp = "";
private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
- //broker tls port
+ // broker tls port
private int brokerTLSPort = TBaseConstants.META_VALUE_UNDEFINED;
+ // broker web port
+ private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
+ private ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
+ private boolean isConfDataUpdated = false; //conf data update flag
+ private boolean isBrokerLoaded = false; //broker conf load flag
+ private int regionId = TBaseConstants.META_VALUE_UNDEFINED;
+ private int groupId = TBaseConstants.META_VALUE_UNDEFINED;
+ private TopicPropGroup topicProps = null;
private String brokerAddress = ""; // broker ip:port
private String brokerFullInfo = ""; // broker brokerId:ip:port
private String brokerSimpleInfo = ""; // broker brokerId:ip:
private String brokerTLSSimpleInfo = ""; //tls simple info
private String brokerTLSFullInfo = ""; //tls full info
- private int regionId = TBaseConstants.META_VALUE_UNDEFINED;
- private int groupId = TBaseConstants.META_VALUE_UNDEFINED;
- private ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
- private boolean isConfDataUpdated = false; //conf data update flag
- private boolean isBrokerLoaded = false; //broker conf load flag
- private TopicPropGroup defTopicPropGroup = null;
public BrokerConfEntity() {
@@ -54,18 +58,66 @@ public class BrokerConfEntity extends BaseEntity {
public BrokerConfEntity(int brokerId, String brokerIp, int brokerPort,
- int brokerTLSPort, ManageStatus manageStatus,
+ int brokerTLSPort, int brokerWebPort, ManageStatus
manageStatus,
+ int regionId, int groupId, TopicPropGroup
defTopicProps,
boolean isConfDataUpdated, boolean isBrokerLoaded,
- TopicPropGroup defTopicPropGroup,
- String createUser, Date createDate,
- String modifyUser, Date modifyDate) {
- super(createUser, createDate, modifyUser, modifyDate);
+ long dataVersionId, String createUser,
+ Date createDate, String modifyUser, Date
modifyDate) {
+ super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
setBrokerIpAndAllPort(brokerId, brokerIp, brokerPort, brokerTLSPort);
+ this.regionId = regionId;
+ this.groupId = groupId;
+ this.brokerWebPort = brokerWebPort;
+ this.topicProps = defTopicProps;
this.manageStatus = manageStatus;
this.isConfDataUpdated = isConfDataUpdated;
this.isBrokerLoaded = isBrokerLoaded;
- this.defTopicPropGroup = defTopicPropGroup;
- this.buildStrInfo();
+ }
+
+ // Builds the metadata-layer broker configuration from its BDB-stored form.
+ // The data version id and the create/modify audit fields go to the base
+ // class; broker identity, ports, region/group ids, manage status and the
+ // two flags are copied field by field; the broker's default topic
+ // properties are collected into a new TopicPropGroup.
+ public BrokerConfEntity(BdbBrokerConfEntity bdbEntity) {
+ super(bdbEntity.getDataVerId(), bdbEntity.getRecordCreateUser(),
+ bdbEntity.getRecordCreateDate(),
bdbEntity.getRecordModifyUser(),
+ bdbEntity.getRecordModifyDate());
+ setBrokerIpAndAllPort(bdbEntity.getBrokerId(), bdbEntity.getBrokerIp(),
+ bdbEntity.getBrokerPort(), bdbEntity.getBrokerTLSPort());
+ this.regionId = bdbEntity.getRegionId();
+ this.groupId = bdbEntity.getBrokerGroupId();
+ this.brokerWebPort = bdbEntity.getBrokerWebPort();
+ // argument order must match TopicPropGroup's constructor, which is not
+ // visible in this file -- TODO confirm against its declaration
+ this.topicProps =
+ new TopicPropGroup(bdbEntity.getNumTopicStores(),
bdbEntity.getDftNumPartitions(),
+ bdbEntity.getDftUnflushThreshold(),
bdbEntity.getDftUnflushInterval(),
+ bdbEntity.getDftUnFlushDataHold(),
bdbEntity.getDftMemCacheMsgSizeInMB(),
+ bdbEntity.getDftMemCacheMsgCntInK(),
bdbEntity.getDftMemCacheFlushIntvl(),
+ bdbEntity.isAcceptPublish(),
bdbEntity.isAcceptSubscribe(),
+ bdbEntity.getDftDeletePolicy(),
bdbEntity.getDataStoreType(),
+ bdbEntity.getDataPath());
+ // the stored manage status is converted back to the enum
+ this.manageStatus = ManageStatus.valueOf(bdbEntity.getManageStatus());
+ this.isConfDataUpdated = bdbEntity.isConfDataUpdated();
+ this.isBrokerLoaded = bdbEntity.isBrokerLoaded();
+ setAttributes(bdbEntity.getAttributes());
+ }
+
+ // build bdb object from current info
+ // Values accepted by the BdbBrokerConfEntity constructor are passed
+ // directly; the remaining ones (data version id, region/group ids, TLS and
+ // web ports, topic store count, memory-cache settings, unflush data hold
+ // and data store type/path) are applied through setters afterwards.
+ // The "" constructor argument is passed as-is; its meaning is defined by
+ // BdbBrokerConfEntity and is not visible here.
+ // NOTE(review): topicProps is initialised to null in this class, so calling
+ // this method before topicProps has been assigned throws a
+ // NullPointerException -- confirm callers always set it first.
+ public BdbBrokerConfEntity buildBdbBrokerConfEntity() {
+ BdbBrokerConfEntity bdbEntity = new BdbBrokerConfEntity(brokerId,
brokerIp, brokerPort,
+ topicProps.getNumPartitions(),
topicProps.getUnflushThreshold(),
+ topicProps.getUnflushInterval(), "",
topicProps.getDeletePolicy(),
+ manageStatus.getCode(), topicProps.isAcceptPublish(),
+ topicProps.isAcceptSubscribe(), getAttributes(),
isConfDataUpdated,
+ isBrokerLoaded, getCreateUser(), getCreateDate(),
+ getModifyUser(), getModifyDate());
+ bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setRegionId(regionId);
+ bdbEntity.setBrokerGroupId(groupId);
+ bdbEntity.setBrokerTLSPort(brokerTLSPort);
+ bdbEntity.setBrokerWebPort(brokerWebPort);
+ bdbEntity.setNumTopicStores(topicProps.getNumTopicStores());
+
bdbEntity.setDftMemCacheMsgSizeInMB(topicProps.getMemCacheMsgSizeInMB());
+ bdbEntity.setDftMemCacheMsgCntInK(topicProps.getMemCacheMsgCntInK());
+ bdbEntity.setDftMemCacheFlushIntvl(topicProps.getMemCacheFlushIntvl());
+ bdbEntity.setDftUnFlushDataHold(topicProps.getUnflushDataHold());
+ bdbEntity.setDataStore(topicProps.getDataStoreType(),
topicProps.getDataPath());
+ return bdbEntity;
}
public int getBrokerId() {
@@ -129,6 +181,14 @@ public class BrokerConfEntity extends BaseEntity {
this.buildStrInfo();
}
+    /**
+     * @return the broker web port currently held by this entity
+     */
+    public int getBrokerWebPort() {
+        return this.brokerWebPort;
+    }
+
+    /**
+     * @param brokerWebPort new broker web port value to hold
+     */
+    public void setBrokerWebPort(final int brokerWebPort) {
+        this.brokerWebPort = brokerWebPort;
+    }
+
public int getBrokerTLSPort() {
return brokerTLSPort;
}
@@ -173,12 +233,12 @@ public class BrokerConfEntity extends BaseEntity {
this.groupId = groupId;
}
- public TopicPropGroup getDefTopicPropGroup() {
- return defTopicPropGroup;
+ public TopicPropGroup getTopicProps() {
+ return topicProps;
}
- public void setDefTopicPropGroup(TopicPropGroup defTopicPropGroup) {
- this.defTopicPropGroup = defTopicPropGroup;
+ public void setTopicProps(TopicPropGroup topicProps) {
+ this.topicProps = topicProps;
}
private void buildStrInfo() {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
index f75b3b2..3f54faa 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/ClusterSettingEntity.java
@@ -18,9 +18,9 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.apache.tubemq.server.common.statusdef.RuleStatus;
-
+import org.apache.tubemq.server.common.statusdef.EnableStatus;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbClusterSettingEntity;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
/*
@@ -30,24 +30,82 @@ import org.apache.tubemq.server.common.statusdef.RuleStatus;
public class ClusterSettingEntity extends BaseEntity {
private String recordKey =
- TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING;
- //broker tcp port
+ TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING;
+ // broker tcp port
private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED;
- //broker tls port
+ // broker tls port
private int brokerTLSPort = TBaseConstants.META_VALUE_UNDEFINED;
- //broker web port
+ // broker web port
private int brokerWebPort = TBaseConstants.META_VALUE_UNDEFINED;
- private TopicPropGroup clsDefTopicPropGroup = new TopicPropGroup();
+ private TopicPropGroup clsDefTopicProps = new TopicPropGroup();
private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
- private RuleStatus flowCtrlStatus = RuleStatus.STATUS_UNDEFINE;
- private int flowCtrlRuleCnt = 0; //flow control rule count
- private String flowCtrlRuleInfo = ""; // flow control info
+ private EnableStatus gloFlowCtrlStatus = EnableStatus.STATUS_UNDEFINE;
+ // flow control rule count
+ private int gloFlowCtrlRuleCnt = TBaseConstants.META_VALUE_UNDEFINED;
+ // flow control info
+ private String gloFlowCtrlRuleInfo = "";
+
+
public ClusterSettingEntity() {
super();
}
+ // Constructor by BdbClusterSettingEntity
+ // The stored config id, modify user and modify date go to the base class;
+ // ports, max message size and query priority are copied directly; the
+ // cluster default topic properties are collected into a new TopicPropGroup;
+ // the global flow-control fields are applied through setGloFlowCtrlInfo().
+ public ClusterSettingEntity(BdbClusterSettingEntity bdbEntity) {
+ super(bdbEntity.getConfigId(), bdbEntity.getModifyUser(),
bdbEntity.getModifyDate());
+ this.brokerPort = bdbEntity.getBrokerPort();
+ this.brokerTLSPort = bdbEntity.getBrokerTLSPort();
+ this.brokerWebPort = bdbEntity.getBrokerWebPort();
+ // argument order must match TopicPropGroup's constructor, which is not
+ // visible in this file -- TODO confirm against its declaration
+ this.clsDefTopicProps =
+ new TopicPropGroup(bdbEntity.getNumTopicStores(),
bdbEntity.getNumPartitions(),
+ bdbEntity.getUnflushThreshold(),
bdbEntity.getUnflushInterval(),
+ bdbEntity.getUnflushDataHold(),
bdbEntity.getMemCacheMsgSizeInMB(),
+ bdbEntity.getMemCacheMsgCntInK(),
bdbEntity.getMemCacheFlushIntvl(),
+ bdbEntity.isAcceptPublish(),
bdbEntity.isAcceptSubscribe(),
+ bdbEntity.getDeletePolicy(),
bdbEntity.getDefDataType(),
+ bdbEntity.getDefDataPath());
+ this.maxMsgSizeInB = bdbEntity.getMaxMsgSizeInB();
+ this.qryPriorityId = bdbEntity.getQryPriorityId();
+ // a null enable flag leaves gloFlowCtrlStatus at its current value
+ setGloFlowCtrlInfo(bdbEntity.getEnableGloFlowCtrl(),
+ bdbEntity.getGloFlowCtrlCnt(), bdbEntity.getGloFlowCtrlInfo());
+ setAttributes(bdbEntity.getAttributes());
+ }
+
+ // build bdb object from current info
+ // Values accepted by the BdbClusterSettingEntity constructor are passed
+ // directly; the default data path/type and the global flow-control fields
+ // are applied through setters afterwards.
+ // NOTE(review): the memory-cache values are passed here in the order
+ // (MsgCntInK, FlushIntvl, MsgSizeInMB), while the TopicPropGroup call in
+ // this class uses (MsgSizeInMB, MsgCntInK, FlushIntvl) -- confirm this
+ // matches the parameter order of BdbClusterSettingEntity's constructor.
+ public BdbClusterSettingEntity buildBdbClsDefSettingEntity() {
+ BdbClusterSettingEntity bdbEntity =
+ new BdbClusterSettingEntity(recordKey, getDataVersionId(),
brokerPort,
+ brokerTLSPort, brokerWebPort,
clsDefTopicProps.getNumTopicStores(),
+ clsDefTopicProps.getNumPartitions(),
clsDefTopicProps.getUnflushThreshold(),
+ clsDefTopicProps.getUnflushInterval(),
clsDefTopicProps.getUnflushDataHold(),
+ clsDefTopicProps.getMemCacheMsgCntInK(),
clsDefTopicProps.getMemCacheFlushIntvl(),
+ clsDefTopicProps.getMemCacheMsgSizeInMB(),
clsDefTopicProps.isAcceptPublish(),
+ clsDefTopicProps.isAcceptSubscribe(),
clsDefTopicProps.getDeletePolicy(),
+ this.qryPriorityId, this.maxMsgSizeInB,
getAttributes(),
+ getModifyUser(), getModifyDate());
+ bdbEntity.setDefDataPath(clsDefTopicProps.getDataPath());
+ bdbEntity.setDefDataType(clsDefTopicProps.getDataStoreType());
+ // only STATUS_ENABLE maps to true; undefined and disabled both store false
+ bdbEntity.setEnableGloFlowCtrl(enableFlowCtrl());
+ bdbEntity.setGloFlowCtrlCnt(gloFlowCtrlRuleCnt);
+ bdbEntity.setGloFlowCtrlInfo(gloFlowCtrlRuleInfo);
+ return bdbEntity;
+ }
+
+    /**
+     * Updates the global flow-control rule count and rule text, and
+     * optionally the enable status.
+     *
+     * @param enableFlowCtrl true selects STATUS_ENABLE, false selects
+     *                       STATUS_DISABLE, null leaves the status unchanged
+     * @param flowCtrlCnt    new rule count
+     * @param flowCtrlInfo   new rule text
+     */
+    public void setGloFlowCtrlInfo(Boolean enableFlowCtrl,
+                                   int flowCtrlCnt, String flowCtrlInfo) {
+        this.gloFlowCtrlRuleCnt = flowCtrlCnt;
+        this.gloFlowCtrlRuleInfo = flowCtrlInfo;
+        if (enableFlowCtrl == null) {
+            return;
+        }
+        this.gloFlowCtrlStatus = enableFlowCtrl
+                ? EnableStatus.STATUS_ENABLE
+                : EnableStatus.STATUS_DISABLE;
+    }
+
public String getRecordKey() {
return recordKey;
}
@@ -92,44 +150,33 @@ public class ClusterSettingEntity extends BaseEntity {
this.qryPriorityId = qryPriorityId;
}
- public void setFlowCtrlRuleCnt(Boolean enableFlowCtrl,
- int flowCtrlCnt, String flowCtrlInfo) {
- if (enableFlowCtrl != null) {
- if (enableFlowCtrl) {
- this.flowCtrlStatus = RuleStatus.STATUS_ENABLE;
- } else {
- this.flowCtrlStatus = RuleStatus.STATUS_DISABLE;
- }
- }
- this.flowCtrlRuleCnt = flowCtrlCnt;
- this.flowCtrlRuleInfo = flowCtrlInfo;
- }
- public int getFlowCtrlRuleCnt() {
- return flowCtrlRuleCnt;
+ public int getGloFlowCtrlRuleCnt() {
+ return gloFlowCtrlRuleCnt;
}
- public String getFlowCtrlRuleInfo() {
- return flowCtrlRuleInfo;
+ public String getGloFlowCtrlRuleInfo() {
+ return gloFlowCtrlRuleInfo;
}
public boolean enableFlowCtrl() {
- return flowCtrlStatus == RuleStatus.STATUS_ENABLE;
+ return gloFlowCtrlStatus == EnableStatus.STATUS_ENABLE;
}
public void setEnableFlowCtrl(boolean enableFlowCtrl) {
if (enableFlowCtrl) {
- this.flowCtrlStatus = RuleStatus.STATUS_ENABLE;
+ this.gloFlowCtrlStatus = EnableStatus.STATUS_ENABLE;
} else {
- this.flowCtrlStatus = RuleStatus.STATUS_DISABLE;
+ this.gloFlowCtrlStatus = EnableStatus.STATUS_DISABLE;
}
}
- public TopicPropGroup getClsDefTopicPropGroup() {
- return clsDefTopicPropGroup;
+ public TopicPropGroup getClsDefTopicProps() {
+ return clsDefTopicProps;
}
- public void setClsDefTopicPropGroup(TopicPropGroup clsDefTopicPropGroup) {
- this.clsDefTopicPropGroup = clsDefTopicPropGroup;
+ public void setClsDefTopicProps(TopicPropGroup clsDefTopicProps) {
+ this.clsDefTopicProps = clsDefTopicProps;
}
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
index 194f4ca..b0ebd8c 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupBlackListEntity.java
@@ -20,6 +20,7 @@ package org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
/*
@@ -40,12 +41,34 @@ public class GroupBlackListEntity extends BaseEntity {
public GroupBlackListEntity(String topicName, String groupName,
String reason, String createUser, Date
createDate) {
super(createUser, createDate);
+ this.setTopicAndGroup(topicName, groupName);
+ this.reason = reason;
+ }
+
+    /**
+     * Builds the entity from its BDB-stored form: data version id, creator
+     * and creation date go to the base class; topic, group, reason and the
+     * attribute string are copied from the stored record.
+     *
+     * @param bdbEntity stored record to copy from
+     */
+    public GroupBlackListEntity(BdbBlackGroupEntity bdbEntity) {
+        super(bdbEntity.getDataVerId(),
+                bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
+        final String storedTopic = bdbEntity.getTopicName();
+        final String storedGroup = bdbEntity.getBlackGroupName();
+        setTopicAndGroup(storedTopic, storedGroup);
+        this.reason = bdbEntity.getReason();
+        setAttributes(bdbEntity.getAttributes());
+    }
+
+    /**
+     * Builds the BDB-stored form of this entity. Topic, group, attributes,
+     * creator and creation date go through the constructor; data version id
+     * and reason are applied afterwards through setters.
+     *
+     * @return a new BdbBlackGroupEntity carrying this entity's values
+     */
+    public BdbBlackGroupEntity buildBdbBlackListEntity() {
+        final BdbBlackGroupEntity stored = new BdbBlackGroupEntity(
+                topicName, groupName, getAttributes(),
+                getCreateUser(), getCreateDate());
+        stored.setDataVerId(getDataVersionId());
+        stored.setReason(reason);
+        return stored;
+    }
+
+ public void setTopicAndGroup(String topicName, String groupName) {
+ this.topicName = topicName;
+ this.groupName = groupName;
this.recordKey = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
.append(topicName).append(TokenConstants.ATTR_SEP)
.append(groupName).toString();
- this.topicName = topicName;
- this.groupName = groupName;
- this.reason = reason;
}
public String getRecordKey() {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
new file mode 100644
index 0000000..e24b511
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConfigEntity.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.dao.entity;
+
+import java.util.Date;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.statusdef.EnableStatus;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
+
+/**
+ * Stores the group control settings: resource check control, consume
+ * priority id and the group's flow control rules.
+ *
+ * Instances can be converted to and from BdbGroupFlowCtrlEntity, the form
+ * kept in the BDB store.
+ */
+public class GroupConfigEntity extends BaseEntity {
+    // group name
+    private String groupName = "";
+    // resource check control
+    private EnableStatus resCheckStatus = EnableStatus.STATUS_UNDEFINE;
+    private int allowedBrokerClientRate = TBaseConstants.META_VALUE_UNDEFINED;
+    // consume priority id
+    private int qryPriorityId = TBaseConstants.META_VALUE_UNDEFINED;
+    // consume group flow control info
+    private EnableStatus flowCtrlStatus = EnableStatus.STATUS_UNDEFINE;
+    private int ruleCnt = 0;           // flow control rule count
+    private String flowCtrlInfo = "";  // flow control info
+
+    /**
+     * Creates an entity with every setting at its undefined/empty default.
+     */
+    public GroupConfigEntity() {
+        super();
+    }
+
+    /**
+     * Creates an entity with flow control settings. The resource check
+     * settings stay at their undefined defaults.
+     *
+     * @param groupName      group the settings belong to
+     * @param enableFlowCtrl true selects STATUS_ENABLE, false STATUS_DISABLE
+     * @param qryPriorityId  consume priority id
+     * @param ruleCnt        flow control rule count
+     * @param flowCtrlInfo   flow control rule text
+     * @param createUser     creator of the record
+     * @param createDate     creation time of the record
+     */
+    public GroupConfigEntity(String groupName, boolean enableFlowCtrl,
+                             int qryPriorityId, int ruleCnt,
+                             String flowCtrlInfo, String createUser,
+                             Date createDate) {
+        super(createUser, createDate);
+        this.groupName = groupName;
+        if (enableFlowCtrl) {
+            this.flowCtrlStatus = EnableStatus.STATUS_ENABLE;
+        } else {
+            this.flowCtrlStatus = EnableStatus.STATUS_DISABLE;
+        }
+        this.qryPriorityId = qryPriorityId;
+        this.ruleCnt = ruleCnt;
+        this.flowCtrlInfo = flowCtrlInfo;
+    }
+
+    /**
+     * Builds the entity from its BDB-stored form. The stored serial id is
+     * used as the data version id. A stored status id of 0 maps to
+     * STATUS_DISABLE, any other value to STATUS_ENABLE.
+     *
+     * @param bdbEntity stored record to copy from
+     */
+    public GroupConfigEntity(BdbGroupFlowCtrlEntity bdbEntity) {
+        super(bdbEntity.getSerialId(),
+                bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
+        this.groupName = bdbEntity.getGroupName();
+        this.qryPriorityId = bdbEntity.getQryPriorityId();
+        this.ruleCnt = bdbEntity.getRuleCnt();
+        this.flowCtrlInfo = bdbEntity.getFlowCtrlInfo();
+        if (bdbEntity.getStatusId() != 0) {
+            this.flowCtrlStatus = EnableStatus.STATUS_ENABLE;
+        } else {
+            this.flowCtrlStatus = EnableStatus.STATUS_DISABLE;
+        }
+        // buildBdbGroupFlowCtrlEntity() writes both resource check values to
+        // the stored record, so they must be read back here; otherwise they
+        // are silently reset to undefined on every load from the store
+        this.resCheckStatus = bdbEntity.getResCheckStatus();
+        this.allowedBrokerClientRate = bdbEntity.getAllowedBrokerClientRate();
+        setAttributes(bdbEntity.getAttributes());
+    }
+
+    /**
+     * Builds the BDB-stored form of this entity. Only STATUS_ENABLE is
+     * stored as status id 1; undefined and disabled are both stored as 0.
+     *
+     * @return a new BdbGroupFlowCtrlEntity carrying this entity's values
+     */
+    public BdbGroupFlowCtrlEntity buildBdbGroupFlowCtrlEntity() {
+        int statusId = (this.flowCtrlStatus == EnableStatus.STATUS_ENABLE) ? 1 : 0;
+        BdbGroupFlowCtrlEntity bdbEntity =
+                new BdbGroupFlowCtrlEntity(getDataVersionId(), this.groupName,
+                        this.flowCtrlInfo, statusId, this.ruleCnt, this.qryPriorityId,
+                        getAttributes(), getCreateUser(), getCreateDate());
+        bdbEntity.setResCheckStatus(resCheckStatus);
+        bdbEntity.setAllowedBrokerClientRate(allowedBrokerClientRate);
+        return bdbEntity;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    /**
+     * Replaces the flow control rule count and rule text together.
+     */
+    public void setFlowCtrlRule(int ruleCnt, String flowCtrlInfo) {
+        this.ruleCnt = ruleCnt;
+        this.flowCtrlInfo = flowCtrlInfo;
+    }
+
+    public int getRuleCnt() {
+        return ruleCnt;
+    }
+
+    public String getFlowCtrlInfo() {
+        return flowCtrlInfo;
+    }
+
+    /**
+     * @param enableFlowCtrl true selects STATUS_ENABLE, false STATUS_DISABLE
+     */
+    public void setFlowCtrlStatus(boolean enableFlowCtrl) {
+        if (enableFlowCtrl) {
+            this.flowCtrlStatus = EnableStatus.STATUS_ENABLE;
+        } else {
+            this.flowCtrlStatus = EnableStatus.STATUS_DISABLE;
+        }
+    }
+
+    public int getQryPriorityId() {
+        return qryPriorityId;
+    }
+
+    public void setQryPriorityId(int qryPriorityId) {
+        this.qryPriorityId = qryPriorityId;
+    }
+
+    /**
+     * @return true only when the flow control status is STATUS_ENABLE
+     */
+    public boolean isFlowCtrlEnable() {
+        return (this.flowCtrlStatus == EnableStatus.STATUS_ENABLE);
+    }
+
+    /**
+     * @return true only when the resource check status is STATUS_ENABLE
+     */
+    public boolean isEnableResCheck() {
+        return resCheckStatus == EnableStatus.STATUS_ENABLE;
+    }
+
+    /**
+     * Sets the resource check status and the allowed rate together.
+     *
+     * @param enableResChk            true selects STATUS_ENABLE, false
+     *                                STATUS_DISABLE
+     * @param allowedBrokerClientRate rate value stored alongside the status
+     */
+    public void setResCheckStatus(boolean enableResChk, int allowedBrokerClientRate) {
+        if (enableResChk) {
+            this.resCheckStatus = EnableStatus.STATUS_ENABLE;
+        } else {
+            this.resCheckStatus = EnableStatus.STATUS_DISABLE;
+        }
+        this.allowedBrokerClientRate = allowedBrokerClientRate;
+    }
+
+    public int getAllowedBrokerClientRate() {
+        return allowedBrokerClientRate;
+    }
+
+    public void setAllowedBrokerClientRate(int allowedBrokerClientRate) {
+        this.allowedBrokerClientRate = allowedBrokerClientRate;
+    }
+
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConsumeCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
similarity index 51%
rename from
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConsumeCtrlEntity.java
rename to
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
index 975443c..59966bd 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupConsumeCtrlEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFilterCtrlEntity.java
@@ -20,35 +20,68 @@ package
org.apache.tubemq.server.master.metastore.dao.entity;
import java.util.Date;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
-import org.apache.tubemq.server.common.statusdef.RuleStatus;
+import org.apache.tubemq.server.common.statusdef.EnableStatus;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
/*
- * store the group consume control setting
+ * store the group filter consume control setting
*
*/
-public class GroupConsumeCtrlEntity extends BaseEntity {
+public class GroupFilterCtrlEntity extends BaseEntity {
private String recordKey = "";
private String topicName = "";
private String groupName = "";
- private RuleStatus resCheckStatus = RuleStatus.STATUS_UNDEFINE;
- private int allowedBrokerClientRate = TBaseConstants.META_VALUE_UNDEFINED;
- private RuleStatus filterConsumeStatus = RuleStatus.STATUS_UNDEFINE;
+ // filter consume setting
+ private EnableStatus filterConsumeStatus = EnableStatus.STATUS_UNDEFINE;
private String filterCondStr = "";
- public GroupConsumeCtrlEntity() {
+ public GroupFilterCtrlEntity() {
super();
}
- public GroupConsumeCtrlEntity(String topicName, String groupName,
- String createUser, Date createDate) {
+ public GroupFilterCtrlEntity(String topicName, String groupName,
+ EnableStatus filterConsumeStatus,
+ String filterCondStr, String createUser,
+ Date createDate) {
super(createUser, createDate);
+ this.setTopicAndGroup(topicName, groupName);
+ this.filterConsumeStatus = filterConsumeStatus;
+ this.filterCondStr = filterCondStr;
+ }
+
+ public GroupFilterCtrlEntity(BdbGroupFilterCondEntity bdbEntity) {
+ super(bdbEntity.getDataVerId(),
+ bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
+ this.setTopicAndGroup(bdbEntity.getTopicName(),
+ bdbEntity.getConsumerGroupName());
+ if (bdbEntity.getControlStatus() == 2) {
+ this.filterConsumeStatus = EnableStatus.STATUS_ENABLE;
+ } else if (bdbEntity.getControlStatus() == -2) {
+ this.filterConsumeStatus = EnableStatus.STATUS_UNDEFINE;
+ } else {
+ this.filterConsumeStatus = EnableStatus.STATUS_DISABLE;
+ }
+ this.filterCondStr = bdbEntity.getFilterCondStr();
+ this.setAttributes(bdbEntity.getFilterCondStr());
+ }
+
+ public BdbGroupFilterCondEntity buildBdbGroupFilterCondEntity() {
+ BdbGroupFilterCondEntity bdbEntity =
+ new BdbGroupFilterCondEntity(topicName, groupName,
+ filterConsumeStatus.getCode(), filterCondStr,
+ getAttributes(), getCreateUser(), getCreateDate());
+ bdbEntity.setDataVerId(getDataVersionId());
+ return bdbEntity;
+ }
+
+ public void setTopicAndGroup(String topicName, String groupName) {
+ this.topicName = topicName;
+ this.groupName = groupName;
this.recordKey = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
.append(topicName).append(TokenConstants.ATTR_SEP)
.append(groupName).toString();
- this.topicName = topicName;
- this.groupName = groupName;
}
public String getRecordKey() {
@@ -71,35 +104,16 @@ public class GroupConsumeCtrlEntity extends BaseEntity {
return groupName;
}
- public boolean isEnableResCheck() {
- return resCheckStatus == RuleStatus.STATUS_ENABLE;
- }
-
- public void setResCheckStatus(boolean enableResChk) {
- if (enableResChk) {
- this.resCheckStatus = RuleStatus.STATUS_ENABLE;
- } else {
- this.resCheckStatus = RuleStatus.STATUS_DISABLE;
- }
- }
-
- public int getAllowedBrokerClientRate() {
- return allowedBrokerClientRate;
- }
-
- public void setAllowedBrokerClientRate(int allowedBrokerClientRate) {
- this.allowedBrokerClientRate = allowedBrokerClientRate;
- }
public boolean isEnableFilterConsume() {
- return filterConsumeStatus == RuleStatus.STATUS_ENABLE;
+ return filterConsumeStatus == EnableStatus.STATUS_ENABLE;
}
public void setFilterConsumeStatus(boolean enableFilterConsume) {
if (enableFilterConsume) {
- this.filterConsumeStatus = RuleStatus.STATUS_ENABLE;
+ this.filterConsumeStatus = EnableStatus.STATUS_ENABLE;
} else {
- this.filterConsumeStatus = RuleStatus.STATUS_DISABLE;
+ this.filterConsumeStatus = EnableStatus.STATUS_DISABLE;
}
}
@@ -110,4 +124,5 @@ public class GroupConsumeCtrlEntity extends BaseEntity {
public void setFilterCondStr(String filterCondStr) {
this.filterCondStr = filterCondStr;
}
+
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFlowCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFlowCtrlEntity.java
deleted file mode 100644
index 5ffa81c..0000000
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/GroupFlowCtrlEntity.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.metastore.dao.entity;
-
-import java.util.Date;
-import org.apache.tubemq.corebase.TBaseConstants;
-import org.apache.tubemq.server.common.statusdef.RuleStatus;
-
-/*
- * store the group flow control setting
- *
- */
-public class GroupFlowCtrlEntity extends BaseEntity {
- private String groupName = ""; // group name
- private RuleStatus flowCtrlStatus = RuleStatus.STATUS_UNDEFINE;
- private int qryPriorityId =
- TBaseConstants.META_VALUE_UNDEFINED; // consume priority id
- private int ruleCnt = 0; //flow control rule count
- private String flowCtrlInfo = ""; // flow control info
-
-
- public GroupFlowCtrlEntity() {
- super();
- }
-
- public GroupFlowCtrlEntity(String groupName, boolean enableFlowCtrl,
- int qryPriorityId, int ruleCnt,
- String flowCtrlInfo, String createUser,
- Date createDate) {
- super(createUser, createDate);
- this.groupName = groupName;
- if (enableFlowCtrl) {
- this.flowCtrlStatus = RuleStatus.STATUS_ENABLE;
- } else {
- this.flowCtrlStatus = RuleStatus.STATUS_DISABLE;
- }
- this.qryPriorityId = qryPriorityId;
- this.ruleCnt = ruleCnt;
- this.flowCtrlInfo = flowCtrlInfo;
- }
-
- public String getGroupName() {
- return groupName;
- }
-
- public void setGroupName(String groupName) {
- this.groupName = groupName;
- }
-
- public void setFlowCtrlRule(int ruleCnt, String flowCtrlInfo) {
- this.ruleCnt = ruleCnt;
- this.flowCtrlInfo = flowCtrlInfo;
- }
-
- public int getRuleCnt() {
- return ruleCnt;
- }
-
- public String getFlowCtrlInfo() {
- return flowCtrlInfo;
- }
-
- public void setFlowCtrlStatus(boolean enableFlowCtrl) {
- if (enableFlowCtrl) {
- this.flowCtrlStatus = RuleStatus.STATUS_ENABLE;
- } else {
- this.flowCtrlStatus = RuleStatus.STATUS_DISABLE;
- }
- }
-
- public int getQryPriorityId() {
- return qryPriorityId;
- }
-
- public void setQryPriorityId(int qryPriorityId) {
- this.qryPriorityId = qryPriorityId;
- }
-
- public boolean isFlowCtrlEnable() {
- return (this.flowCtrlStatus == RuleStatus.STATUS_ENABLE);
- }
-
-
-}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicAuthCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicAuthCtrlEntity.java
deleted file mode 100644
index 2c613b4..0000000
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicAuthCtrlEntity.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.metastore.dao.entity;
-
-import java.util.Date;
-import org.apache.tubemq.server.common.statusdef.RuleStatus;
-
-
-/*
- * store the topic authenticate control setting
- *
- */
-public class TopicAuthCtrlEntity extends BaseEntity {
-
- private String topicName = "";
- private RuleStatus authCtrlStatus = RuleStatus.STATUS_UNDEFINE;
-
-
- public TopicAuthCtrlEntity() {
- super();
- }
-
- public TopicAuthCtrlEntity(String topicName, boolean enableAuth,
- String createUser, Date createDate) {
- super(createUser, createDate);
- this.topicName = topicName;
- if (enableAuth) {
- this.authCtrlStatus = RuleStatus.STATUS_ENABLE;
- } else {
- this.authCtrlStatus = RuleStatus.STATUS_DISABLE;
- }
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
- public boolean isAuthCtrlEnable() {
- return authCtrlStatus == RuleStatus.STATUS_ENABLE;
- }
-
- public void setAuthCtrlStatus(boolean enableAuth) {
- if (enableAuth) {
- this.authCtrlStatus = RuleStatus.STATUS_ENABLE;
- } else {
- this.authCtrlStatus = RuleStatus.STATUS_DISABLE;
- }
- }
-}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
index 1556cec..52cfc61 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicConfEntity.java
@@ -21,6 +21,7 @@ import java.util.Date;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TokenConstants;
import org.apache.tubemq.server.common.statusdef.TopicStatus;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
/*
@@ -30,100 +31,113 @@ import
org.apache.tubemq.server.common.statusdef.TopicStatus;
public class TopicConfEntity extends BaseEntity {
private String recordKey = "";
- private String topicName = ""; //topic name
- private int brokerId = TBaseConstants.META_VALUE_UNDEFINED; //broker id
- private TopicStatus topicStatus = TopicStatus.STATUS_TOPIC_OK; // topic
status
- private String brokerIp = ""; //broker ip
- private int brokerPort = TBaseConstants.META_VALUE_UNDEFINED; //broker
port
- private String brokerAddress = ""; //broker address
- private TopicPropGroup topicPropGroup = null;
- private int maxMsgSizeInM = TBaseConstants.META_VALUE_UNDEFINED;
-
+ private String topicName = "";
+ private int brokerId = TBaseConstants.META_VALUE_UNDEFINED;
+ // topic id, require globally unique
+ private int topicId = TBaseConstants.META_VALUE_UNDEFINED;
+ private TopicStatus deployStatus = TopicStatus.STATUS_TOPIC_OK; // topic
status
+ private TopicPropGroup topicProps = null;
public TopicConfEntity() {
super();
}
- public TopicConfEntity(String topicName, int brokerId, TopicStatus
topicStatus,
- String brokerIp, int brokerPort, TopicPropGroup
topicPropGroup,
- int maxMsgSizeInM, String createUser, Date
createDate) {
- super(createUser, createDate);
- setBrokerAndTopicInfo(brokerId, brokerIp, brokerPort, topicName);
- this.topicStatus = topicStatus;
- this.topicPropGroup = topicPropGroup;
- this.maxMsgSizeInM = maxMsgSizeInM;
- }
-
- public void setBrokerAndTopicInfo(int brokerId, String brokerIp,
- int brokerPort, String topicName) {
- StringBuilder sBuilder = new
StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE);
- this.recordKey = sBuilder.append(brokerId)
- .append(TokenConstants.ATTR_SEP).append(topicName).toString();
- this.brokerId = brokerId;
- this.brokerIp = brokerIp;
- this.brokerPort = brokerPort;
+ public TopicConfEntity(String topicName, int brokerId, int topicId,
+ TopicPropGroup topicProps, TopicStatus deployStatus,
+ long dataVersionId, String createUser,
+ Date createDate, String modifyUser, Date
modifyDate) {
+ super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
+ setTopicDeployInfo(brokerId, topicName, topicId);
+ this.deployStatus = deployStatus;
+ this.topicProps = topicProps;
+ }
+
+ public TopicConfEntity(BdbTopicConfEntity bdbEntity) {
+ super(bdbEntity.getDataVerId(),
+ bdbEntity.getCreateUser(), bdbEntity.getCreateDate(),
+ bdbEntity.getModifyUser(), bdbEntity.getModifyDate());
+ setTopicDeployInfo(bdbEntity.getBrokerId(),
+ bdbEntity.getTopicName(), bdbEntity.getTopicId());
+ this.deployStatus = TopicStatus.valueOf(bdbEntity.getTopicStatusId());
+ this.topicProps =
+ new TopicPropGroup(bdbEntity.getNumTopicStores(),
bdbEntity.getNumPartitions(),
+ bdbEntity.getUnflushThreshold(),
bdbEntity.getUnflushInterval(),
+ bdbEntity.getUnflushDataHold(),
bdbEntity.getMemCacheMsgSizeInMB(),
+ bdbEntity.getMemCacheMsgCntInK(),
bdbEntity.getMemCacheFlushIntvl(),
+ bdbEntity.getAcceptPublish(),
bdbEntity.getAcceptSubscribe(),
+ bdbEntity.getDeletePolicy(),
bdbEntity.getDataStoreType(),
+ bdbEntity.getDataPath());
+ this.setAttributes(bdbEntity.getAttributes());
+ }
+
+ public BdbTopicConfEntity buildBdbTopicConfEntity() {
+ BdbTopicConfEntity bdbEntity =
+ new BdbTopicConfEntity(brokerId, topicName,
+ topicProps.getNumTopicStores(),
topicProps.getNumPartitions(),
+ topicProps.getUnflushThreshold(),
topicProps.getUnflushInterval(),
+ topicProps.getDeletePolicy(), getAttributes(),
+ topicProps.isAcceptPublish(),
topicProps.isAcceptSubscribe(),
+ getCreateUser(), getCreateDate(), getModifyUser(),
getModifyDate());
+ bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setTopicId(topicId);
+ bdbEntity.setNumTopicStores(topicProps.getNumTopicStores());
+ bdbEntity.setMemCacheMsgSizeInMB(topicProps.getMemCacheMsgSizeInMB());
+ bdbEntity.setMemCacheMsgCntInK(topicProps.getMemCacheMsgCntInK());
+ bdbEntity.setMemCacheFlushIntvl(topicProps.getMemCacheFlushIntvl());
+ bdbEntity.setUnflushDataHold(topicProps.getUnflushDataHold());
+ return bdbEntity;
+ }
+
+ public void setTopicDeployInfo(int brokerId, String topicName, int
topicId) {
this.topicName = topicName;
- if (this.brokerPort != TBaseConstants.META_VALUE_UNDEFINED) {
- sBuilder.delete(0, sBuilder.length());
- this.brokerAddress = sBuilder.append(brokerIp)
-
.append(TokenConstants.ATTR_SEP).append(brokerPort).toString();
- }
+ this.topicId = topicId;
+ this.recordKey = new StringBuilder(TBaseConstants.BUILDER_DEFAULT_SIZE)
+ .append(brokerId).append(TokenConstants.ATTR_SEP)
+ .append(topicName).toString();
}
public String getRecordKey() {
return recordKey;
}
- public String getTopicName() {
- return topicName;
- }
-
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
public int getBrokerId() {
return brokerId;
}
- public String getBrokerIp() {
- return brokerIp;
+ public TopicPropGroup getTopicProps() {
+ return topicProps;
}
- public int getBrokerPort() {
- return brokerPort;
+ public void setTopicProps(TopicPropGroup topicProps) {
+ this.topicProps = topicProps;
}
- public String getBrokerAddress() {
- return brokerAddress;
+ public int getTopicId() {
+ return topicId;
}
- public int getMaxMsgSizeInM() {
- return maxMsgSizeInM;
+ public void setTopicId(int topicId) {
+ this.topicId = topicId;
}
- public void setMaxMsgSizeInM(int maxMsgSizeInM) {
- this.maxMsgSizeInM = maxMsgSizeInM;
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public TopicStatus getTopicStatus() {
+ return deployStatus;
}
public void setTopicStatusId(int topicStatusId) {
- this.topicStatus = TopicStatus.valueOf(topicStatusId);
+ this.deployStatus = TopicStatus.valueOf(topicStatusId);
}
public int getTopicStatusId() {
- return topicStatus.getCode();
+ return deployStatus.getCode();
}
public boolean isValidTopicStatus() {
- return this.topicStatus == TopicStatus.STATUS_TOPIC_OK;
- }
-
- public TopicPropGroup getTopicPropGroup() {
- return topicPropGroup;
- }
-
- public void setTopicPropGroup(TopicPropGroup topicPropGroup) {
- this.topicPropGroup = topicPropGroup;
+ return this.deployStatus == TopicStatus.STATUS_TOPIC_OK;
}
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
new file mode 100644
index 0000000..406da37
--- /dev/null
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicCtrlEntity.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.server.master.metastore.dao.entity;
+
+import java.util.Date;
+import org.apache.tubemq.corebase.TBaseConstants;
+import org.apache.tubemq.server.common.statusdef.EnableStatus;
+import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
+
+
+/*
+ * store the topic authenticate control setting
+ *
+ */
+public class TopicCtrlEntity extends BaseEntity {
+
+ private String topicName = "";
+ private EnableStatus authCtrlStatus = EnableStatus.STATUS_UNDEFINE;
+ private int maxMsgSizeInB = TBaseConstants.META_VALUE_UNDEFINED;
+
+
+ public TopicCtrlEntity() {
+ super();
+ }
+
+ public TopicCtrlEntity(String topicName, boolean enableAuth, int
maxMsgSizeInB,
+ long dataVersionId, String createUser,
+ Date createDate, String modifyUser, Date
modifyDate) {
+ super(dataVersionId, createUser, createDate, modifyUser, modifyDate);
+ this.topicName = topicName;
+ this.maxMsgSizeInB = maxMsgSizeInB;
+ if (enableAuth) {
+ this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
+ } else {
+ this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
+ }
+ }
+
+ public TopicCtrlEntity(BdbTopicAuthControlEntity bdbEntity) {
+ super(bdbEntity.getDataVerId(),
+ bdbEntity.getCreateUser(), bdbEntity.getCreateDate());
+ this.topicName = bdbEntity.getTopicName();
+ this.maxMsgSizeInB = bdbEntity.getMaxMsgSize();
+ if (bdbEntity.isEnableAuthControl()) {
+ this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
+ } else {
+ this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
+ }
+ this.setAttributes(bdbEntity.getAttributes());
+ }
+
+ public BdbTopicAuthControlEntity buildBdbTopicAuthControlEntity() {
+ BdbTopicAuthControlEntity bdbEntity =
+ new BdbTopicAuthControlEntity(topicName, isAuthCtrlEnable(),
+ getAttributes(), getCreateUser(), getCreateDate());
+ bdbEntity.setDataVerId(getDataVersionId());
+ bdbEntity.setMaxMsgSize(maxMsgSizeInB);
+ return bdbEntity;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public boolean isAuthCtrlEnable() {
+ return authCtrlStatus == EnableStatus.STATUS_ENABLE;
+ }
+
+ public void setAuthCtrlStatus(boolean enableAuth) {
+ if (enableAuth) {
+ this.authCtrlStatus = EnableStatus.STATUS_ENABLE;
+ } else {
+ this.authCtrlStatus = EnableStatus.STATUS_DISABLE;
+ }
+ }
+}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
index 2c9931a..2567f96 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/entity/TopicPropGroup.java
@@ -17,14 +17,16 @@
package org.apache.tubemq.server.master.metastore.dao.entity;
+import java.io.Serializable;
import org.apache.tubemq.corebase.TBaseConstants;
+
/*
* Topic property group, save topic related storage and configuration
information.
*
*/
-public class TopicPropGroup {
+public class TopicPropGroup implements Serializable {
private int numTopicStores = TBaseConstants.META_VALUE_UNDEFINED;
//store num
private int numPartitions = TBaseConstants.META_VALUE_UNDEFINED;
//partition num
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFlowCtrlMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
similarity index 75%
rename from
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFlowCtrlMapper.java
rename to
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
index 960dda7..7deadef 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFlowCtrlMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/AbstractMapper.java
@@ -17,15 +17,15 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
-import java.util.Map;
+import org.apache.tubemq.server.common.exception.LoadMetaException;
+import org.apache.tubemq.server.common.utils.ProcessResult;
-public interface GroupFlowCtrlMapper {
- boolean loadGroupFlowCtrlConfig(Map<String, String> metaDataMap);
+public interface AbstractMapper {
- boolean putGroupFlowCtrlConfig(String key, String flowCtrlJsonData);
+ void close();
- boolean delGroupFlowCtrlConfig(String key);
+ void loadConfig(ProcessResult result) throws LoadMetaException;
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
index 0178623..ec5bd46 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/BrokerConfigMapper.java
@@ -17,15 +17,15 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
-import java.util.Map;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metastore.dao.entity.BrokerConfEntity;
-public interface BrokerConfigMapper {
- boolean loadBrokerConfig(Map<String, String> metaDataMap);
+public interface BrokerConfigMapper extends AbstractMapper {
- boolean putBrokerConfig(String key, String brokerJsonData);
+ boolean putBrokerConfig(BrokerConfEntity memEntity, ProcessResult result);
- boolean delBrokerConfig(String key);
+ boolean delBrokerConfig(int brokerId);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
index ff4f093..3fcd45d 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
@@ -21,11 +21,7 @@ import org.apache.tubemq.server.common.utils.ProcessResult;
import
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
-public interface ClusterConfigMapper {
-
- void close();
-
- boolean loadClusterConfig(ProcessResult result);
+public interface ClusterConfigMapper extends AbstractMapper {
boolean putClusterConfig(ClusterSettingEntity entity, ProcessResult
result);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
index a08ab41..fed52e6 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupBlackListMapper.java
@@ -17,13 +17,14 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
-import java.util.Map;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBlackGroupEntity;
-public interface GroupBlackListMapper {
- boolean loadGroupBlackListConfig(Map<String, String> metaDataMap);
- boolean putGroupBlackListConfig(String key, String blackListJsonData);
+public interface GroupBlackListMapper extends AbstractMapper {
+
+ boolean putGroupBlackListConfig(BdbBlackGroupEntity entity, ProcessResult
result);
boolean delGroupBlackListConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
similarity index 72%
copy from
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
copy to
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
index ff4f093..04904ba 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConfigMapper.java
@@ -18,16 +18,15 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
import org.apache.tubemq.server.common.utils.ProcessResult;
-import
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
+import org.apache.tubemq.server.master.metastore.dao.entity.GroupConfigEntity;
-public interface ClusterConfigMapper {
- void close();
+public interface GroupConfigMapper extends AbstractMapper {
- boolean loadClusterConfig(ProcessResult result);
+ boolean putGroupConfigConfig(GroupConfigEntity entity, ProcessResult
result);
+
+ boolean delGroupConfigConfig(String key);
- boolean putClusterConfig(ClusterSettingEntity entity, ProcessResult
result);
- boolean delClusterConfig(String key);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
similarity index 72%
copy from
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
copy to
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
index ff4f093..5a83a74 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/ClusterConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupFilterCtrlMapper.java
@@ -18,16 +18,14 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
import org.apache.tubemq.server.common.utils.ProcessResult;
-import
org.apache.tubemq.server.master.metastore.dao.entity.ClusterSettingEntity;
+import
org.apache.tubemq.server.master.metastore.dao.entity.GroupFilterCtrlEntity;
-public interface ClusterConfigMapper {
+public interface GroupFilterCtrlMapper extends AbstractMapper {
- void close();
+ boolean putGroupFilterCtrlConfig(GroupFilterCtrlEntity entity,
ProcessResult result);
- boolean loadClusterConfig(ProcessResult result);
+ boolean delGroupFilterCtrlConfig(String key);
- boolean putClusterConfig(ClusterSettingEntity entity, ProcessResult
result);
- boolean delClusterConfig(String key);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicAuthCtrlMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicAuthCtrlMapper.java
deleted file mode 100644
index 30aaadf..0000000
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicAuthCtrlMapper.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.master.metastore.dao.mapper;
-
-import java.util.Map;
-
-public interface TopicAuthCtrlMapper {
-
- boolean loadTopicAuthCtrlConfig(Map<String, String> metaDataMap);
-
- boolean putTopicAuthCtrlConfig(String key, String topicAuthCtrlJsonData);
-
- boolean delTopicAuthCtrlConfig(String key);
-
-
-}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
index 658feda..a7d3164 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicConfigMapper.java
@@ -17,13 +17,13 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
-import java.util.Map;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
-public interface TopicConfigMapper {
- boolean loadTopicConfig(Map<String, String> metaDataMap);
+public interface TopicConfigMapper extends AbstractMapper {
- boolean putTopicConfig(String key, String topicJsonData);
+ boolean putTopicConfig(BdbTopicConfEntity entity, ProcessResult result);
boolean delTopicConfig(String key);
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConsumeCtrlMapper.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
similarity index 71%
rename from
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConsumeCtrlMapper.java
rename to
tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
index b45620e..a3c4d99 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/GroupConsumeCtrlMapper.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metastore/dao/mapper/TopicCtrlMapper.java
@@ -17,15 +17,16 @@
package org.apache.tubemq.server.master.metastore.dao.mapper;
-import java.util.Map;
+import org.apache.tubemq.server.common.utils.ProcessResult;
+import org.apache.tubemq.server.master.metastore.dao.entity.TopicCtrlEntity;
-public interface GroupConsumeCtrlMapper {
- boolean loadGroupConsumeCtrlConfig(Map<String, String> metaDataMap);
- boolean putGroupConsumeCtrlConfig(String key, String consumeCtrlJsonData);
+public interface TopicCtrlMapper extends AbstractMapper {
- boolean delGroupConsumeCtrlConfig(String key);
+ boolean putTopicCtrlConfig(TopicCtrlEntity entity, ProcessResult result);
+
+ boolean delTopicCtrlConfig(String key);
}
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
index 125a490..4deefda 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerConfManager.java
@@ -53,6 +53,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEnt
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFlowCtrlEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
import org.apache.tubemq.server.master.web.model.ClusterGroupVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -379,7 +380,7 @@ public class BrokerConfManager implements Server {
if (bdbGroupFilterCondEntity == null) {
continue;
}
- String allowedConds = bdbGroupFilterCondEntity.getAttributes();
+ String allowedConds = bdbGroupFilterCondEntity.getFilterCondStr();
TreeSet<String> condItemSet = reqTopicConditions.get(tmpTopic);
if (allowedConds.length() == 2
&&
allowedConds.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
@@ -1442,8 +1443,8 @@ public class BrokerConfManager implements Server {
&&
!bdbQueryEntity.getTopicName().equals(entity.getTopicName()))
||
(!TStringUtils.isBlank(bdbQueryEntity.getConsumerGroupName())
&&
!bdbQueryEntity.getConsumerGroupName().equals(entity.getConsumerGroupName()))
- ||
(!TStringUtils.isBlank(bdbQueryEntity.getAttributes())
- &&
!bdbQueryEntity.getAttributes().equals(entity.getAttributes()))
+ ||
(!TStringUtils.isBlank(bdbQueryEntity.getFilterCondStr())
+ &&
!bdbQueryEntity.getFilterCondStr().equals(entity.getFilterCondStr()))
|| (bdbQueryEntity.getControlStatus() !=
TStatusConstants.STATUS_SERVICE_UNDEFINED
&& bdbQueryEntity.getControlStatus() !=
entity.getControlStatus())
||
(!TStringUtils.isBlank(bdbQueryEntity.getCreateUser())
@@ -2033,10 +2034,10 @@ public class BrokerConfManager implements Server {
public boolean confSetBdbClusterDefSetting(BdbClusterSettingEntity
bdbEntity)
throws Exception {
validMasterStatus();
- bdbEntity.setRecordKey(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+ bdbEntity.setRecordKey(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
boolean result =
mBdbStoreManagerService.putBdbClusterConfEntity(bdbEntity,
true);
- clusterSettingMap.put(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING,
bdbEntity);
+ clusterSettingMap.put(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING,
bdbEntity);
StringBuilder strBuffer = new StringBuilder(512);
if (result) {
strBuffer.append("[confSetBdbClusterDefSetting Success], add new
record :");
@@ -2057,7 +2058,7 @@ public class BrokerConfManager implements Server {
public boolean confDeleteBdbClusterSetting(final StringBuilder strBuffer)
throws Exception {
validMasterStatus();
BdbClusterSettingEntity curEntity =
-
this.clusterSettingMap.remove(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+
this.clusterSettingMap.remove(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
if (curEntity != null) {
mBdbStoreManagerService.delBdbClusterConfEntity();
strBuffer.append(
@@ -2071,7 +2072,7 @@ public class BrokerConfManager implements Server {
}
public BdbClusterSettingEntity getBdbClusterSetting() {
- return
this.clusterSettingMap.get(TServerConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
+ return
this.clusterSettingMap.get(TStoreConstants.TOKEN_DEFAULT_CLUSTER_SETTING);
}
private void validMasterStatus() throws Exception {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 9039d66..58e47bd 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -345,7 +345,7 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
new
BdbGroupFilterCondEntity(curFilterCondEntity.getTopicName(),
curFilterCondEntity.getConsumerGroupName(),
curFilterCondEntity.getControlStatus(),
- curFilterCondEntity.getAttributes(),
+ curFilterCondEntity.getFilterCondStr(),
modifyUser, modifyDate);
int filterCondStatus =
WebParameterUtils.validIntDataParameter("condStatus",
@@ -361,9 +361,9 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
String strNewFilterConds =
WebParameterUtils.checkAndGetFilterConds(req.getParameter("filterConds"),
false, sBuilder);
if (TStringUtils.isNotBlank(strNewFilterConds)) {
- if
(!curFilterCondEntity.getAttributes().equals(strNewFilterConds)) {
+ if
(!curFilterCondEntity.getFilterCondStr().equals(strNewFilterConds)) {
foundChange = true;
- newFilterCondEntity.setAttributes(strNewFilterConds);
+ newFilterCondEntity.setFilterCondStr(strNewFilterConds);
}
}
if (foundChange) {
@@ -446,7 +446,7 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
new
BdbGroupFilterCondEntity(curFilterCondEntity.getTopicName(),
curFilterCondEntity.getConsumerGroupName(),
curFilterCondEntity.getControlStatus(),
- curFilterCondEntity.getAttributes(),
+ curFilterCondEntity.getFilterCondStr(),
modifyUser, modifyDate);
int filterCondStatus =
WebParameterUtils.validIntDataParameter("condStatus",
@@ -463,9 +463,9 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
(String) groupObject.get("filterConds"),
false, sBuilder);
if (TStringUtils.isNotBlank(strNewFilterConds)) {
- if
(!curFilterCondEntity.getAttributes().equals(strNewFilterConds)) {
+ if
(!curFilterCondEntity.getFilterCondStr().equals(strNewFilterConds)) {
foundChange = true;
-
newFilterCondEntity.setAttributes(strNewFilterConds);
+
newFilterCondEntity.setFilterCondStr(strNewFilterConds);
}
}
if (!foundChange) {
@@ -691,7 +691,7 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\"data\":[");
for (BdbGroupFilterCondEntity entity : webGroupCondEntities) {
if (!filterCondSet.isEmpty()) {
- String filterItems = entity.getAttributes();
+ String filterItems = entity.getFilterCondStr();
if (filterItems.length() == 2
&&
filterItems.equals(TServerConstants.BLANK_FILTER_ITEM_STR)) {
continue;
@@ -714,11 +714,11 @@ public class WebAdminGroupCtrlHandler extends
AbstractWebHandler {
sBuilder.append("{\"topicName\":\"").append(entity.getTopicName())
.append("\",\"groupName\":\"").append(entity.getConsumerGroupName())
.append("\",\"condStatus\":").append(entity.getControlStatus());
- if (entity.getAttributes().length() <= 2) {
+ if (entity.getFilterCondStr().length() <= 2) {
sBuilder.append(",\"filterConds\":\"\"");
} else {
sBuilder.append(",\"filterConds\":\"")
- .append(entity.getAttributes())
+ .append(entity.getFilterCondStr())
.append("\"");
}
sBuilder.append(",\"createUser\":\"").append(entity.getCreateUser())
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
index 0f4429b..0c764d3 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminTopicAuthHandler.java
@@ -319,11 +319,11 @@ public class WebAdminTopicAuthHandler extends
AbstractWebHandler {
sBuilder.append("{\"topicName\":\"").append(condEntity.getTopicName())
.append("\",\"groupName\":\"").append(condEntity.getConsumerGroupName())
.append("\",\"condStatus\":").append(condEntity.getControlStatus());
- if (condEntity.getAttributes().length() <= 2) {
+ if (condEntity.getFilterCondStr().length() <= 2) {
sBuilder.append(",\"filterConds\":\"\"");
} else {
sBuilder.append(",\"filterConds\":\"")
- .append(condEntity.getAttributes())
+ .append(condEntity.getFilterCondStr())
.append("\"");
}
sBuilder.append(",\"createUser\":\"").append(condEntity.getCreateUser())
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index befb362..d096f37 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -38,6 +38,7 @@ import
org.apache.tubemq.server.common.utils.WebParameterUtils;
import org.apache.tubemq.server.master.TMaster;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerConfManager;
import org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerInfoHolder;
import
org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
@@ -297,16 +298,16 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
req.getParameter("brokerTLSPort"), false,
TBaseConstants.META_DEFAULT_BROKER_TLS_PORT, 0);
String attributes =
-
strBuffer.append(TokenConstants.TOKEN_STORE_NUM).append(TokenConstants.EQ).append(numTopicStores)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD)
+
strBuffer.append(TStoreConstants.TOKEN_STORE_NUM).append(TokenConstants.EQ).append(numTopicStores)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
.append(TokenConstants.EQ).append(unFlushDataHold)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_CNT)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_CNT)
.append(TokenConstants.EQ).append(memCacheMsgCntInK)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE)
.append(TokenConstants.EQ).append(memCacheMsgSizeInMB).append(TokenConstants.SEGMENT_SEP)
- .append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
+ .append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL)
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_TLS_PORT)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_TLS_PORT)
.append(TokenConstants.EQ).append(brokerTlsPort).toString();
strBuffer.delete(0, strBuffer.length());
BdbBrokerConfEntity brokerConfEntity =
@@ -468,16 +469,16 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
WebParameterUtils.validIntDataParameter("memCacheFlushIntvl",
jsonObject.get("memCacheFlushIntvl"),
false, 20000, 4000);
String attributes = strBuffer
-
.append(TokenConstants.TOKEN_STORE_NUM).append(TokenConstants.EQ).append(numTopicStores)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD)
+
.append(TStoreConstants.TOKEN_STORE_NUM).append(TokenConstants.EQ).append(numTopicStores)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
.append(TokenConstants.EQ).append(unFlushDataHold).append(TokenConstants.SEGMENT_SEP)
-
.append(TokenConstants.TOKEN_MCACHE_MSG_CNT).append(TokenConstants.EQ)
+
.append(TStoreConstants.TOKEN_MCACHE_MSG_CNT).append(TokenConstants.EQ)
.append(memCacheMsgCntInK).append(TokenConstants.SEGMENT_SEP)
-
.append(TokenConstants.TOKEN_MCACHE_MSG_SIZE).append(TokenConstants.EQ)
+
.append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE).append(TokenConstants.EQ)
.append(memCacheMsgSizeInMB).append(TokenConstants.SEGMENT_SEP)
-
.append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL).append(TokenConstants.EQ)
+
.append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL).append(TokenConstants.EQ)
.append(memCacheFlushIntvl).append(TokenConstants.SEGMENT_SEP)
-
.append(TokenConstants.TOKEN_TLS_PORT).append(TokenConstants.EQ)
+
.append(TStoreConstants.TOKEN_TLS_PORT).append(TokenConstants.EQ)
.append(brokerTlsPort).toString();
strBuffer.delete(0, strBuffer.length());
inBrokerConfEntityMap.put(inputKey, new
BdbBrokerConfEntity(brokerId, brokerIp,
@@ -875,7 +876,7 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
req.getParameter("numTopicStores"), false,
TBaseConstants.META_VALUE_UNDEFINED, 1);
if ((numTopicStores > 0) && (numTopicStores !=
oldEntity.getNumTopicStores())) {
foundChange = true;
- newEntity.appendAttributes(TokenConstants.TOKEN_STORE_NUM,
String.valueOf(numTopicStores));
+
newEntity.appendAttributes(TStoreConstants.TOKEN_STORE_NUM,
String.valueOf(numTopicStores));
}
int unFlushDataHold =
WebParameterUtils.validIntDataParameter("unflushDataHold",
req.getParameter("unflushDataHold"), false,
TBaseConstants.META_VALUE_UNDEFINED, 0);
@@ -911,7 +912,7 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
if ((memCacheFlushIntvl > 0) && (memCacheFlushIntvl !=
oldEntity.getDftMemCacheFlushIntvl())) {
foundChange = true;
newEntity.setDftMemCacheFlushIntvl(memCacheFlushIntvl);
-
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,
+
newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL,
String.valueOf(memCacheFlushIntvl));
}
String publishParaStr = req.getParameter("acceptPublish");
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
index 74a4ed0..09670a0 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerTopicConfHandler.java
@@ -45,6 +45,7 @@ import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbConsumerGroupEntit
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
import
org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicAuthControlEntity;
import org.apache.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
+import org.apache.tubemq.server.master.metastore.TStoreConstants;
import
org.apache.tubemq.server.master.nodemanage.nodebroker.BrokerSyncStatusInfo;
import
org.apache.tubemq.server.master.nodemanage.nodebroker.TopicPSInfoManager;
import org.slf4j.Logger;
@@ -220,17 +221,17 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
req.getParameter("maxMsgSizeInMB"),
false,
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB,
TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB);
- String attributes =
strBuffer.append(TokenConstants.TOKEN_STORE_NUM)
+ String attributes =
strBuffer.append(TStoreConstants.TOKEN_STORE_NUM)
.append(TokenConstants.EQ).append(numTopicStores)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
.append(TokenConstants.EQ).append(unFlushDataHold)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_CNT)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_CNT)
.append(TokenConstants.EQ).append(memCacheMsgCntInK)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE)
.append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL)
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
-
.append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MAX_MSG_SIZE)
.append(TokenConstants.EQ)
.append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
.toString();
@@ -389,17 +390,17 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
itemCreateDate = createDate;
}
String attributes =
- strBuffer.append(TokenConstants.TOKEN_STORE_NUM)
+ strBuffer.append(TStoreConstants.TOKEN_STORE_NUM)
.append(TokenConstants.EQ).append(numTopicStores)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_DATA_UNFLUSHHOLD)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_DATA_UNFLUSHHOLD)
.append(TokenConstants.EQ).append(unFlushDataHold)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_CNT)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_CNT)
.append(TokenConstants.EQ).append(memCacheMsgCntInK)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_MSG_SIZE)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_MSG_SIZE)
.append(TokenConstants.EQ).append(memCacheMsgSizeInMB)
-
.append(TokenConstants.SEGMENT_SEP).append(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL)
.append(TokenConstants.EQ).append(memCacheFlushIntvl)
-
.append(TokenConstants.SEGMENT_SEP).append(TServerConstants.TOKEN_MAX_MSG_SIZE)
+
.append(TokenConstants.SEGMENT_SEP).append(TStoreConstants.TOKEN_MAX_MSG_SIZE)
.append(TokenConstants.EQ)
.append(SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB))
.toString();
@@ -700,10 +701,10 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
}
strBuffer.append("{\"groupName\":\"").append(itemCond.getConsumerGroupName())
.append("\",\"condStatus\":").append(itemCond.getControlStatus());
- if (itemCond.getAttributes().length() <= 2) {
+ if (itemCond.getFilterCondStr().length() <= 2) {
strBuffer.append(",\"filterConds\":\"\"");
} else {
-
strBuffer.append(",\"filterConds\":\"").append(itemCond.getAttributes()).append("\"");
+
strBuffer.append(",\"filterConds\":\"").append(itemCond.getFilterCondStr()).append("\"");
}
strBuffer.append(",\"createUser\":\"").append(itemCond.getCreateUser())
.append("\",\"createDate\":\"").append(formatter.format(itemCond.getCreateDate()))
@@ -1423,12 +1424,12 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
}
if (memCacheMsgCntInK >= 0 && memCacheMsgCntInK !=
oldEntity.getMemCacheMsgCntInK()) {
foundChange = true;
-
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_MSG_CNT,
+
newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_MSG_CNT,
String.valueOf(memCacheMsgCntInK));
}
if (memCacheMsgSizeInMB >= 0 && memCacheMsgSizeInMB !=
oldEntity.getMemCacheMsgSizeInMB()) {
foundChange = true;
-
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_MSG_SIZE,
+
newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_MSG_SIZE,
String.valueOf(memCacheMsgSizeInMB));
}
if (maxMsgSizeInMB > 0) {
@@ -1436,13 +1437,13 @@ public class WebBrokerTopicConfHandler extends
AbstractWebHandler {
SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB);
if (maxMsgSizeInB != oldEntity.getMaxMsgSize()) {
foundChange = true;
-
newEntity.appendAttributes(TServerConstants.TOKEN_MAX_MSG_SIZE,
+
newEntity.appendAttributes(TStoreConstants.TOKEN_MAX_MSG_SIZE,
String.valueOf(maxMsgSizeInB));
}
}
if (memCacheFlushIntvl >= 0 && memCacheFlushIntvl !=
oldEntity.getMemCacheFlushIntvl()) {
foundChange = true;
-
newEntity.appendAttributes(TokenConstants.TOKEN_MCACHE_FLUSH_INTVL,
+
newEntity.appendAttributes(TStoreConstants.TOKEN_MCACHE_FLUSH_INTVL,
String.valueOf(memCacheFlushIntvl));
}
if ((numTopicStores > 0) && (numTopicStores !=
oldEntity.getNumTopicStores())) {