http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java new file mode 100644 index 0000000..c922c8d --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java @@ -0,0 +1,1748 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.*; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.common.running.RunningStats; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; +import com.alibaba.rocketmq.store.config.BrokerRole; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import com.alibaba.rocketmq.store.config.StorePathConfigHelper; +import com.alibaba.rocketmq.store.ha.HAService; +import com.alibaba.rocketmq.store.index.IndexService; +import com.alibaba.rocketmq.store.index.QueryOffsetResult; +import com.alibaba.rocketmq.store.schedule.ScheduleMessageService; +import com.alibaba.rocketmq.store.stats.BrokerStatsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static com.alibaba.rocketmq.store.config.BrokerRole.SLAVE; + + +/** + * @author shijia.wxr + */ +public class DefaultMessageStore implements MessageStore { + private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + + private final MessageFilter messageFilter = new DefaultMessageFilter(); + + private final MessageStoreConfig messageStoreConfig; + // CommitLog + private final CommitLog commitLog; + + private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; + + private final FlushConsumeQueueService 
flushConsumeQueueService; + + private final CleanCommitLogService cleanCommitLogService; + + private final CleanConsumeQueueService cleanConsumeQueueService; + + private final IndexService indexService; + + private final AllocateMappedFileService allocateMappedFileService; + + private final ReputMessageService reputMessageService; + + private final HAService haService; + + private final ScheduleMessageService scheduleMessageService; + + private final StoreStatsService storeStatsService; + + private final TransientStorePool transientStorePool; + + private final RunningFlags runningFlags = new RunningFlags(); + private final SystemClock systemClock = new SystemClock(); + + private final ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); + private final BrokerStatsManager brokerStatsManager; + private final MessageArrivingListener messageArrivingListener; + private final BrokerConfig brokerConfig; + + private volatile boolean shutdown = true; + + private StoreCheckpoint storeCheckpoint; + + private AtomicLong printTimes = new AtomicLong(0); + + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, + final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { + this.messageArrivingListener = messageArrivingListener; + this.brokerConfig = brokerConfig; + this.messageStoreConfig = messageStoreConfig; + this.brokerStatsManager = brokerStatsManager; + this.allocateMappedFileService = new AllocateMappedFileService(this); + this.commitLog = new CommitLog(this); + this.consumeQueueTable = new ConcurrentHashMap<>(32); + + this.flushConsumeQueueService = new FlushConsumeQueueService(); + this.cleanCommitLogService = new CleanCommitLogService(); + this.cleanConsumeQueueService = new CleanConsumeQueueService(); + this.storeStatsService = new StoreStatsService(); + 
this.indexService = new IndexService(this); + this.haService = new HAService(this); + + this.reputMessageService = new ReputMessageService(); + + this.scheduleMessageService = new ScheduleMessageService(this); + + this.transientStorePool = new TransientStorePool(messageStoreConfig); + + if (messageStoreConfig.isTransientStorePoolEnable()) { + this.transientStorePool.init(); + } + + + this.allocateMappedFileService.start(); + + this.indexService.start(); + } + + + public void truncateDirtyLogicFiles(long phyOffset) { + ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; + + for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { + for (ConsumeQueue logic : maps.values()) { + logic.truncateDirtyLogicFiles(phyOffset); + } + } + } + + + /** + * @throws IOException + */ + public boolean load() { + boolean result = true; + + try { + boolean lastExitOK = !this.isTempFileExist(); + log.info("last shutdown {}", lastExitOK ? 
"normally" : "abnormally"); + + if (null != scheduleMessageService) { + result = result && this.scheduleMessageService.load(); + } + + // load Commit Log + result = result && this.commitLog.load(); + + // load Consume Queue + result = result && this.loadConsumeQueue(); + + if (result) { + this.storeCheckpoint = + new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); + + this.indexService.load(lastExitOK); + + + this.recover(lastExitOK); + + log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); + } + } catch (Exception e) { + log.error("load exception", e); + result = false; + } + + if (!result) { + this.allocateMappedFileService.shutdown(); + } + + return result; + } + + /** + * @throws Exception + */ + public void start() throws Exception { + this.flushConsumeQueueService.start(); + this.commitLog.start(); + this.storeStatsService.start(); + + + if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { + this.scheduleMessageService.start(); + } + + if (this.getMessageStoreConfig().isDuplicationEnable()) { + this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); + } else { + this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); + } + this.reputMessageService.start(); + + this.haService.start(); + + this.createTempFile(); + this.addScheduleTask(); + this.shutdown = false; + } + + /** + + */ + public void shutdown() { + if (!this.shutdown) { + this.shutdown = true; + + this.scheduledExecutorService.shutdown(); + + try { + + Thread.sleep(1000 * 3); + } catch (InterruptedException e) { + log.error("shutdown Exception, ", e); + } + + if (this.scheduleMessageService != null) { + this.scheduleMessageService.shutdown(); + } + + this.haService.shutdown(); + + this.storeStatsService.shutdown(); + this.indexService.shutdown(); + this.commitLog.shutdown(); + this.reputMessageService.shutdown(); + 
this.flushConsumeQueueService.shutdown(); + this.allocateMappedFileService.shutdown(); + this.storeCheckpoint.flush(); + this.storeCheckpoint.shutdown(); + + if (this.runningFlags.isWriteable()) { + this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); + } else { + log.warn("the store may be wrong, so shutdown abnormally, and keep abort file."); + } + } + + this.transientStorePool.destroy(); + } + + public void destroy() { + this.destroyLogics(); + this.commitLog.destroy(); + this.indexService.destroy(); + this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); + this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); + } + + public void destroyLogics() { + for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { + for (ConsumeQueue logic : maps.values()) { + logic.destroy(); + } + } + } + + public PutMessageResult putMessage(MessageExtBrokerInner msg) { + if (this.shutdown) { + log.warn("message store has shutdown, so putMessage is forbidden"); + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } + + if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { + long value = this.printTimes.getAndIncrement(); + if ((value % 50000) == 0) { + log.warn("message store is slave mode, so putMessage is forbidden "); + } + + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } + + if (!this.runningFlags.isWriteable()) { + long value = this.printTimes.getAndIncrement(); + if ((value % 50000) == 0) { + log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); + } + + return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); + } else { + this.printTimes.set(0); + } + + + if (msg.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + 
msg.getTopic().length()); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + + if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); + return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + } + + + if (this.isOSPageCacheBusy()) { + return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); + } + + long beginTime = this.getSystemClock().now(); + PutMessageResult result = this.commitLog.putMessage(msg); + + long eclipseTime = this.getSystemClock().now() - beginTime; + if (eclipseTime > 500) { + log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); + } + this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); + + if (null == result || !result.isOk()) { + this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + } + + return result; + } + + @Override + public boolean isOSPageCacheBusy() { + long begin = this.getCommitLog().getBeginTimeInLock(); + long diff = this.systemClock.now() - begin; + + if (diff < 10000000 // + && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) { + return true; + } + + return false; + } + + @Override + public long lockTimeMills() { + return this.commitLog.lockTimeMills(); + } + + public SystemClock getSystemClock() { + return systemClock; + } + + public CommitLog getCommitLog() { + return commitLog; + } + + public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, + final SubscriptionData subscriptionData) { + if (this.shutdown) { + log.warn("message store has shutdown, so getMessage is forbidden"); + return null; + } + + if (!this.runningFlags.isReadable()) { + log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits()); + return 
null; + } + + long beginTime = this.getSystemClock().now(); + + + GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; + long nextBeginOffset = offset; + long minOffset = 0; + long maxOffset = 0; + + GetMessageResult getResult = new GetMessageResult(); + + + final long maxOffsetPy = this.commitLog.getMaxOffset(); + + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); + if (consumeQueue != null) { + minOffset = consumeQueue.getMinOffsetInQuque(); + maxOffset = consumeQueue.getMaxOffsetInQuque(); + + if (maxOffset == 0) { + status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; + nextBeginOffset = nextOffsetCorrection(offset, 0); + } else if (offset < minOffset) { + status = GetMessageStatus.OFFSET_TOO_SMALL; + nextBeginOffset = nextOffsetCorrection(offset, minOffset); + } else if (offset == maxOffset) { + status = GetMessageStatus.OFFSET_OVERFLOW_ONE; + nextBeginOffset = nextOffsetCorrection(offset, offset); + } else if (offset > maxOffset) { + status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; + if (0 == minOffset) { + nextBeginOffset = nextOffsetCorrection(offset, minOffset); + } else { + nextBeginOffset = nextOffsetCorrection(offset, maxOffset); + } + } else { + SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); + if (bufferConsumeQueue != null) { + try { + status = GetMessageStatus.NO_MATCHED_MESSAGE; + + long nextPhyFileStartOffset = Long.MIN_VALUE; + long maxPhyOffsetPulling = 0; + + int i = 0; + final int maxFilterMessageCount = 16000; + final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); + for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); + int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); + long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); + + maxPhyOffsetPulling = offsetPy; + + + if (nextPhyFileStartOffset != Long.MIN_VALUE) { + if (offsetPy < 
nextPhyFileStartOffset) + continue; + } + + + boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); + + if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), + isInDisk)) { + break; + } + + + if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) { + SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); + if (selectResult != null) { + this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); + getResult.addMessage(selectResult); + status = GetMessageStatus.FOUND; + nextPhyFileStartOffset = Long.MIN_VALUE; + } else { + if (getResult.getBufferTotalSize() == 0) { + status = GetMessageStatus.MESSAGE_WAS_REMOVING; + } + + + nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); + } + } else { + if (getResult.getBufferTotalSize() == 0) { + status = GetMessageStatus.NO_MATCHED_MESSAGE; + } + + if (log.isDebugEnabled()) { + log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode); + } + } + } + + + if (diskFallRecorded) { + long fallBehind = maxOffsetPy - maxPhyOffsetPulling; + brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); + } + + nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); + + + long diff = maxOffsetPy - maxPhyOffsetPulling; + long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE + * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); + getResult.setSuggestPullingFromSlave(diff > memory); + } finally { + + bufferConsumeQueue.release(); + } + } else { + status = GetMessageStatus.OFFSET_FOUND_NULL; + nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); + log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " + + maxOffset + ", but access logic queue failed."); + } + } + } else { + status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; + 
nextBeginOffset = nextOffsetCorrection(offset, 0); + } + + if (GetMessageStatus.FOUND == status) { + this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); + } else { + this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); + } + long eclipseTime = this.getSystemClock().now() - beginTime; + this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime); + + getResult.setStatus(status); + getResult.setNextBeginOffset(nextBeginOffset); + getResult.setMaxOffset(maxOffset); + getResult.setMinOffset(minOffset); + return getResult; + } + + /** + + */ + public long getMaxOffsetInQuque(String topic, int queueId) { + ConsumeQueue logic = this.findConsumeQueue(topic, queueId); + if (logic != null) { + long offset = logic.getMaxOffsetInQuque(); + return offset; + } + + return 0; + } + + /** + + */ + public long getMinOffsetInQuque(String topic, int queueId) { + ConsumeQueue logic = this.findConsumeQueue(topic, queueId); + if (logic != null) { + return logic.getMinOffsetInQuque(); + } + + return -1; + } + + @Override + public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) { + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); + if (consumeQueue != null) { + SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(cqOffset); + if (bufferConsumeQueue != null) { + try { + long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); + return offsetPy; + } finally { + bufferConsumeQueue.release(); + } + } + } + + return 0; + } + + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { + ConsumeQueue logic = this.findConsumeQueue(topic, queueId); + if (logic != null) { + return logic.getOffsetInQueueByTime(timestamp); + } + + return 0; + } + + public MessageExt lookMessageByOffset(long commitLogOffset) { + SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4); + if (null != sbr) { + try { + // 1 TOTALSIZE + int size = sbr.getByteBuffer().getInt(); 
+ return lookMessageByOffset(commitLogOffset, size); + } finally { + sbr.release(); + } + } + + return null; + } + + @Override + public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { + SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4); + if (null != sbr) { + try { + // 1 TOTALSIZE + int size = sbr.getByteBuffer().getInt(); + return this.commitLog.getMessage(commitLogOffset, size); + } finally { + sbr.release(); + } + } + + return null; + } + + @Override + public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { + return this.commitLog.getMessage(commitLogOffset, msgSize); + } + + public String getRunningDataInfo() { + return this.storeStatsService.toString(); + } + + @Override + public HashMap<String, String> getRuntimeInfo() { + HashMap<String, String> result = this.storeStatsService.getRuntimeInfo(); + + { + String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); + + } + + + { + + String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); + double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); + result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); + } + + + { + if (this.scheduleMessageService != null) { + this.scheduleMessageService.buildRunningStats(result); + } + } + + result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset())); + result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset())); + + return result; + } + + @Override + public long getMaxPhyOffset() { + return this.commitLog.getMaxOffset(); + } + + @Override + public long getMinPhyOffset() { + 
return this.commitLog.getMinOffset(); + } + + @Override + public long getEarliestMessageTime(String topic, int queueId) { + ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); + if (logicQueue != null) { + long minLogicOffset = logicQueue.getMinLogicOffset(); + + SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE); + if (result != null) { + try { + final long phyOffset = result.getByteBuffer().getLong(); + final int size = result.getByteBuffer().getInt(); + long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); + return storeTime; + } catch (Exception e) { + } finally { + result.release(); + } + } + } + + return -1; + } + + @Override + public long getEarliestMessageTime() { + final long minPhyOffset = this.getMinPhyOffset(); + final int size = this.messageStoreConfig.getMaxMessageSize() * 2; + return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size); + } + + @Override + public long getMessageStoreTimeStamp(String topic, int queueId, long offset) { + ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); + if (logicQueue != null) { + SelectMappedBufferResult result = logicQueue.getIndexBuffer(offset); + if (result != null) { + try { + final long phyOffset = result.getByteBuffer().getLong(); + final int size = result.getByteBuffer().getInt(); + long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); + return storeTime; + } catch (Exception e) { + } finally { + result.release(); + } + } + } + + return -1; + } + + @Override + public long getMessageTotalInQueue(String topic, int queueId) { + ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); + if (logicQueue != null) { + return logicQueue.getMessageTotalInQueue(); + } + + return -1; + } + + @Override + public SelectMappedBufferResult getCommitLogData(final long offset) { + if (this.shutdown) { + log.warn("message store has shutdown, so getPhyQueueData is forbidden"); + return 
null; + } + + return this.commitLog.getData(offset); + } + + @Override + public boolean appendToCommitLog(long startOffset, byte[] data) { + if (this.shutdown) { + log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); + return false; + } + + boolean result = this.commitLog.appendData(startOffset, data); + if (result) { + this.reputMessageService.wakeup(); + } else { + log.error("appendToPhyQueue failed " + startOffset + " " + data.length); + } + + return result; + } + + @Override + public void excuteDeleteFilesManualy() { + this.cleanCommitLogService.excuteDeleteFilesManualy(); + } + + @Override + public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { + QueryMessageResult queryMessageResult = new QueryMessageResult(); + + long lastQueryMsgTime = end; + + for (int i = 0; i < 3; i++) { + QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); + if (queryOffsetResult.getPhyOffsets().isEmpty()) { + break; + } + + + Collections.sort(queryOffsetResult.getPhyOffsets()); + + queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); + queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); + + for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { + long offset = queryOffsetResult.getPhyOffsets().get(m); + + try { + + boolean match = true; + MessageExt msg = this.lookMessageByOffset(offset); + if (0 == m) { + lastQueryMsgTime = msg.getStoreTimestamp(); + } + +// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); +// if (topic.equals(msg.getTopic())) { +// for (String k : keyArray) { +// if (k.equals(key)) { +// match = true; +// break; +// } +// } +// } + + if (match) { + SelectMappedBufferResult result = this.commitLog.getData(offset, false); + if (result != null) { + int size = result.getByteBuffer().getInt(0); + 
result.getByteBuffer().limit(size); + result.setSize(size); + queryMessageResult.addMessage(result); + } + } else { + log.warn("queryMessage hash duplicate, {} {}", topic, key); + } + } catch (Exception e) { + log.error("queryMessage exception", e); + } + } + + + if (queryMessageResult.getBufferTotalSize() > 0) { + break; + } + + + if (lastQueryMsgTime < begin) { + break; + } + } + + return queryMessageResult; + } + + @Override + public void updateHaMasterAddress(String newAddr) { + this.haService.updateMasterAddress(newAddr); + } + + @Override + public long slaveFallBehindMuch() { + return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get(); + } + + @Override + public long now() { + return this.systemClock.now(); + } + + @Override + public int cleanUnusedTopic(Set<String> topics) { + Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); + String topic = next.getKey(); + + if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { + ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue(); + for (ConsumeQueue cq : queueTable.values()) { + cq.destroy(); + log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // + cq.getTopic(), // + cq.getQueueId() // + ); + + this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); + } + it.remove(); + + log.info("cleanUnusedTopic: {},topic destroyed", topic); + } + } + + return 0; + } + + public void cleanExpiredConsumerQueue() { + long minCommitLogOffset = this.commitLog.getMinOffset(); + + Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); + String topic = next.getKey(); + if 
(!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { + ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue(); + Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator(); + while (itQT.hasNext()) { + Entry<Integer, ConsumeQueue> nextQT = itQT.next(); + long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); + + + if (maxCLOffsetInConsumeQueue == -1) { + log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", // + nextQT.getValue().getTopic(), // + nextQT.getValue().getQueueId(), // + nextQT.getValue().getMaxPhysicOffset(), // + nextQT.getValue().getMinLogicOffset()); + } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { + log.info( + "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", // + topic, // + nextQT.getKey(), // + minCommitLogOffset, // + maxCLOffsetInConsumeQueue); + + DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), + nextQT.getValue().getQueueId()); + + nextQT.getValue().destroy(); + itQT.remove(); + } + } + + if (queueTable.isEmpty()) { + log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic); + it.remove(); + } + } + } + } + + public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) { + Map<String, Long> messageIds = new HashMap<String, Long>(); + if (this.shutdown) { + return messageIds; + } + + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); + if (consumeQueue != null) { + minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQuque()); + maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQuque()); + + if (maxOffset == 0) { + return messageIds; + } + + long nextOffset = minOffset; + while (nextOffset < maxOffset) { + SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(nextOffset); + if 
(bufferConsumeQueue != null) { + try { + int i = 0; + for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { + long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); + final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + String msgId = + MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); + messageIds.put(msgId, nextOffset++); + if (nextOffset > maxOffset) { + return messageIds; + } + } + } finally { + + bufferConsumeQueue.release(); + } + } else { + return messageIds; + } + } + } + return messageIds; + } + + @Override + public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) { + + final long maxOffsetPy = this.commitLog.getMaxOffset(); + + ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); + if (consumeQueue != null) { + SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset); + if (bufferConsumeQueue != null) { + try { + for (int i = 0; i < bufferConsumeQueue.getSize(); ) { + i += ConsumeQueue.CQ_STORE_UNIT_SIZE; + long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); + return checkInDiskByCommitOffset(offsetPy, maxOffsetPy); + } + } finally { + + bufferConsumeQueue.release(); + } + } else { + return false; + } + } + return false; + } + + public long dispatchBehindBytes() { + return this.reputMessageService.behind(); + } + + @Override + public long flush() { + return this.commitLog.flush(); + } + + @Override + public boolean resetWriteOffset(long phyOffset) { + return this.commitLog.resetOffset(phyOffset); + } + + @Override + public long getConfirmOffset() { + return this.commitLog.getConfirmOffset(); + } + + @Override + public void setConfirmOffset(long phyOffset) { + this.commitLog.setConfirmOffset(phyOffset); + } + + public MessageExt lookMessageByOffset(long commitLogOffset, int size) { + SelectMappedBufferResult sbr = 
this.commitLog.getMessage(commitLogOffset, size); + if (null != sbr) { + try { + return MessageDecoder.decode(sbr.getByteBuffer(), true, false); + } finally { + sbr.release(); + } + } + + return null; + } + + public ConsumeQueue findConsumeQueue(String topic, int queueId) { + ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); + if (null == map) { + ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); + ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); + if (oldMap != null) { + map = oldMap; + } else { + map = newMap; + } + } + + ConsumeQueue logic = map.get(queueId); + if (null == logic) { + ConsumeQueue newLogic = new ConsumeQueue(// + topic, // + queueId, // + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), // + this); + ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); + if (oldLogic != null) { + logic = oldLogic; + } else { + logic = newLogic; + } + } + + return logic; + } + + private long nextOffsetCorrection(long oldOffset, long newOffset) { + long nextOffset = oldOffset; + if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) { + nextOffset = newOffset; + } + return nextOffset; + } + + private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { + long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); + return (maxOffsetPy - offsetPy) > memory; + } + + private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) { + + if (0 == bufferTotal || 0 == messageTotal) { + return false; + } + + if ((messageTotal + 1) >= maxMsgNums) { + return true; + } + + + if (isInDisk) { + if ((bufferTotal + sizePy) > 
this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) { + return true; + } + + if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) { + return true; + } + } else { + if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) { + return true; + } + + if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) { + return true; + } + } + + return false; + } + + private void deleteFile(final String fileName) { + File file = new File(fileName); + boolean result = file.delete(); + log.info(fileName + (result ? " delete OK" : " delete Failed")); + } + + /** + * @throws IOException + */ + private void createTempFile() throws IOException { + String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); + File file = new File(fileName); + MappedFile.ensureDirOK(file.getParent()); + boolean result = file.createNewFile(); + log.info(fileName + (result ? " create OK" : " already exists")); + } + + private void addScheduleTask() { + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + DefaultMessageStore.this.cleanFilesPeriodically(); + } + }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + DefaultMessageStore.this.checkSelf(); + } + }, 1, 10, TimeUnit.MINUTES); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { + try { + if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { + long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); + if (lockTime > 1000 && lockTime < 10000000) { + + String stack = UtilAll.jstack(); + final String fileName = 
System.getProperty("user.home") + File.separator + "debug/lock/stack-" + + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; + MixAll.string2FileNotSafe(stack, fileName); + } + } + } catch (Exception e) { + } + } + } + }, 1, 1, TimeUnit.SECONDS); + + // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + // @Override + // public void run() { + // DefaultMessageStore.this.cleanExpiredConsumerQueue(); + // } + // }, 1, 1, TimeUnit.HOURS); + } + + private void cleanFilesPeriodically() { + this.cleanCommitLogService.run(); + this.cleanConsumeQueueService.run(); + } + + private void checkSelf() { + this.commitLog.checkSelf(); + + Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); + Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator(); + while (itNext.hasNext()) { + Entry<Integer, ConsumeQueue> cq = itNext.next(); + cq.getValue().checkSelf(); + } + } + } + + private boolean isTempFileExist() { + String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); + File file = new File(fileName); + return file.exists(); + } + + private boolean loadConsumeQueue() { + File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); + File[] fileTopicList = dirLogic.listFiles(); + if (fileTopicList != null) { + + for (File fileTopic : fileTopicList) { + String topic = fileTopic.getName(); + + File[] fileQueueIdList = fileTopic.listFiles(); + if (fileQueueIdList != null) { + for (File fileQueueId : fileQueueIdList) { + int queueId; + try { + queueId = Integer.parseInt(fileQueueId.getName()); + } catch (NumberFormatException e) { + continue; + } + ConsumeQueue logic = new ConsumeQueue( + topic, + queueId, + 
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), + this); + this.putConsumeQueue(topic, queueId, logic); + if (!logic.load()) { + return false; + } + } + } + } + } + + log.info("load logics queue all over, OK"); + + return true; + } + + private void recover(final boolean lastExitOK) { + this.recoverConsumeQueue(); + + + if (lastExitOK) { + this.commitLog.recoverNormally(); + } else { + this.commitLog.recoverAbnormally(); + } + + this.recoverTopicQueueTable(); + } + + public MessageStoreConfig getMessageStoreConfig() { + return messageStoreConfig; + } + + public TransientStorePool getTransientStorePool() { + return transientStorePool; + } + + private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { + ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); + if (null == map) { + map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>(); + map.put(queueId, consumeQueue); + this.consumeQueueTable.put(topic, map); + } else { + map.put(queueId, consumeQueue); + } + } + + private void recoverConsumeQueue() { + for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { + for (ConsumeQueue logic : maps.values()) { + logic.recover(); + } + } + } + + private void recoverTopicQueueTable() { + HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); + long minPhyOffset = this.commitLog.getMinOffset(); + for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { + for (ConsumeQueue logic : maps.values()) { + String key = logic.getTopic() + "-" + logic.getQueueId(); + table.put(key, logic.getMaxOffsetInQuque()); + logic.correctMinOffset(minPhyOffset); + } + } + + this.commitLog.setTopicQueueTable(table); + } + + public AllocateMappedFileService getAllocateMappedFileService() { + return 
allocateMappedFileService; + } + + public StoreStatsService getStoreStatsService() { + return storeStatsService; + } + + public RunningFlags getAccessRights() { + return runningFlags; + } + + public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() { + return consumeQueueTable; + } + + public StoreCheckpoint getStoreCheckpoint() { + return storeCheckpoint; + } + + public HAService getHaService() { + return haService; + } + + public ScheduleMessageService getScheduleMessageService() { + return scheduleMessageService; + } + + public RunningFlags getRunningFlags() { + return runningFlags; + } + + public void doDispatch(DispatchRequest req) { + final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag()); + switch (tranType) { + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(), + req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset()); + break; + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + } + + if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) { + DefaultMessageStore.this.indexService.buildIndex(req); + } + } + + public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp, + long logicOffset) { + ConsumeQueue cq = this.findConsumeQueue(topic, queueId); + cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset); + } + + public BrokerStatsManager getBrokerStatsManager() { + return brokerStatsManager; + } + + class CleanCommitLogService { + + private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; + private final double diskSpaceWarningLevelRatio = + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); + + private 
final double diskSpaceCleanForciblyRatio = + Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); + private long lastRedeleteTimestamp = 0; + + private volatile int manualDeleteFileSeveralTimes = 0; + + private volatile boolean cleanImmediately = false; + + + public void excuteDeleteFilesManualy() { + this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; + DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked"); + } + + + public void run() { + try { + this.deleteExpiredFiles(); + + this.redeleteHangedFile(); + } catch (Exception e) { + DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + private void deleteExpiredFiles() { + int deleteCount = 0; + long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); + int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); + int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); + + boolean timeup = this.isTimeToDelete(); + boolean spacefull = this.isSpaceToDelete(); + boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; + + + if (timeup || spacefull || manualDelete) { + + if (manualDelete) + this.manualDeleteFileSeveralTimes--; + + + boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; + + log.info("begin to delete before {} hours file. 
timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", // + fileReservedTime, // + timeup, // + spacefull, // + manualDeleteFileSeveralTimes, // + cleanAtOnce); + + + fileReservedTime *= 60 * 60 * 1000; + + deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, + destroyMapedFileIntervalForcibly, cleanAtOnce); + if (deleteCount > 0) { + } else if (spacefull) { + log.warn("disk space will be full soon, but delete file failed."); + } + } + } + + private void redeleteHangedFile() { + int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); + long currentTimestamp = System.currentTimeMillis(); + if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { + this.lastRedeleteTimestamp = currentTimestamp; + int destroyMapedFileIntervalForcibly = + DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); + if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { + } + } + } + + public String getServiceName() { + return CleanCommitLogService.class.getSimpleName(); + } + + private boolean isTimeToDelete() { + String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); + if (UtilAll.isItTimeToDo(when)) { + DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); + return true; + } + + return false; + } + + private boolean isSpaceToDelete() { + double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; + + cleanImmediately = false; + + + { + String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + if (physicRatio > diskSpaceWarningLevelRatio) { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); + if (diskok) { + DefaultMessageStore.log.error("physic disk 
maybe full soon " + physicRatio + ", so mark disk full"); + } + + cleanImmediately = true; + } else if (physicRatio > diskSpaceCleanForciblyRatio) { + cleanImmediately = true; + } else { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); + if (!diskok) { + DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); + } + } + + if (physicRatio < 0 || physicRatio > ratio) { + DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); + return true; + } + } + + + { + String storePathLogics = StorePathConfigHelper + .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); + double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); + if (logicsRatio > diskSpaceWarningLevelRatio) { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); + if (diskok) { + DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); + } + + cleanImmediately = true; + } else if (logicsRatio > diskSpaceCleanForciblyRatio) { + cleanImmediately = true; + } else { + boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); + if (!diskok) { + DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); + } + } + + if (logicsRatio < 0 || logicsRatio > ratio) { + DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio); + return true; + } + } + + return false; + } + + public int getManualDeleteFileSeveralTimes() { + return manualDeleteFileSeveralTimes; + } + + public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { + this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; + } + } + + class CleanConsumeQueueService { + private long lastPhysicalMinOffset = 0; + + public void run() { + try { + this.deleteExpiredFiles(); + } catch (Exception e) { + 
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + private void deleteExpiredFiles() { + int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); + + long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); + if (minOffset > this.lastPhysicalMinOffset) { + this.lastPhysicalMinOffset = minOffset; + + + ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; + + for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { + for (ConsumeQueue logic : maps.values()) { + int deleteCount = logic.deleteExpiredFile(minOffset); + + if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { + try { + Thread.sleep(deleteLogicsFilesInterval); + } catch (InterruptedException e) { + } + } + } + } + + + DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); + } + } + + public String getServiceName() { + return CleanConsumeQueueService.class.getSimpleName(); + } + } + + class FlushConsumeQueueService extends ServiceThread { + private static final int RETRY_TIMES_OVER = 3; + private long lastFlushTimestamp = 0; + + + private void doFlush(int retryTimes) { + int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); + + if (retryTimes == RETRY_TIMES_OVER) { + flushConsumeQueueLeastPages = 0; + } + + long logicsMsgTimestamp = 0; + + + int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); + long currentTimeMillis = System.currentTimeMillis(); + if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { + this.lastFlushTimestamp = currentTimeMillis; + flushConsumeQueueLeastPages = 0; + logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); + } + + ConcurrentHashMap<String, 
ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; + + for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { + for (ConsumeQueue cq : maps.values()) { + boolean result = false; + for (int i = 0; i < retryTimes && !result; i++) { + result = cq.flush(flushConsumeQueueLeastPages); + } + } + } + + if (0 == flushConsumeQueueLeastPages) { + if (logicsMsgTimestamp > 0) { + DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); + } + DefaultMessageStore.this.getStoreCheckpoint().flush(); + } + } + + + public void run() { + DefaultMessageStore.log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); + this.waitForRunning(interval); + this.doFlush(1); + } catch (Exception e) { + DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + + this.doFlush(RETRY_TIMES_OVER); + + DefaultMessageStore.log.info(this.getServiceName() + " service end"); + } + + + @Override + public String getServiceName() { + return FlushConsumeQueueService.class.getSimpleName(); + } + + + @Override + public long getJointime() { + return 1000 * 60; + } + } + + class ReputMessageService extends ServiceThread { + + private volatile long reputFromOffset = 0; + + public long getReputFromOffset() { + return reputFromOffset; + } + + @Override + public void shutdown() { + for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } + + if (this.isCommitLogAvailable()) { + log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", + DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); + } + + super.shutdown(); + } + + public void setReputFromOffset(long reputFromOffset) { + this.reputFromOffset 
= reputFromOffset; + } + + public long behind() { + return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; + } + + + private boolean isCommitLogAvailable() { + return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); + } + + + private void doReput() { + for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { + + if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // + && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { + break; + } + + SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); + if (result != null) { + try { + this.reputFromOffset = result.getStartOffset(); + + for (int readSize = 0; readSize < result.getSize() && doNext; ) { + DispatchRequest dispatchRequest = + DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); + int size = dispatchRequest.getMsgSize(); + + if (dispatchRequest.isSuccess()) { + if (size > 0) { + DefaultMessageStore.this.doDispatch(dispatchRequest); + + if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() + && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { + DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), + dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, + dispatchRequest.getTagsCode()); + } + // FIXED BUG By shijia + this.reputFromOffset += size; + readSize += size; + if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { + DefaultMessageStore.this.storeStatsService + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); + DefaultMessageStore.this.storeStatsService + .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) + .addAndGet(dispatchRequest.getMsgSize()); + } + } else if (size == 0) { + this.reputFromOffset = 
DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); + readSize = result.getSize(); + } + } else if (!dispatchRequest.isSuccess()) { + + + if (size > 0) { + log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); + this.reputFromOffset += size; + } else { + doNext = false; + if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { + log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", + this.reputFromOffset); + + this.reputFromOffset += result.getSize() - readSize; + } + } + } + } + } finally { + result.release(); + } + } else { + doNext = false; + } + } + } + + + @Override + public void run() { + DefaultMessageStore.log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + Thread.sleep(1); + this.doReput(); + } catch (Exception e) { + DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + DefaultMessageStore.log.info(this.getServiceName() + " service end"); + } + + + @Override + public String getServiceName() { + return ReputMessageService.class.getSimpleName(); + } + + + } + + public int remainTransientStoreBufferNumbs() { + return this.transientStorePool.remainBufferNumbs(); + } + + @Override + public boolean isTransientStorePoolDeficient() { + return remainTransientStoreBufferNumbs() == 0; + } + + + public void unlockMappedFile(final MappedFile mappedFile) { + this.scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + mappedFile.munlock(); + } + }, 6, TimeUnit.SECONDS); + } +}
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java
// ----------------------------------------------------------------------
// diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java
// new file mode 100644
// index 0000000..4f2c1a9
// --- /dev/null
// +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java
// @@ -0,0 +1,174 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.store;

/**
 * Immutable description of one commit log entry to be dispatched; {@link #isSuccess()} tells
 * whether the entry was read successfully.
 *
 * @author shijia.wxr
 */
public class DispatchRequest {
    private final String topic;
    private final int queueId;
    private final long commitLogOffset;
    private final int msgSize;
    private final long tagsCode;
    private final long storeTimestamp;
    private final long consumeQueueOffset;
    private final String keys;
    private final boolean success;
    private final String uniqKey;

    private final int sysFlag;
    private final long preparedTransactionOffset;


    /**
     * Creates a successful request ({@code success == true}) carrying all message attributes.
     */
    public DispatchRequest(
            final String topic,
            final int queueId,
            final long commitLogOffset,
            final int msgSize,
            final long tagsCode,
            final long storeTimestamp,
            final long consumeQueueOffset,
            final String keys,
            final String uniqKey,
            final int sysFlag,
            final long preparedTransactionOffset
    ) {
        this.topic = topic;
        this.queueId = queueId;
        this.commitLogOffset = commitLogOffset;
        this.msgSize = msgSize;
        this.tagsCode = tagsCode;
        this.storeTimestamp = storeTimestamp;
        this.consumeQueueOffset = consumeQueueOffset;
        this.keys = keys;
        this.uniqKey = uniqKey;

        this.sysFlag = sysFlag;
        this.preparedTransactionOffset = preparedTransactionOffset;
        this.success = true;
    }

    /**
     * Creates a failed request ({@code success == false}) that carries only a size; every other
     * attribute takes the same placeholder value as in {@link #DispatchRequest(int, boolean)}.
     *
     * @param size value reported by {@link #getMsgSize()}
     */
    public DispatchRequest(int size) {
        this(size, false);
    }

    /**
     * Creates a request that carries only a size and an explicit success flag. Topic and keys are
     * empty strings, {@code uniqKey} is {@code null} and all numeric attributes are 0.
     *
     * @param size    value reported by {@link #getMsgSize()}
     * @param success value reported by {@link #isSuccess()}
     */
    public DispatchRequest(int size, boolean success) {
        // 1
        this.topic = "";
        // 2
        this.queueId = 0;
        // 3
        this.commitLogOffset = 0;
        // 4
        this.msgSize = size;
        // 5
        this.tagsCode = 0;
        // 6
        this.storeTimestamp = 0;
        // 7
        this.consumeQueueOffset = 0;
        // 8
        this.keys = "";
        // 9
        this.uniqKey = null;
        this.sysFlag = 0;
        this.preparedTransactionOffset = 0;
        this.success = success;
    }


    public String getTopic() {
        return topic;
    }


    public int getQueueId() {
        return queueId;
    }


    public long getCommitLogOffset() {
        return commitLogOffset;
    }


    public int getMsgSize() {
        return msgSize;
    }


    public long getStoreTimestamp() {
        return storeTimestamp;
    }


    public long getConsumeQueueOffset() {
        return consumeQueueOffset;
    }


    public String getKeys() {
        return keys;
    }


    public long getTagsCode() {
        return tagsCode;
    }


    public int getSysFlag() {
        return sysFlag;
    }


    public long getPreparedTransactionOffset() {
        return preparedTransactionOffset;
    }


    public boolean isSuccess() {
        return success;
    }

    public String getUniqKey() {
        return uniqKey;
    }


}
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java
// ----------------------------------------------------------------------
// diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java
// new file mode 100644
// index 0000000..05a0003
// --- /dev/null
// +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java
// @@ -0,0 +1,158 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.store.stats.BrokerStatsManager;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


/**
 * Accumulates the mapped buffers found by a message lookup together with the lookup status and
 * offset information. Buffers added through {@link #addMessage(SelectMappedBufferResult)} are
 * released by {@link #release()}.
 *
 * @author shijia.wxr
 */
public class GetMessageResult {

    private final List<SelectMappedBufferResult> messageMapedList =
            new ArrayList<SelectMappedBufferResult>(100);

    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);

    private GetMessageStatus status;
    private long nextBeginOffset;
    private long minOffset;
    private long maxOffset;

    private int bufferTotalSize = 0;

    private boolean suggestPullingFromSlave = false;

    private int msgCount4Commercial = 0;


    public GetMessageResult() {
    }


    public GetMessageStatus getStatus() {
        return status;
    }


    public void setStatus(GetMessageStatus status) {
        this.status = status;
    }


    public long getNextBeginOffset() {
        return nextBeginOffset;
    }


    public void setNextBeginOffset(long nextBeginOffset) {
        this.nextBeginOffset = nextBeginOffset;
    }


    public long getMinOffset() {
        return minOffset;
    }


    public void setMinOffset(long minOffset) {
        this.minOffset = minOffset;
    }


    public long getMaxOffset() {
        return maxOffset;
    }


    public void setMaxOffset(long maxOffset) {
        this.maxOffset = maxOffset;
    }


    public List<SelectMappedBufferResult> getMessageMapedList() {
        return messageMapedList;
    }


    public List<ByteBuffer> getMessageBufferList() {
        return messageBufferList;
    }


    /**
     * Adds one message buffer to the result and updates the running totals.
     *
     * @param mapedBuffer buffer holding the message; it is kept until {@link #release()}
     */
    public void addMessage(final SelectMappedBufferResult mapedBuffer) {
        this.messageMapedList.add(mapedBuffer);
        this.messageBufferList.add(mapedBuffer.getByteBuffer());
        this.bufferTotalSize += mapedBuffer.getSize();
        // Explicit floating-point division so that ceil() really rounds up, whatever numeric type
        // SIZE_PER_COUNT is declared with (its declaration is not visible from this file).
        this.msgCount4Commercial += (int) Math.ceil(
                (double) mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
    }


    /**
     * Releases every buffer previously added; the internal lists are left untouched.
     */
    public void release() {
        for (SelectMappedBufferResult select : this.messageMapedList) {
            select.release();
        }
    }


    public int getBufferTotalSize() {
        return bufferTotalSize;
    }


    public void setBufferTotalSize(int bufferTotalSize) {
        this.bufferTotalSize = bufferTotalSize;
    }


    /**
     * @return the number of buffers added so far
     */
    public int getMessageCount() {
        return this.messageMapedList.size();
    }


    public boolean isSuggestPullingFromSlave() {
        return suggestPullingFromSlave;
    }


    public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) {
        this.suggestPullingFromSlave = suggestPullingFromSlave;
    }

    public int getMsgCount4Commercial() {
        return msgCount4Commercial;
    }

    public void setMsgCount4Commercial(int msgCount4Commercial) {
        this.msgCount4Commercial = msgCount4Commercial;
    }


    @Override
    public String toString() {
        return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
                + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize
                + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
    }

}
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java
// ----------------------------------------------------------------------
// diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java
// new file mode 100644
// index 0000000..87d6fe0
// --- /dev/null
// +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java
// @@ -0,0 +1,41 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.store;

/**
 * Status values carried by {@link GetMessageResult#getStatus()}.
 *
 * @author shijia.wxr
 */
public enum GetMessageStatus {

    FOUND,

    NO_MATCHED_MESSAGE,

    MESSAGE_WAS_REMOVING,

    OFFSET_FOUND_NULL,

    OFFSET_OVERFLOW_BADLY,

    OFFSET_OVERFLOW_ONE,

    OFFSET_TOO_SMALL,

    NO_MATCHED_LOGIC_QUEUE,

    NO_MESSAGE_IN_QUEUE,
}
