This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e11e29419f [ISSUE #7308] Adding topic blacklist and filter in tiered
storage module (#7310)
e11e29419f is described below
commit e11e29419f6e2d1d9673d0329e57b824ebf3da47
Author: lizhimins <[email protected]>
AuthorDate: Wed Sep 6 20:42:24 2023 +0800
[ISSUE #7308] Adding topic blacklist and filter in tiered storage module
(#7310)
---
.../rocketmq/tieredstore/TieredDispatcher.java | 21 ++++++++--
.../rocketmq/tieredstore/TieredMessageStore.java | 1 +
.../tieredstore/file/TieredFlatFileManager.java | 17 +++++---
.../provider/TieredStoreTopicBlackListFilter.java | 45 ++++++++++++++++++++++
.../provider/TieredStoreTopicFilter.java | 25 ++++++++++++
.../TieredStoreTopicBlackListFilterTest.java | 36 +++++++++++++++++
6 files changed, 136 insertions(+), 9 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 430c2b62eb..766c559e9c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -48,6 +48,8 @@ import
org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import
org.apache.rocketmq.tieredstore.provider.TieredStoreTopicBlackListFilter;
+import org.apache.rocketmq.tieredstore.provider.TieredStoreTopicFilter;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -56,6 +58,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
private static final Logger logger =
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ private TieredStoreTopicFilter topicFilter;
private final String brokerName;
private final MessageStore defaultStore;
private final TieredMessageStoreConfig storeConfig;
@@ -70,15 +73,15 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
this.defaultStore = defaultStore;
this.storeConfig = storeConfig;
this.brokerName = storeConfig.getBrokerName();
+ this.topicFilter = new TieredStoreTopicBlackListFilter();
this.tieredFlatFileManager =
TieredFlatFileManager.getInstance(storeConfig);
this.dispatchRequestReadMap = new ConcurrentHashMap<>();
this.dispatchRequestWriteMap = new ConcurrentHashMap<>();
this.dispatchTaskLock = new ReentrantLock();
this.dispatchWriteLock = new ReentrantLock();
- this.initScheduleTask();
}
- private void initScheduleTask() {
+ protected void initScheduleTask() {
TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(()
->
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile ->
{
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
@@ -87,6 +90,14 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
}), 30, 10, TimeUnit.SECONDS);
}
+ public TieredStoreTopicFilter getTopicFilter() { // Exposes the filter that dispatch() consults before handling a topic.
+ return topicFilter;
+ }
+
+ public void setTopicFilter(TieredStoreTopicFilter topicFilter) { // Replaces the filter; callers null-check it, so null disables filtering.
+ this.topicFilter = topicFilter;
+ }
+
@Override
public void dispatch(DispatchRequest request) {
if (stopped) {
@@ -94,7 +105,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
}
String topic = request.getTopic();
- if (TieredStoreUtil.isSystemTopic(topic)) {
+ if (topicFilter != null && topicFilter.filterTopic(topic)) {
return;
}
@@ -219,6 +230,10 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
return;
}
+ if (topicFilter != null &&
topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) {
+ return;
+ }
+
if (flatFile.getDispatchOffset() == -1L) {
return;
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 78e855f365..9fb1b2f01c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -90,6 +90,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
boolean loadNextStore = next.load();
boolean result = loadFlatFile && loadNextStore;
if (result) {
+ dispatcher.initScheduleTask();
dispatcher.start();
}
return result;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index e9ae4a5a52..7c744af3b9 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -134,21 +134,21 @@ public class TieredFlatFileManager {
public void doCleanExpiredFile() {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
- Random random = new Random();
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
- int delay = random.nextInt(storeConfig.getMaxCommitJitter());
- TieredStoreExecutor.cleanExpiredFileExecutor.schedule(() -> {
+ TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
flatFile.getCompositeFlatFileLock().lock();
try {
flatFile.cleanExpiredFile(expiredTimeStamp);
flatFile.destroyExpiredFile();
if (flatFile.getConsumeQueueBaseOffset() == -1) {
+ logger.info("Clean flatFile because file not
initialized, topic={}, queueId={}",
+ flatFile.getMessageQueue().getTopic(),
flatFile.getMessageQueue().getQueueId());
destroyCompositeFile(flatFile.getMessageQueue());
}
} finally {
flatFile.getCompositeFlatFileLock().unlock();
}
- }, delay, TimeUnit.MILLISECONDS);
+ });
}
if (indexFile != null) {
indexFile.cleanExpiredFile(expiredTimeStamp);
@@ -218,8 +218,13 @@ public class TieredFlatFileManager {
storeConfig.getBrokerName(),
queueMetadata.getQueue().getQueueId()));
queueCount.incrementAndGet();
});
- logger.info("Recover TopicFlatFile, topic: {},
queueCount: {}, cost: {}ms",
- topicMetadata.getTopic(), queueCount.get(),
subWatch.elapsed(TimeUnit.MILLISECONDS));
+
+ if (queueCount.get() == 0L) {
+
metadataStore.deleteTopic(topicMetadata.getTopic());
+ } else {
+ logger.info("Recover TopicFlatFile, topic: {},
queueCount: {}, cost: {}ms",
+ topicMetadata.getTopic(), queueCount.get(),
subWatch.elapsed(TimeUnit.MILLISECONDS));
+ }
} catch (Exception e) {
logger.error("Recover TopicFlatFile error, topic: {}",
topicMetadata.getTopic(), e);
} finally {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
new file mode 100644
index 0000000000..50adbb7136
--- /dev/null
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter
{ // Filter that rejects blank names, system topics, and explicitly added topics.
+ /** Topics registered through addTopicToWhiteList; filterTopic returns true for each. */
+ private final Set<String> topicBlackSet;
+ // NOTE(review): HashSet is unsynchronized - confirm add/contains never run concurrently.
+ public TieredStoreTopicBlackListFilter() {
+ this.topicBlackSet = new HashSet<>();
+ }
+ /** Returns true for a blank name, when isSystemTopic(topicName) holds, or when the name is in the black set. */
+ @Override
+ public boolean filterTopic(String topicName) {
+ if (StringUtils.isBlank(topicName)) {
+ return true;
+ }
+ return TieredStoreUtil.isSystemTopic(topicName) ||
topicBlackSet.contains(topicName);
+ }
+ /** NOTE(review): despite its name, this inserts the topic into the black set, so it becomes filtered. */
+ @Override
+ public void addTopicToWhiteList(String topicName) {
+ this.topicBlackSet.add(topicName);
+ }
+}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
new file mode 100644
index 0000000000..3f26b8b026
--- /dev/null
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+public interface TieredStoreTopicFilter { // Topic-level filter consulted by TieredDispatcher.
+ /** Returns true when the topic must be skipped; TieredDispatcher returns early on true. */
+ boolean filterTopic(String topicName);
+ /** NOTE(review): TieredStoreTopicBlackListFilter implements this by adding to its black set - name looks inverted. */
+ void addTopicToWhiteList(String topicName);
+}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
new file mode 100644
index 0000000000..2bf48173c4
--- /dev/null
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.provider;
+
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TieredStoreTopicBlackListFilterTest {
+ /** Verifies filtering of empty and system-prefixed names, and of a topic once it has been added. */
+ @Test
+ public void filterTopicTest() {
+ TieredStoreTopicFilter topicFilter = new
TieredStoreTopicBlackListFilter();
+ Assert.assertTrue(topicFilter.filterTopic(""));
+
Assert.assertTrue(topicFilter.filterTopic(TopicValidator.SYSTEM_TOPIC_PREFIX +
"_Topic"));
+ // An ordinary topic is not filtered until addTopicToWhiteList is called; afterwards it is.
+ String topicName = "WhiteTopic";
+ Assert.assertFalse(topicFilter.filterTopic(topicName));
+ topicFilter.addTopicToWhiteList(topicName);
+ Assert.assertTrue(topicFilter.filterTopic(topicName));
+ }
+}
\ No newline at end of file