This is an automated email from the ASF dual-hosted git repository.
shunzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new 71355c37 make effective router immutable to prevent it from being
changed accidentally (#156)
71355c37 is described below
commit 71355c37cead0863f7d88d399f1e5289ce8f4d62
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Mon Jul 21 17:54:34 2025 +0800
make effective router immutable to prevent it from being changed
accidentally (#156)
---
.../org/apache/bifromq/basekv/client/BaseKVStoreClient.java | 6 +++---
.../apache/bifromq/inbox/store/InboxStoreGCProcessor.java | 13 +++++++------
2 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java
index 2333a3e1..a4201620 100644
--- a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java
+++ b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/BaseKVStoreClient.java
@@ -126,7 +126,7 @@ final class BaseKVStoreClient implements IBaseKVStoreClient
{
// key: storeId
private final Map<String, List<IQueryPipeline>> lnrQueryPplns =
Maps.newHashMap();
private final AtomicReference<NavigableMap<Boundary, KVRangeSetting>>
effectiveRouter =
- new AtomicReference<>(new TreeMap<>(BoundaryUtil::compare));
+ new AtomicReference<>(Collections.unmodifiableNavigableMap(new
TreeMap<>(BoundaryUtil::compare)));
// key: serverId, val: storeId
private volatile Map<String, String> serverToStoreMap = Maps.newHashMap();
@@ -486,7 +486,7 @@ final class BaseKVStoreClient implements IBaseKVStoreClient
{
}
NavigableMap<Boundary, KVRangeSetting> last = effectiveRouter.get();
if (!router.equals(last)) {
- effectiveRouter.set(router);
+ effectiveRouter.set(Collections.unmodifiableNavigableMap(router));
return true;
}
return false;
@@ -519,7 +519,7 @@ final class BaseKVStoreClient implements IBaseKVStoreClient
{
}
}
patched.put(setting.boundary, setting);
- effectiveRouter.compareAndSet(router, patched);
+ effectiveRouter.compareAndSet(router,
Collections.unmodifiableNavigableMap(patched));
}
}
diff --git a/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java b/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java
index 39b67628..fb206d97 100644
--- a/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java
+++ b/bifromq-inbox/bifromq-inbox-store/src/main/java/org/apache/bifromq/inbox/store/InboxStoreGCProcessor.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.inbox.store;
@@ -22,6 +22,10 @@ package org.apache.bifromq.inbox.store;
import static
org.apache.bifromq.basekv.client.KVRangeRouterUtil.findByBoundary;
import static org.apache.bifromq.basekv.utils.BoundaryUtil.FULL_BOUNDARY;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basekv.client.IBaseKVStoreClient;
import org.apache.bifromq.basekv.client.KVRangeSetting;
import org.apache.bifromq.basekv.client.exception.BadRequestException;
@@ -33,9 +37,6 @@ import org.apache.bifromq.basekv.store.proto.ROCoProcInput;
import org.apache.bifromq.inbox.storage.proto.GCReply;
import org.apache.bifromq.inbox.storage.proto.GCRequest;
import org.apache.bifromq.inbox.storage.proto.InboxServiceROCoProcInput;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import lombok.extern.slf4j.Slf4j;
@Slf4j
public class InboxStoreGCProcessor implements IInboxStoreGCProcessor {
@@ -49,8 +50,8 @@ public class InboxStoreGCProcessor implements
IInboxStoreGCProcessor {
@Override
public final CompletableFuture<Result> gc(long reqId, long now) {
- Collection<KVRangeSetting> rangeSettingList =
findByBoundary(FULL_BOUNDARY,
- storeClient.latestEffectiveRouter());
+ Collection<KVRangeSetting> rangeSettingList =
Sets.newHashSet(findByBoundary(FULL_BOUNDARY,
+ storeClient.latestEffectiveRouter()));
if (localServerId != null) {
rangeSettingList.removeIf(rangeSetting ->
!rangeSetting.leader.equals(localServerId));
}