http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java deleted file mode 100644 index 72e1b96..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $ - */ -package com.alibaba.rocketmq.common.protocol.route; - -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class TopicRouteData extends RemotingSerializable { - private String orderTopicConf; - private List<QueueData> queueDatas; - private List<BrokerData> brokerDatas; - private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; - - - public TopicRouteData cloneTopicRouteData() { - TopicRouteData topicRouteData = new TopicRouteData(); - topicRouteData.setQueueDatas(new ArrayList<QueueData>()); - topicRouteData.setBrokerDatas(new ArrayList<BrokerData>()); - topicRouteData.setFilterServerTable(new HashMap<String, List<String>>()); - topicRouteData.setOrderTopicConf(this.orderTopicConf); - - if (this.queueDatas != null) { - topicRouteData.getQueueDatas().addAll(this.queueDatas); - } - - if (this.brokerDatas != null) { - topicRouteData.getBrokerDatas().addAll(this.brokerDatas); - } - - if (this.filterServerTable != null) { - topicRouteData.getFilterServerTable().putAll(this.filterServerTable); - } - - return topicRouteData; - } - - - public List<QueueData> getQueueDatas() { - return queueDatas; - } - - - public void setQueueDatas(List<QueueData> queueDatas) { - this.queueDatas = queueDatas; - } - - - public List<BrokerData> getBrokerDatas() { - return brokerDatas; - } - - - public void setBrokerDatas(List<BrokerData> brokerDatas) { - this.brokerDatas = brokerDatas; - } - - public HashMap<String, List<String>> getFilterServerTable() { - return filterServerTable; - } - - public void setFilterServerTable(HashMap<String, List<String>> filterServerTable) { - this.filterServerTable = filterServerTable; - } - - public String getOrderTopicConf() { - return orderTopicConf; - } - - public void setOrderTopicConf(String orderTopicConf) { - this.orderTopicConf = orderTopicConf; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((brokerDatas == null) ? 0 : brokerDatas.hashCode()); - result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode()); - result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode()); - result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TopicRouteData other = (TopicRouteData) obj; - if (brokerDatas == null) { - if (other.brokerDatas != null) - return false; - } else if (!brokerDatas.equals(other.brokerDatas)) - return false; - if (orderTopicConf == null) { - if (other.orderTopicConf != null) - return false; - } else if (!orderTopicConf.equals(other.orderTopicConf)) - return false; - if (queueDatas == null) { - if (other.queueDatas != null) - return false; - } else if (!queueDatas.equals(other.queueDatas)) - return false; - if (filterServerTable == null) { - if (other.filterServerTable != null) - return false; - } else if (!filterServerTable.equals(other.filterServerTable)) - return false; - return true; - } - - @Override - public String toString() { - return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas - + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + "]"; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java deleted file mode 100644 index 86bdd3d..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.protocol.topic; - -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; - - -public class OffsetMovedEvent extends RemotingSerializable { - private String consumerGroup; - private MessageQueue messageQueue; - private long offsetRequest; - private long offsetNew; - - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - - public void setMessageQueue(MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - - public long getOffsetRequest() { - return offsetRequest; - } - - - public void setOffsetRequest(long offsetRequest) { - this.offsetRequest = offsetRequest; - } - - - public long getOffsetNew() { - return offsetNew; - } - - - public void setOffsetNew(long offsetNew) { - this.offsetNew = offsetNew; - } - - - @Override - public String toString() { - return "OffsetMovedEvent [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue - + ", offsetRequest=" + offsetRequest + ", offsetNew=" + offsetNew + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java b/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java deleted file mode 100644 index 8fc4e76..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.queue; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Comparator; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * thread safe - * - * @author lansheng.zj - */ -public class ConcurrentTreeMap<K, V> { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ReentrantLock lock; - private TreeMap<K, V> tree; - private RoundQueue<K> roundQueue; - - - public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) { - tree = new TreeMap<K, V>(comparator); - roundQueue = new RoundQueue<K>(capacity); - lock = new ReentrantLock(true); - } - - - public Map.Entry<K, V> pollFirstEntry() { - lock.lock(); - try { - return tree.pollFirstEntry(); - } finally { - lock.unlock(); - } - } - - - public V putIfAbsentAndRetExsit(K key, V value) { - lock.lock(); - try { - if (roundQueue.put(key)) { - V exsit = tree.get(key); - if (null == exsit) { - tree.put(key, value); - exsit = value; - } - log.warn("putIfAbsentAndRetExsit success. {}", key); - return exsit; - } - - else { - V exsit = tree.get(key); - return exsit; - } - } finally { - lock.unlock(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java b/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java deleted file mode 100644 index a3783ba..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.queue; - -import java.util.LinkedList; -import java.util.Queue; - - -/** - * not thread safe - * - * @author lansheng.zj - */ -public class RoundQueue<E> { - - private Queue<E> queue; - private int capacity; - - - public RoundQueue(int capacity) { - this.capacity = capacity; - queue = new LinkedList<E>(); - } - - - public boolean put(E e) { - boolean ok = false; - if (!queue.contains(e)) { - if (queue.size() >= capacity) { - queue.poll(); - } - queue.add(e); - ok = true; - } - - return ok; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java b/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java deleted file mode 100644 index aa0bc54..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.running; - -public enum RunningStats { - commitLogMaxOffset, - commitLogMinOffset, - commitLogDiskRatio, - consumeQueueDiskRatio, - scheduleMessageOffset, -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java deleted file mode 100644 index 89eefa5..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.stats; - -import com.alibaba.rocketmq.common.UtilAll; -import org.slf4j.Logger; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - - -public class MomentStatsItem { - - private final AtomicLong value = new AtomicLong(0); - - private final String statsName; - private final String statsKey; - private final ScheduledExecutorService scheduledExecutorService; - private final Logger log; - - - public MomentStatsItem(String statsName, String statsKey, - ScheduledExecutorService scheduledExecutorService, Logger log) { - this.statsName = statsName; - this.statsKey = statsKey; - this.scheduledExecutorService = scheduledExecutorService; - this.log = log; - } - - - public void init() { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtMinutes(); - - MomentStatsItem.this.value.set(0); - } catch (Throwable e) { - } - } - }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS); - } - - - public void printAtMinutes() { - log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d", - this.statsName, - this.statsKey, - this.value.get())); - } - - public AtomicLong getValue() { - return value; - } - - - public String getStatsKey() { - return statsKey; - } - - - public String getStatsName() { - return statsName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java deleted file mode 100644 index fde88cd..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.stats; - -import com.alibaba.rocketmq.common.UtilAll; -import org.slf4j.Logger; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -public class MomentStatsItemSet { - private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable = - new ConcurrentHashMap<String, MomentStatsItem>(128); - private final String statsName; - private final ScheduledExecutorService scheduledExecutorService; - private final Logger log; - - - public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) { - this.statsName = statsName; - this.scheduledExecutorService = scheduledExecutorService; - this.log = log; - this.init(); - } - - public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() { - return statsItemTable; - } - - public String getStatsName() { - return statsName; - } - - public void init() { - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtMinutes(); - } catch (Throwable e) { - } - } - }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS); - } - - private void printAtMinutes() { - Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, MomentStatsItem> next = it.next(); - next.getValue().printAtMinutes(); - } - } - - public void setValue(final String statsKey, final int value) { - MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey); - statsItem.getValue().set(value); - } - - public MomentStatsItem getAndCreateStatsItem(final String statsKey) { - MomentStatsItem statsItem = this.statsItemTable.get(statsKey); - if (null == statsItem) { - statsItem = - new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); - MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem); - - if (null == prev) { - - // statsItem.init(); - } - } - - return statsItem; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java deleted file mode 100644 index 1c99699..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.stats; - -import com.alibaba.rocketmq.common.UtilAll; -import org.slf4j.Logger; - -import java.util.LinkedList; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - - -public class StatsItem { - - private final AtomicLong value = new AtomicLong(0); - - private final AtomicLong times = new AtomicLong(0); - - private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>(); - - - private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>(); - - - private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>(); - - private final String statsName; - private final String statsKey; - private final ScheduledExecutorService scheduledExecutorService; - private final Logger log; - - - public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, - Logger log) { - this.statsName = statsName; - this.statsKey = statsKey; - this.scheduledExecutorService = scheduledExecutorService; - this.log = log; - } - - public StatsSnapshot getStatsDataInMinute() { - return computeStatsData(this.csListMinute); - } - - private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) { - StatsSnapshot statsSnapshot = new StatsSnapshot(); - synchronized (csList) { - double tps = 0; - double avgpt = 0; - long sum = 0; - if (!csList.isEmpty()) { - CallSnapshot first = csList.getFirst(); - CallSnapshot last = csList.getLast(); - sum = last.getValue() - first.getValue(); - tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); - - long timesDiff = last.getTimes() - first.getTimes(); - if (timesDiff > 0) { - avgpt = (sum * 1.0d) / timesDiff; - } - } - - statsSnapshot.setSum(sum); - statsSnapshot.setTps(tps); - statsSnapshot.setAvgpt(avgpt); - } - - return statsSnapshot; - } - - public StatsSnapshot getStatsDataInHour() { - return computeStatsData(this.csListHour); - } - - public StatsSnapshot getStatsDataInDay() { - return computeStatsData(this.csListDay); - } - - public void init() { - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - samplingInSeconds(); - } catch (Throwable e) { - } - } - }, 0, 10, TimeUnit.SECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - samplingInMinutes(); - } catch (Throwable e) { - } - } - }, 0, 10, TimeUnit.MINUTES); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - samplingInHour(); - } catch (Throwable e) { - } - } - }, 0, 1, TimeUnit.HOURS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtMinutes(); - } catch (Throwable ignored) { - } - } - }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtHour(); - } catch (Throwable ignored) { - } - } - }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtDay(); - } catch (Throwable ignored) { - } - } - }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); - } - - public void samplingInSeconds() { - synchronized (this.csListMinute) { - this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); - if (this.csListMinute.size() > 7) { - this.csListMinute.removeFirst(); - } - } - } - - public void samplingInMinutes() { - synchronized (this.csListHour) { - this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); - if (this.csListHour.size() > 7) { - this.csListHour.removeFirst(); - } - } - } - - public void samplingInHour() { - synchronized (this.csListDay) { - this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); - if (this.csListDay.size() > 25) { - this.csListDay.removeFirst(); - } - } - } - - public void printAtMinutes() { - StatsSnapshot ss = computeStatsData(this.csListMinute); - log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f", - this.statsName, - this.statsKey, - ss.getSum(), - ss.getTps(), - ss.getAvgpt())); - } - - public void printAtHour() { - StatsSnapshot ss = computeStatsData(this.csListHour); - log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f", - this.statsName, - this.statsKey, - ss.getSum(), - ss.getTps(), - ss.getAvgpt())); - } - - public void printAtDay() { - StatsSnapshot ss = computeStatsData(this.csListDay); - log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f", - this.statsName, - this.statsKey, - ss.getSum(), - ss.getTps(), - ss.getAvgpt())); - } - - public AtomicLong getValue() { - return value; - } - - - public String getStatsKey() { - return statsKey; - } - - - public String getStatsName() { - return statsName; - } - - - public AtomicLong getTimes() { - return times; - } -} - - -class CallSnapshot { - private final long timestamp; - private final long times; - - private final long value; - - - public CallSnapshot(long timestamp, long times, long value) { - super(); - this.timestamp = timestamp; - this.times = times; - this.value = value; - } - - - public long getTimestamp() { - return timestamp; - } - - - public long getTimes() { - return times; - } - - - public long getValue() { - return value; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java deleted file mode 100644 index 8a2b2a1..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.stats; - -import com.alibaba.rocketmq.common.UtilAll; -import org.slf4j.Logger; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -public class StatsItemSet { - private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = - new ConcurrentHashMap<String, StatsItem>(128); - - private final String statsName; - private final ScheduledExecutorService scheduledExecutorService; - private final Logger log; - - - public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) { - this.statsName = statsName; - this.scheduledExecutorService = scheduledExecutorService; - this.log = log; - this.init(); - } - - public void init() { - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - samplingInSeconds(); - } catch (Throwable e) { - } - } - }, 0, 10, TimeUnit.SECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - samplingInMinutes(); - } catch (Throwable e) { - } - } - }, 0, 10, TimeUnit.MINUTES); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - samplingInHour(); - } catch (Throwable e) { - } - } - }, 0, 1, TimeUnit.HOURS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtMinutes(); - } catch (Throwable e) { - } - } - }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtHour(); - } catch (Throwable e) { - } - } - }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS); - - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - printAtDay(); - } catch (Throwable e) { - } - } - }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); - } - - private void samplingInSeconds() { - Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, StatsItem> next = it.next(); - next.getValue().samplingInSeconds(); - } - } - - private void samplingInMinutes() { - Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, StatsItem> next = it.next(); - next.getValue().samplingInMinutes(); - } - } - - private void samplingInHour() { - Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, StatsItem> next = it.next(); - next.getValue().samplingInHour(); - } - } - - private void printAtMinutes() { - Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, StatsItem> next = it.next(); - next.getValue().printAtMinutes(); - } - } - - private void printAtHour() { - Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, StatsItem> next = it.next(); - next.getValue().printAtHour(); - } - } - - private void printAtDay() { - Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, StatsItem> next = it.next(); - next.getValue().printAtDay(); - } - } - - public void addValue(final String statsKey, final int incValue, final int incTimes) { - StatsItem statsItem = this.getAndCreateStatsItem(statsKey); - statsItem.getValue().addAndGet(incValue); - statsItem.getTimes().addAndGet(incTimes); - } - - public StatsItem getAndCreateStatsItem(final String statsKey) { - StatsItem statsItem = this.statsItemTable.get(statsKey); - if (null == statsItem) { - statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); - StatsItem prev = this.statsItemTable.put(statsKey, statsItem); - - if (null == prev) { - - // statsItem.init(); - } - } - - return statsItem; - } - - public StatsSnapshot getStatsDataInMinute(final String statsKey) { - StatsItem statsItem = this.statsItemTable.get(statsKey); - if (null != statsItem) { - return statsItem.getStatsDataInMinute(); - } - return new StatsSnapshot(); - } - - public StatsSnapshot getStatsDataInHour(final String statsKey) { - StatsItem statsItem = this.statsItemTable.get(statsKey); - if (null != statsItem) { - return statsItem.getStatsDataInHour(); - } - return new StatsSnapshot(); - } - - public StatsSnapshot getStatsDataInDay(final String statsKey) { - StatsItem statsItem = this.statsItemTable.get(statsKey); - if (null != statsItem) { - return statsItem.getStatsDataInDay(); - } - return new StatsSnapshot(); - } - - public StatsItem getStatsItem(final String statsKey) { - return this.statsItemTable.get(statsKey); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java b/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java deleted file mode 100644 index 4092a2b..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.stats; - -public class StatsSnapshot { - private long sum; - private double tps; - private double avgpt; - - - public long getSum() { - return sum; - } - - - public void setSum(long sum) { - this.sum = sum; - } - - - public double getTps() { - return tps; - } - - - public void setTps(double tps) { - this.tps = tps; - } - - - public double getAvgpt() { - return avgpt; - } - - - public void setAvgpt(double avgpt) { - this.avgpt = avgpt; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java deleted file mode 100644 index cf8baf2..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.subscription; - -import com.alibaba.rocketmq.common.MixAll; - - -/** - * @author shijia.wxr - */ -public class SubscriptionGroupConfig { - - private String groupName; - - private boolean consumeEnable = true; - private boolean consumeFromMinEnable = true; - - private boolean consumeBroadcastEnable = true; - - private int retryQueueNums = 1; - - private int retryMaxTimes = 16; - - private long brokerId = MixAll.MASTER_ID; - - private long whichBrokerWhenConsumeSlowly = 1; - - private boolean notifyConsumerIdsChangedEnable = true; - - - public String getGroupName() { - return groupName; - } - - - public void setGroupName(String groupName) { - this.groupName = groupName; - } - - - public boolean isConsumeEnable() { - return consumeEnable; - } - - - public void setConsumeEnable(boolean consumeEnable) { - this.consumeEnable = consumeEnable; - } - - - public boolean isConsumeFromMinEnable() { - return consumeFromMinEnable; - } - - - public void setConsumeFromMinEnable(boolean consumeFromMinEnable) { - this.consumeFromMinEnable = consumeFromMinEnable; - } - - - public boolean isConsumeBroadcastEnable() { - return consumeBroadcastEnable; - } - - - public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) { - this.consumeBroadcastEnable = consumeBroadcastEnable; - } - - - public int getRetryQueueNums() { - return retryQueueNums; - } - - - public void setRetryQueueNums(int retryQueueNums) { - this.retryQueueNums = retryQueueNums; - } - - - public int getRetryMaxTimes() { - return retryMaxTimes; - } - - - public void setRetryMaxTimes(int retryMaxTimes) { - this.retryMaxTimes = retryMaxTimes; - } - - - public long getBrokerId() { - return brokerId; - } - - - public void setBrokerId(long brokerId) { - this.brokerId = brokerId; - } - - - public long getWhichBrokerWhenConsumeSlowly() { - return whichBrokerWhenConsumeSlowly; - } - - - public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) { - this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly; - } - - public boolean isNotifyConsumerIdsChangedEnable() { - return notifyConsumerIdsChangedEnable; - } - - public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) { - this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (brokerId ^ (brokerId >>> 32)); - result = prime * result + (consumeBroadcastEnable ? 1231 : 1237); - result = prime * result + (consumeEnable ? 1231 : 1237); - result = prime * result + (consumeFromMinEnable ? 1231 : 1237); - result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237); - result = prime * result + ((groupName == null) ? 0 : groupName.hashCode()); - result = prime * result + retryMaxTimes; - result = prime * result + retryQueueNums; - result = - prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32)); - return result; - } - - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj; - if (brokerId != other.brokerId) - return false; - if (consumeBroadcastEnable != other.consumeBroadcastEnable) - return false; - if (consumeEnable != other.consumeEnable) - return false; - if (consumeFromMinEnable != other.consumeFromMinEnable) - return false; - if (groupName == null) { - if (other.groupName != null) - return false; - } else if (!groupName.equals(other.groupName)) - return false; - if (retryMaxTimes != other.retryMaxTimes) - return false; - if (retryQueueNums != other.retryQueueNums) - return false; - if (whichBrokerWhenConsumeSlowly != other.whichBrokerWhenConsumeSlowly) - return false; - if (notifyConsumerIdsChangedEnable != other.notifyConsumerIdsChangedEnable) - return false; - return true; - } - - - @Override - public String toString() { - return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable - + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable=" - + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes=" - + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly=" - + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable=" - + notifyConsumerIdsChangedEnable + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java deleted file mode 100644 index 2f9d057..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.sysflag; - -/** - * @author shijia.wxr - */ -public class MessageSysFlag { - public final static int COMPRESSED_FLAG = 0x1; - public final static int MULTI_TAGS_FLAG = 0x1 << 1; - public final static int TRANSACTION_NOT_TYPE = 0; - public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2; - public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2; - public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; - - - public static int getTransactionValue(final int flag) { - return flag & TRANSACTION_ROLLBACK_TYPE; - } - - - public static int resetTransactionValue(final int flag, final int type) { - return (flag & (~TRANSACTION_ROLLBACK_TYPE)) | type; - } - - - public static int clearCompressedFlag(final int flag) { - return flag & (~COMPRESSED_FLAG); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java deleted file mode 100644 index d0f7287..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.sysflag; - -/** - * @author shijia.wxr - */ -public class PullSysFlag { - private final static int FLAG_COMMIT_OFFSET = 0x1 << 0; - private final static int FLAG_SUSPEND = 0x1 << 1; - private final static int FLAG_SUBSCRIPTION = 0x1 << 2; - private final static int FLAG_CLASS_FILTER = 0x1 << 3; - - - public static int buildSysFlag(final boolean commitOffset, final boolean suspend, - final boolean subscription, final boolean classFilter) { - int flag = 0; - - if (commitOffset) { - flag |= FLAG_COMMIT_OFFSET; - } - - if (suspend) { - flag |= FLAG_SUSPEND; - } - - if (subscription) { - flag |= FLAG_SUBSCRIPTION; - } - - if (classFilter) { - flag |= FLAG_CLASS_FILTER; - } - - return flag; - } - - - public static int clearCommitOffsetFlag(final int sysFlag) { - return sysFlag & (~FLAG_COMMIT_OFFSET); - } - - - public static boolean hasCommitOffsetFlag(final int sysFlag) { - return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET; - } - - - public static boolean hasSuspendFlag(final int sysFlag) { - return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND; - } - - - public static boolean hasSubscriptionFlag(final int sysFlag) { - return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION; - } - - - public static boolean hasClassFilterFlag(final int sysFlag) { - return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java deleted file mode 100644 index 65e3115..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.sysflag; - -/** - * @author manhong.yqd - */ -public class SubscriptionSysFlag { - - private final static int FLAG_UNIT = 0x1 << 0; - - - public static int buildSysFlag(final boolean unit) { - int sysFlag = 0; - - if (unit) { - sysFlag |= FLAG_UNIT; - } - - return sysFlag; - } - - - public static int setUnitFlag(final int sysFlag) { - return sysFlag | FLAG_UNIT; - } - - - public static int clearUnitFlag(final int sysFlag) { - return sysFlag & (~FLAG_UNIT); - } - - - public static boolean hasUnitFlag(final int sysFlag) { - return (sysFlag & FLAG_UNIT) == FLAG_UNIT; - } - - - public static void main(String[] args) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java deleted file mode 100644 index 90d48f4..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.common.sysflag; - -/** - - * - * @author manhong.yqd - * - */ -public class TopicSysFlag { - - private final static int FLAG_UNIT = 0x1 << 0; - - private final static int FLAG_UNIT_SUB = 0x1 << 1; - - - public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) { - int sysFlag = 0; - - if (unit) { - sysFlag |= FLAG_UNIT; - } - - if (hasUnitSub) { - sysFlag |= FLAG_UNIT_SUB; - } - - return sysFlag; - } - - - public static int setUnitFlag(final int sysFlag) { - return sysFlag | FLAG_UNIT; - } - - - public static int clearUnitFlag(final int sysFlag) { - return sysFlag & (~FLAG_UNIT); - } - - - public static boolean hasUnitFlag(final int sysFlag) { - return (sysFlag & FLAG_UNIT) == FLAG_UNIT; - } - - - public static int setUnitSubFlag(final int sysFlag) { - return sysFlag | FLAG_UNIT_SUB; - } - - - public static int clearUnitSubFlag(final int sysFlag) { - return sysFlag & (~FLAG_UNIT_SUB); - } - - - public static boolean hasUnitSubFlag(final int sysFlag) { - return (sysFlag & FLAG_UNIT_SUB) == FLAG_UNIT_SUB; - } - - - public static void main(String[] args) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java deleted file mode 100644 index 444928f..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.utils; - -import io.netty.channel.Channel; - -import java.net.InetAddress; -import java.net.InetSocketAddress; - -public class ChannelUtil { - public static String getRemoteIp(Channel channel) { - InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress(); - if (inetSocketAddress == null) { - return ""; - } - final InetAddress inetAddr = inetSocketAddress.getAddress(); - return inetAddr != null ? inetAddr.getHostAddress() : inetSocketAddress.getHostName(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java deleted file mode 100755 index dadac46..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.utils; - -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.MixAll; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.HttpURLConnection; -import java.net.URL; -import java.net.URLEncoder; -import java.util.Iterator; -import java.util.List; - - -public class HttpTinyClient { - - static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues, - String encoding, long readTimeoutMs) throws IOException { - String encodedContent = encodingParams(paramValues, encoding); - url += (null == encodedContent) ? "" : ("?" + encodedContent); - - HttpURLConnection conn = null; - try { - conn = (HttpURLConnection) new URL(url).openConnection(); - conn.setRequestMethod("GET"); - conn.setConnectTimeout((int) readTimeoutMs); - conn.setReadTimeout((int) readTimeoutMs); - setHeaders(conn, headers, encoding); - - conn.connect(); - int respCode = conn.getResponseCode(); - String resp = null; - - if (HttpURLConnection.HTTP_OK == respCode) { - resp = IOTinyUtils.toString(conn.getInputStream(), encoding); - } else { - resp = IOTinyUtils.toString(conn.getErrorStream(), encoding); - } - return new HttpResult(respCode, resp); - } finally { - if (conn != null) { - conn.disconnect(); - } - } - } - - static private String encodingParams(List<String> paramValues, String encoding) - throws UnsupportedEncodingException { - StringBuilder sb = new StringBuilder(); - if (null == paramValues) { - return null; - } - - for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) { - sb.append(iter.next()).append("="); - sb.append(URLEncoder.encode(iter.next(), encoding)); - if (iter.hasNext()) { - sb.append("&"); - } - } - return sb.toString(); - } - - static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) { - if (null != headers) { - for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) { - conn.addRequestProperty(iter.next(), iter.next()); - } - } - conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); - conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding); - - - String ts = String.valueOf(System.currentTimeMillis()); - conn.addRequestProperty("Metaq-Client-RequestTS", ts); - } - - /** - - * - * @param url - * @param headers - - * @param paramValues - - * @param encoding - - * @param readTimeoutMs - - * - * @return the http response of given http post request - * - * @throws java.io.IOException - */ - static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues, - String encoding, long readTimeoutMs) throws IOException { - String encodedContent = encodingParams(paramValues, encoding); - - HttpURLConnection conn = null; - try { - conn = (HttpURLConnection) new URL(url).openConnection(); - conn.setRequestMethod("POST"); - conn.setConnectTimeout(3000); - conn.setReadTimeout((int) readTimeoutMs); - conn.setDoOutput(true); - conn.setDoInput(true); - setHeaders(conn, headers, encoding); - - conn.getOutputStream().write(encodedContent.getBytes(MixAll.DEFAULT_CHARSET)); - - int respCode = conn.getResponseCode(); - String resp = null; - - if (HttpURLConnection.HTTP_OK == respCode) { - resp = IOTinyUtils.toString(conn.getInputStream(), encoding); - } else { - resp = IOTinyUtils.toString(conn.getErrorStream(), encoding); - } - return new HttpResult(respCode, resp); - } finally { - if (null != conn) { - conn.disconnect(); - } - } - } - - static public class HttpResult { - final public int code; - final public String content; - - - public HttpResult(int code, String content) { - this.code = code; - this.content = content; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java deleted file mode 100644 index ced2fae..0000000 --- a/common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.common.utils; - -import com.alibaba.rocketmq.remoting.common.RemotingHelper; - -import java.io.*; -import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.List; - - -/** - * @author manhong.yqd - */ -public class IOTinyUtils { - - static public String toString(InputStream input, String encoding) throws IOException { - return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader( - input, encoding)); - } - - - static public String toString(Reader reader) throws IOException { - CharArrayWriter sw = new CharArrayWriter(); - copy(reader, sw); - return sw.toString(); - } - - - static public long copy(Reader input, Writer output) throws IOException { - char[] buffer = new char[1 << 12]; - long count = 0; - for (int n = 0; (n = input.read(buffer)) >= 0; ) { - output.write(buffer, 0, n); - count += n; - } - return count; - } - - - /** - - */ - static public List<String> readLines(Reader input) throws IOException { - BufferedReader reader = toBufferedReader(input); - List<String> list = new ArrayList<String>(); - String line = null; - for (;;) { - line = reader.readLine(); - if (null != line) { - list.add(line); - } else { - break; - } - } - return list; - } - - - static private BufferedReader toBufferedReader(Reader reader) { - return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader); - } - - - static public void copyFile(String source, String target) throws IOException { - File sf = new File(source); - if (!sf.exists()) { - throw new IllegalArgumentException("source file does not exist."); - } - File tf = new File(target); - tf.getParentFile().mkdirs(); - if (!tf.exists() && !tf.createNewFile()) { - throw new RuntimeException("failed to create target file."); - } - - FileChannel sc = null; - FileChannel tc = null; - try { - tc = new FileOutputStream(tf).getChannel(); - sc = new FileInputStream(sf).getChannel(); - sc.transferTo(0, sc.size(), tc); - } finally { - if (null != sc) { - sc.close(); - } - if (null != tc) { - tc.close(); - } - } - } - - - public static void delete(File fileOrDir) throws IOException { - if (fileOrDir == null) { - return; - } - - if (fileOrDir.isDirectory()) { - cleanDirectory(fileOrDir); - } - - fileOrDir.delete(); - } - - - /** - - */ - public static void cleanDirectory(File directory) throws IOException { - if (!directory.exists()) { - String message = directory + " does not exist"; - throw new IllegalArgumentException(message); - } - - if (!directory.isDirectory()) { - String message = directory + " is not a directory"; - throw new IllegalArgumentException(message); - } - - File[] files = directory.listFiles(); - if (files == null) { // null if security restricted - throw new IOException("Failed to list contents of " + directory); - } - - IOException exception = null; - for (File file : files) { - try { - delete(file); - } catch (IOException ioe) { - exception = ioe; - } - } - - if (null != exception) { - throw exception; - } - } - - - public static void writeStringToFile(File file, String data, String encoding) throws IOException { - OutputStream os = null; - try { - os = new FileOutputStream(file); - os.write(data.getBytes(encoding)); - } finally { - if (null != os) { - os.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java new file mode 100644 index 0000000..f035ed6 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -0,0 +1,549 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.remoting.common.RemotingUtil; + +import java.net.InetAddress; +import java.net.UnknownHostException; + + +/** + * @author shijia.wxr + */ +public class BrokerConfig { + private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + @ImportantField + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + @ImportantField + private String brokerIP1 = RemotingUtil.getLocalAddress(); + private String brokerIP2 = RemotingUtil.getLocalAddress(); + @ImportantField + private String brokerName = localHostName(); + @ImportantField + private String brokerClusterName = "DefaultCluster"; + @ImportantField + private long brokerId = MixAll.MASTER_ID; + private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE; + private int defaultTopicQueueNums = 8; + @ImportantField + private boolean autoCreateTopicEnable = true; + + private boolean clusterTopicEnable = true; + + private boolean brokerTopicEnable = true; + @ImportantField + private boolean autoCreateSubscriptionGroup = true; + private String messageStorePlugIn = ""; + + private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; + private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int adminBrokerThreadPoolNums = 16; + private int clientManageThreadPoolNums = 32; + private int consumerManageThreadPoolNums = 32; + + private int flushConsumerOffsetInterval = 1000 * 5; + + private int flushConsumerOffsetHistoryInterval = 1000 * 60; + + @ImportantField + private boolean rejectTransactionMessage = false; + @ImportantField + private boolean fetchNamesrvAddrByAddressServer = false; + private int sendThreadPoolQueueCapacity = 10000; + private int pullThreadPoolQueueCapacity = 100000; + private int clientManagerThreadPoolQueueCapacity = 1000000; + private int consumerManagerThreadPoolQueueCapacity = 1000000; + + private int filterServerNums = 0; + + private boolean longPollingEnable = true; + + private long shortPollingTimeMills = 1000; + + private boolean notifyConsumerIdsChangedEnable = true; + + private boolean highSpeedMode = false; + + private boolean commercialEnable = true; + private int commercialTimerCount = 1; + private int commercialTransCount = 1; + private int commercialBigCount = 1; + private int commercialBaseCount = 1; + + private boolean transferMsgByHeap = true; + private int maxDelayTime = 40; + + + private String regionId = MixAll.DEFAULT_TRACE_REGION_ID; + private int registerBrokerTimeoutMills = 6000; + + private boolean slaveReadEnable = false; + + private boolean disableConsumeIfConsumerReadSlowly = false; + private long consumerFallbehindThreshold = 1024 * 1024 * 1024 * 16; + + private long waitTimeMillsInSendQueue = 200; + + private long startAcceptSendRequestTimeStamp = 0L; + + private boolean traceOn = true; + + public boolean isTraceOn() { + return traceOn; + } + + public void setTraceOn(final boolean traceOn) { + this.traceOn = traceOn; + } + + public long getStartAcceptSendRequestTimeStamp() { + return startAcceptSendRequestTimeStamp; + } + + public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) { + this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp; + } + + public long getWaitTimeMillsInSendQueue() { + return waitTimeMillsInSendQueue; + } + + public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) { + this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue; + } + + public long getConsumerFallbehindThreshold() { + return consumerFallbehindThreshold; + } + + public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) { + this.consumerFallbehindThreshold = consumerFallbehindThreshold; + } + + public boolean isDisableConsumeIfConsumerReadSlowly() { + return disableConsumeIfConsumerReadSlowly; + } + + public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) { + this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly; + } + + public boolean isSlaveReadEnable() { + return slaveReadEnable; + } + + public void setSlaveReadEnable(final boolean slaveReadEnable) { + this.slaveReadEnable = slaveReadEnable; + } + + public static String localHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + return "DEFAULT_BROKER"; + } + + public int getRegisterBrokerTimeoutMills() { + return registerBrokerTimeoutMills; + } + + public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) { + this.registerBrokerTimeoutMills = registerBrokerTimeoutMills; + } + + public String getRegionId() { + return regionId; + } + + public void setRegionId(final String regionId) { + this.regionId = regionId; + } + + public boolean isTransferMsgByHeap() { + return transferMsgByHeap; + } + + public void setTransferMsgByHeap(final boolean transferMsgByHeap) { + this.transferMsgByHeap = transferMsgByHeap; + } + + public String getMessageStorePlugIn() { + return messageStorePlugIn; + } + + public void setMessageStorePlugIn(String messageStorePlugIn) { + this.messageStorePlugIn = messageStorePlugIn; + } + + public boolean isHighSpeedMode() { + return highSpeedMode; + } + + + public void setHighSpeedMode(final boolean highSpeedMode) { + this.highSpeedMode = highSpeedMode; + } + + + public String getRocketmqHome() { + return rocketmqHome; + } + + + public void setRocketmqHome(String rocketmqHome) { + this.rocketmqHome = rocketmqHome; + } + + + public String getBrokerName() { + return brokerName; + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + + public int getBrokerPermission() { + return brokerPermission; + } + + + public void setBrokerPermission(int brokerPermission) { + this.brokerPermission = brokerPermission; + } + + + public int getDefaultTopicQueueNums() { + return defaultTopicQueueNums; + } + + + public void setDefaultTopicQueueNums(int defaultTopicQueueNums) { + this.defaultTopicQueueNums = defaultTopicQueueNums; + } + + + public boolean isAutoCreateTopicEnable() { + return autoCreateTopicEnable; + } + + + public void setAutoCreateTopicEnable(boolean autoCreateTopic) { + this.autoCreateTopicEnable = autoCreateTopic; + } + + + public String getBrokerClusterName() { + return brokerClusterName; + } + + + public void setBrokerClusterName(String brokerClusterName) { + this.brokerClusterName = brokerClusterName; + } + + + public String getBrokerIP1() { + return brokerIP1; + } + + + public void setBrokerIP1(String brokerIP1) { + this.brokerIP1 = brokerIP1; + } + + + public String getBrokerIP2() { + return brokerIP2; + } + + + public void setBrokerIP2(String brokerIP2) { + this.brokerIP2 = brokerIP2; + } + + public int getSendMessageThreadPoolNums() { + return sendMessageThreadPoolNums; + } + + public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) { + this.sendMessageThreadPoolNums = sendMessageThreadPoolNums; + } + + + public int getPullMessageThreadPoolNums() { + return pullMessageThreadPoolNums; + } + + + public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) { + this.pullMessageThreadPoolNums = pullMessageThreadPoolNums; + } + + + public int getAdminBrokerThreadPoolNums() { + return adminBrokerThreadPoolNums; + } + + + public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) { + this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums; + } + + + public int getFlushConsumerOffsetInterval() { + return flushConsumerOffsetInterval; + } + + + public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) { + this.flushConsumerOffsetInterval = flushConsumerOffsetInterval; + } + + + public int getFlushConsumerOffsetHistoryInterval() { + return flushConsumerOffsetHistoryInterval; + } + + + public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) { + this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval; + } + + + public boolean isClusterTopicEnable() { + return clusterTopicEnable; + } + + + public void setClusterTopicEnable(boolean clusterTopicEnable) { + this.clusterTopicEnable = clusterTopicEnable; + } + + + public String getNamesrvAddr() { + return namesrvAddr; + } + + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + + public long getBrokerId() { + return brokerId; + } + + + public void setBrokerId(long brokerId) { + this.brokerId = brokerId; + } + + + public boolean isAutoCreateSubscriptionGroup() { + return autoCreateSubscriptionGroup; + } + + + public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) { + this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup; + } + + + public boolean isRejectTransactionMessage() { + return rejectTransactionMessage; + } + + + public void setRejectTransactionMessage(boolean rejectTransactionMessage) { + this.rejectTransactionMessage = rejectTransactionMessage; + } + + + public boolean isFetchNamesrvAddrByAddressServer() { + return fetchNamesrvAddrByAddressServer; + } + + + public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) { + this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer; + } + + + public int getSendThreadPoolQueueCapacity() { + return sendThreadPoolQueueCapacity; + } + + + public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) { + this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity; + } + + + public int getPullThreadPoolQueueCapacity() { + return pullThreadPoolQueueCapacity; + } + + + public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) { + this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity; + } + + + public boolean isBrokerTopicEnable() { + return brokerTopicEnable; + } + + + public void setBrokerTopicEnable(boolean brokerTopicEnable) { + this.brokerTopicEnable = brokerTopicEnable; + } + + + public int getFilterServerNums() { + return filterServerNums; + } + + + public void setFilterServerNums(int filterServerNums) { + this.filterServerNums = filterServerNums; + } + + + public boolean isLongPollingEnable() { + return longPollingEnable; + } + + + public void setLongPollingEnable(boolean longPollingEnable) { + this.longPollingEnable = longPollingEnable; + } + + + public boolean isNotifyConsumerIdsChangedEnable() { + return notifyConsumerIdsChangedEnable; + } + + + public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) { + this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable; + } + + + public long getShortPollingTimeMills() { + return shortPollingTimeMills; + } + + + public void setShortPollingTimeMills(long shortPollingTimeMills) { + this.shortPollingTimeMills = shortPollingTimeMills; + } + + + public int getClientManageThreadPoolNums() { + return clientManageThreadPoolNums; + } + + + public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) { + this.clientManageThreadPoolNums = clientManageThreadPoolNums; + } + + + public boolean isCommercialEnable() { + return commercialEnable; + } + + + public void setCommercialEnable(final boolean commercialEnable) { + this.commercialEnable = commercialEnable; + } + + public int getCommercialTimerCount() { + return commercialTimerCount; + } + + public void setCommercialTimerCount(final int commercialTimerCount) { + this.commercialTimerCount = commercialTimerCount; + } + + public int getCommercialTransCount() { + return commercialTransCount; + } + + public void setCommercialTransCount(final int commercialTransCount) { + this.commercialTransCount = commercialTransCount; + } + + public int getCommercialBigCount() { + return commercialBigCount; + } + + public void setCommercialBigCount(final int commercialBigCount) { + this.commercialBigCount = commercialBigCount; + } + + public int getMaxDelayTime() { + return maxDelayTime; + } + + + public void setMaxDelayTime(final int maxDelayTime) { + this.maxDelayTime = maxDelayTime; + } + + public int getClientManagerThreadPoolQueueCapacity() { + return clientManagerThreadPoolQueueCapacity; + } + + public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) { + this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity; + } + + public int getConsumerManagerThreadPoolQueueCapacity() { + return consumerManagerThreadPoolQueueCapacity; + } + + public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) { + this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity; + } + + public int getConsumerManageThreadPoolNums() { + return consumerManageThreadPoolNums; + } + + public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) { + this.consumerManageThreadPoolNums = consumerManageThreadPoolNums; + } + + public int getCommercialBaseCount() { + return commercialBaseCount; + } + + public void setCommercialBaseCount(int commercialBaseCount) { + this.commercialBaseCount = commercialBaseCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java new file mode 100644 index 0000000..fc73b71 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class BrokerConfigSingleton { + private static AtomicBoolean isInit = new AtomicBoolean(); + private static BrokerConfig brokerConfig; + + public static BrokerConfig getBrokerConfig() { + if (brokerConfig == null) { + throw new IllegalArgumentException("brokerConfig Cannot be null !"); + } + return brokerConfig; + } + + public static void setBrokerConfig(BrokerConfig brokerConfig) { + if (!isInit.compareAndSet(false, true)) { + throw new IllegalArgumentException("broker config have inited !"); + } + BrokerConfigSingleton.brokerConfig = brokerConfig; + } +}
