This is an automated email from the ASF dual-hosted git repository.

gujiaweijoe pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new b378015b Various small improvements and code cleanup (#159)
b378015b is described below

commit b378015bf3d5df047e1d392081564f7ba41c7f02
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Wed Jul 30 18:44:26 2025 +0800

    Various small improvements and code cleanup (#159)
    
    1. improve HLC impl and documentation
    2. fix the edge case so that '2,2,0' could be balanced to '2,1,1'
    3. improve the session memory estimation
    4. code cleanup
    5. add contextual tags to compactor's metrics
    6. enable pin data which could reduce buffer copy in some cases
---
 .../main/java/org/apache/bifromq/basehlc/HLC.java  | 122 +++++++++++++--------
 .../rocksdb/RocksDBKVEngineIterator.java           |  37 +++----
 .../basekv/localengine/rocksdb/RocksDBKVSpace.java |  30 ++---
 .../basekv/balance/impl/ReplicaCntBalancer.java    |  13 ++-
 .../main/proto/inboxservice/InboxStoreCoProc.proto |  11 --
 .../mqtt/handler/MQTTPersistentSessionHandler.java |  10 +-
 .../bifromq/mqtt/handler/MQTTSessionHandler.java   |  12 --
 .../mqtt/handler/MQTTTransientSessionHandler.java  |  11 +-
 .../handler/v3/MQTT3PersistentSessionHandler.java  |  24 +++-
 .../handler/v3/MQTT3TransientSessionHandler.java   |  24 +++-
 .../handler/v5/MQTT5PersistentSessionHandler.java  |  24 +++-
 .../handler/v5/MQTT5TransientSessionHandler.java   |  24 +++-
 12 files changed, 218 insertions(+), 124 deletions(-)

diff --git a/base-hlc/src/main/java/org/apache/bifromq/basehlc/HLC.java 
b/base-hlc/src/main/java/org/apache/bifromq/basehlc/HLC.java
index 9c4e3568..82d29a1f 100644
--- a/base-hlc/src/main/java/org/apache/bifromq/basehlc/HLC.java
+++ b/base-hlc/src/main/java/org/apache/bifromq/basehlc/HLC.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.basehlc;
@@ -22,6 +22,11 @@ package org.apache.bifromq.basehlc;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 
+/**
+ * The HLC (Hybrid Logical Clock) is a logical clock that combines physical 
time with a causal counter.
+ * It is designed to provide a consistent timestamp that can be used in 
distributed systems to order events
+ * while also allowing for the physical time to be reflected in the timestamp.
+ */
 public class HLC {
     public static final HLC INST = new HLC();
     private static final long CAUSAL_MASK = 0x00_00_00_00_00_00_FF_FFL;
@@ -37,13 +42,14 @@ public class HLC {
     }
 
     /**
-     * Assume cache line size is 64bytes, and first 16 bytes are reserved for 
object header
+     * Assume cache line size is 64bytes, and first 16 bytes are reserved for 
object header.
      */
     private long p3;
     private long p4;
     private long p5;
     private long p6;
     private long p7;
+    // noinspection FieldMayBeFinal
     private volatile long hlc = 0L;
 
     private long p9;
@@ -57,40 +63,79 @@ public class HLC {
     private HLC() {
     }
 
+    private static long nextMillis(long nowMillis) {
+        long t;
+        while ((t = System.currentTimeMillis()) == nowMillis) {
+            Thread.onSpinWait();
+        }
+        return t;
+    }
+
+    /**
+     * Get the current HLC timestamp.
+     *
+     * @return the current HLC timestamp
+     */
     public long get() {
-        long now = hlc;
-        long l = logical(now);
-        long c = causal(now);
-        long updateL = Math.max(l, System.currentTimeMillis());
-        if (updateL == l) {
-            c++;
-        } else {
-            c = 0;
+        for (; ; ) {
+            long now = hlc;
+            long l = logical(now);
+            long c = causal(now);
+
+            long phys = System.currentTimeMillis();
+            long updateL = Math.max(l, phys);
+            c = (updateL == l) ? c + 1 : 0;
+            if (c > CAUSAL_MASK) {
+                updateL = nextMillis(updateL);
+                c = 0;
+            }
+            long newHLC = toTimestamp(updateL, c);
+            long witness = (long) CAE.compareAndExchange(this, now, newHLC);
+            if (witness == now) {
+                return newHLC;
+            }
+            if (witness >= newHLC) {
+                return witness;
+            }
         }
-        long newHLC = toTimestamp(updateL, c);
-        set(now, newHLC);
-        return newHLC;
     }
 
+    /**
+     * Update the HLC timestamp with another HLC timestamp.
+     *
+     * @param otherHLC the other HLC timestamp observed
+     * @return the updated HLC timestamp
+     */
     public long update(long otherHLC) {
-        long now = hlc;
-        long l = logical(now);
-        long c = causal(now);
-        long otherL = logical(otherHLC);
-        long otherC = causal(otherHLC);
-        long updateL = Math.max(l, Math.max(otherL, 
System.currentTimeMillis()));
-        if (updateL == l && otherL == l) {
-            c = Math.max(c, otherC) + 1;
-        } else if (updateL == l) {
-            c++;
-        } else if (otherL == l) {
-            c = otherC + 1;
-        } else {
-            c = 0;
-        }
-        long newHLC = toTimestamp(updateL, c);
-        set(now, newHLC);
-        return newHLC;
+        do {
+            long now = hlc;
+            long l = logical(now);
+            long c = causal(now);
+            long otherL = logical(otherHLC);
+            long otherC = causal(otherHLC);
+            long updateL = Math.max(l, Math.max(otherL, 
System.currentTimeMillis()));
+            if (updateL == l && otherL == l) {
+                c = Math.max(c, otherC) + 1;
+            } else if (updateL == l) {
+                c++;
+            } else if (otherL == l) {
+                c = otherC + 1;
+            } else {
+                c = 0;
+            }
+            if (c > CAUSAL_MASK) {
+                updateL = nextMillis(updateL);
+                c = 0;
+            }
+            long newHLC = toTimestamp(updateL, c);
+            long witness = (long) CAE.compareAndExchange(this, now, newHLC);
+            if (witness == now) {
+                return newHLC;
+            }
+            if (witness >= newHLC) {
+                return witness;
+            }
+        } while (true);
     }
 
     public long getPhysical() {
@@ -102,7 +147,7 @@ public class HLC {
     }
 
     private long toTimestamp(long l, long c) {
-        return (l << 16) + c;
+        return (l << 16) | c;
     }
 
     private long logical(long hlc) {
@@ -112,17 +157,4 @@ public class HLC {
     private long causal(long hlc) {
         return hlc & CAUSAL_MASK;
     }
-
-    private void set(long now, long newHLC) {
-        long expected = now;
-        long witness;
-        do {
-            witness = (long) CAE.compareAndExchange(this, expected, newHLC);
-            if (witness == expected || witness >= newHLC) {
-                break;
-            } else {
-                expected = witness;
-            }
-        } while (true);
-    }
 }
diff --git 
a/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVEngineIterator.java
 
b/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVEngineIterator.java
index 9941773e..0397dd77 100644
--- 
a/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVEngineIterator.java
+++ 
b/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVEngineIterator.java
@@ -19,8 +19,8 @@
 
 package org.apache.bifromq.basekv.localengine.rocksdb;
 
-import org.apache.bifromq.basekv.localengine.KVEngineException;
 import java.lang.ref.Cleaner;
+import org.apache.bifromq.basekv.localengine.KVEngineException;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
@@ -30,23 +30,6 @@ import org.rocksdb.Snapshot;
 
 class RocksDBKVEngineIterator implements AutoCloseable {
     private static final Cleaner CLEANER = Cleaner.create();
-
-    private record NativeState(RocksIterator itr, ReadOptions readOptions, 
Slice lowerSlice, Slice upperSlice)
-        implements Runnable {
-
-        @Override
-        public void run() {
-            itr.close();
-            readOptions.close();
-            if (lowerSlice != null) {
-                lowerSlice.close();
-            }
-            if (upperSlice != null) {
-                upperSlice.close();
-            }
-        }
-    }
-
     private final RocksIterator rocksIterator;
     private final Cleaner.Cleanable onClose;
 
@@ -55,7 +38,7 @@ class RocksDBKVEngineIterator implements AutoCloseable {
                             Snapshot snapshot,
                             byte[] startKey,
                             byte[] endKey) {
-        ReadOptions readOptions = new ReadOptions();
+        ReadOptions readOptions = new ReadOptions().setPinData(true);
         Slice lowerSlice = null;
         if (startKey != null) {
             lowerSlice = new Slice(startKey);
@@ -122,4 +105,20 @@ class RocksDBKVEngineIterator implements AutoCloseable {
     public void close() {
         onClose.clean();
     }
+
+    private record NativeState(RocksIterator itr, ReadOptions readOptions, 
Slice lowerSlice, Slice upperSlice)
+        implements Runnable {
+
+        @Override
+        public void run() {
+            itr.close();
+            readOptions.close();
+            if (lowerSlice != null) {
+                lowerSlice.close();
+            }
+            if (upperSlice != null) {
+                upperSlice.close();
+            }
+        }
+    }
 }
diff --git 
a/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpace.java
 
b/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpace.java
index a5b65605..03bf047a 100644
--- 
a/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpace.java
+++ 
b/base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpace.java
@@ -14,11 +14,15 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.basekv.localengine.rocksdb;
 
+import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
+import static io.reactivex.rxjava3.subjects.BehaviorSubject.createDefault;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
 import static org.apache.bifromq.basekv.localengine.IKVEngine.DEFAULT_NS;
 import static 
org.apache.bifromq.basekv.localengine.metrics.KVSpaceMeters.getCounter;
 import static 
org.apache.bifromq.basekv.localengine.metrics.KVSpaceMeters.getGauge;
@@ -26,20 +30,7 @@ import static 
org.apache.bifromq.basekv.localengine.metrics.KVSpaceMeters.getTim
 import static 
org.apache.bifromq.basekv.localengine.rocksdb.Keys.META_SECTION_END;
 import static 
org.apache.bifromq.basekv.localengine.rocksdb.Keys.META_SECTION_START;
 import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.fromMetaKey;
-import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
-import static io.reactivex.rxjava3.subjects.BehaviorSubject.createDefault;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonList;
 
-import org.apache.bifromq.baseenv.EnvProvider;
-import org.apache.bifromq.basekv.localengine.IKVSpace;
-import org.apache.bifromq.basekv.localengine.IKVSpaceWriter;
-import org.apache.bifromq.basekv.localengine.ISyncContext;
-import org.apache.bifromq.basekv.localengine.KVEngineException;
-import org.apache.bifromq.basekv.localengine.KVSpaceDescriptor;
-import org.apache.bifromq.basekv.localengine.SyncContext;
-import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
-import 
org.apache.bifromq.basekv.localengine.rocksdb.metrics.RocksDBKVSpaceMetric;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import io.micrometer.core.instrument.Counter;
@@ -70,6 +61,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.SneakyThrows;
+import org.apache.bifromq.baseenv.EnvProvider;
+import org.apache.bifromq.basekv.localengine.IKVSpace;
+import org.apache.bifromq.basekv.localengine.IKVSpaceWriter;
+import org.apache.bifromq.basekv.localengine.ISyncContext;
+import org.apache.bifromq.basekv.localengine.KVEngineException;
+import org.apache.bifromq.basekv.localengine.KVSpaceDescriptor;
+import org.apache.bifromq.basekv.localengine.SyncContext;
+import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
+import 
org.apache.bifromq.basekv.localengine.rocksdb.metrics.RocksDBKVSpaceMetric;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -119,7 +119,7 @@ abstract class RocksDBKVSpace<
             configurator.compactMinTombstoneKeys(),
             configurator.compactMinTombstoneRanges(),
             configurator.compactTombstoneKeysRatio(),
-            this::scheduleCompact) : NoopWriteStatsRecorder.INSTANCE;
+            this::scheduleCompact, tags) : NoopWriteStatsRecorder.INSTANCE;
         this.engine = engine;
         compactionExecutor = 
ExecutorServiceMetrics.monitor(Metrics.globalRegistry, new 
ThreadPoolExecutor(1, 1,
                 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
diff --git 
a/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/ReplicaCntBalancer.java
 
b/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/ReplicaCntBalancer.java
index 807f2a8f..dcb47fac 100644
--- 
a/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/ReplicaCntBalancer.java
+++ 
b/base-kv/base-kv-store-balance-controller/src/main/java/org/apache/bifromq/basekv/balance/impl/ReplicaCntBalancer.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.protobuf.Struct;
 import com.google.protobuf.Value;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -277,6 +278,14 @@ public class ReplicaCntBalancer extends 
RuleBasedPlacementBalancer {
         double totalVoters = 
storeVoterCount.values().stream().mapToInt(Integer::intValue).sum();
         double targetVotersPerStore = totalVoters / landscape.size();
         int maxVotersPerStore = (int) Math.ceil(targetVotersPerStore);
+        int minVotersPerStore = (int) Math.floor(targetVotersPerStore);
+
+        int globalMax = Collections.max(storeVoterCount.values());
+        int globalMin = Collections.min(storeVoterCount.values());
+        if (globalMax - globalMin <= 1) {
+            return false;
+        }
+
         for (Map.Entry<Boundary, LeaderRange> entry : 
effectiveRoute.leaderRanges().entrySet()) {
             Boundary boundary = entry.getKey();
             LeaderRange leaderRange = entry.getValue();
@@ -290,11 +299,11 @@ public class ReplicaCntBalancer extends 
RuleBasedPlacementBalancer {
             Set<String> learners = 
Sets.newHashSet(clusterConfig.getLearnersList());
             SortedSet<String> voterSorted = 
Sets.newTreeSet(clusterConfig.getVotersList());
             for (String voter : voterSorted) {
-                if (storeVoterCount.get(voter) > maxVotersPerStore) {
+                if (storeVoterCount.get(voter) >= maxVotersPerStore) {
                     // voter store has overloaded voters
                     for (StoreVoterCount underloadedStore : 
storeVoterCountSorted) {
                         // move to one underloaded store which is current not 
in the voter list
-                        if (storeVoterCount.get(underloadedStore.storeId) < 
maxVotersPerStore
+                        if (storeVoterCount.get(underloadedStore.storeId) <= 
minVotersPerStore
                             && !voterSorted.contains(underloadedStore.storeId)
                             && !learners.contains(underloadedStore.storeId)) {
                             meetingGoal = true;
diff --git 
a/bifromq-inbox/bifromq-inbox-coproc-proto/src/main/proto/inboxservice/InboxStoreCoProc.proto
 
b/bifromq-inbox/bifromq-inbox-coproc-proto/src/main/proto/inboxservice/InboxStoreCoProc.proto
index 713cf8c2..66fa081d 100644
--- 
a/bifromq-inbox/bifromq-inbox-coproc-proto/src/main/proto/inboxservice/InboxStoreCoProc.proto
+++ 
b/bifromq-inbox/bifromq-inbox-coproc-proto/src/main/proto/inboxservice/InboxStoreCoProc.proto
@@ -289,17 +289,6 @@ message GCRequest {
 message GCReply {
 }
 
-message CollectMetricsRequest{
-  uint64 reqId = 1;
-}
-
-message CollectMetricsReply{
-  uint64 reqId = 1;
-  map<string, uint64> subCounts = 2; // sub count by tenant
-  map<string, uint64> subUsedSpaces = 3; // sub used space by tenant
-  map<string, uint64> usedSpaces = 4; // total used space by tenant
-}
-
 message InboxServiceRWCoProcInput{
   uint64 reqId = 1;
   oneof type{
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
index b16519f8..c9552713 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler;
@@ -120,10 +120,6 @@ public abstract class MQTTPersistentSessionHandler extends 
MQTTSessionHandler im
         this.sessionExpirySeconds = sessionExpirySeconds;
     }
 
-    private int estBaseMemSize() {
-        return 72; // base size from JOL
-    }
-
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) {
         super.handlerAdded(ctx);
@@ -139,16 +135,14 @@ public abstract class MQTTPersistentSessionHandler 
extends MQTTSessionHandler im
             }
         }
         setupInboxReader();
-        memUsage.addAndGet(estBaseMemSize());
     }
 
     @Override
-    public final void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) {
         super.channelInactive(ctx);
         if (inboxReader != null) {
             inboxReader.close();
         }
-        memUsage.addAndGet(-estBaseMemSize());
         int remainInboxSize =
             stagingBuffer.values().stream().reduce(0, (acc, msg) -> acc + 
msg.estBytes(), Integer::sum);
         if (remainInboxSize > 0) {
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java
index 15cde24a..0f2844be 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java
@@ -224,16 +224,6 @@ public abstract class MQTTSessionHandler extends 
MQTTMessageHandler implements I
         resourceThrottler = sessionCtx.resourceThrottler;
     }
 
-    private int estMemSize() {
-        int s = 144; // base size from JOL
-        s += userSessionId.length();
-        s += clientInfo.getSerializedSize();
-        if (noDelayLWT != null) {
-            s += noDelayLWT.getSerializedSize();
-        }
-        return s;
-    }
-
     protected abstract IMQTTProtocolHelper helper();
 
     @Override
@@ -382,7 +372,6 @@ public abstract class MQTTSessionHandler extends 
MQTTMessageHandler implements I
         }
         scheduleRedirectCheck();
         onInitialized.whenComplete((v, e) -> 
tenantMeter.recordCount(MqttConnectCount));
-        memUsage.addAndGet(estMemSize());
     }
 
     @Override
@@ -403,7 +392,6 @@ public abstract class MQTTSessionHandler extends 
MQTTMessageHandler implements I
         sessionCtx.localSessionRegistry.remove(channelId(), this);
         sessionRegistration.stop();
         tenantMeter.recordCount(MqttDisconnectCount);
-        memUsage.addAndGet(-estMemSize());
         if (!isGoAway) {
             isGoAway = true;
             
eventCollector.report(getLocal(ByClient.class).withoutDisconnect(true).clientInfo(clientInfo));
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java
index 3d314dfb..af3f45d7 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTTransientSessionHandler.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler;
@@ -116,7 +116,6 @@ public abstract class MQTTTransientSessionHandler extends 
MQTTSessionHandler imp
         subNumGauge = 
sessionCtx.getTransientSubNumGauge(clientInfo.getTenantId());
         onInitialized();
         resumeChannelRead();
-        memUsage.addAndGet(estBaseMemSize());
         // Transient session lifetime is bounded by the channel lifetime
         
eventCollector.report(getLocal(MQTTSessionStart.class).sessionId(userSessionId).clientInfo(clientInfo));
     }
@@ -128,18 +127,12 @@ public abstract class MQTTTransientSessionHandler extends 
MQTTSessionHandler imp
             topicFilters.forEach((topicFilter, option) -> 
addBgTask(unsubTopicFilter(System.nanoTime(), topicFilter)));
         }
         int remainInboxSize = inbox.values().stream().reduce(0, (acc, msg) -> 
acc + msg.estBytes(), Integer::sum);
-        memUsage.addAndGet(-estBaseMemSize());
         memUsage.addAndGet(-remainInboxSize);
         // Transient session lifetime is bounded by the channel lifetime
         
eventCollector.report(getLocal(MQTTSessionStop.class).sessionId(userSessionId).clientInfo(clientInfo));
         ctx.fireChannelInactive();
     }
 
-    private int estBaseMemSize() {
-        // estimate bytes from JOL
-        return 28;
-    }
-
     @Override
     protected ProtocolResponse handleDisconnect(MqttMessage message) {
         Optional<Integer> requestSEI = 
helper().sessionExpiryIntervalOnDisconnect(message);
@@ -186,6 +179,7 @@ public abstract class MQTTTransientSessionHandler extends 
MQTTSessionHandler imp
         if (prevOption == null) {
             subNumGauge.addAndGet(1);
             memUsage.addAndGet(topicFilter.length());
+            memUsage.addAndGet(option.getSerializedSize());
         }
         return addMatchRecord(reqId, topicFilter, option.getIncarnation())
             .thenApplyAsync(matchResult -> {
@@ -250,6 +244,7 @@ public abstract class MQTTTransientSessionHandler extends 
MQTTSessionHandler imp
         } else {
             subNumGauge.addAndGet(-1);
             memUsage.addAndGet(-topicFilter.length());
+            memUsage.addAndGet(-option.getSerializedSize());
         }
         return removeMatchRecord(reqId, topicFilter, option.getIncarnation())
             .handleAsync((result, e) -> {
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3PersistentSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3PersistentSessionHandler.java
index 853127d0..94cda398 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3PersistentSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3PersistentSessionHandler.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v3;
@@ -61,4 +61,26 @@ public final class MQTT3PersistentSessionHandler extends 
MQTTPersistentSessionHa
     protected IMQTTProtocolHelper helper() {
         return helper;
     }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        super.handlerAdded(ctx);
+        memUsage.addAndGet(estBaseMemSize());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        super.channelInactive(ctx);
+        memUsage.addAndGet(-estBaseMemSize());
+    }
+
+    private int estBaseMemSize() {
+        int s = 400; // base size from JOL
+        s += userSessionId.length();
+        s += clientInfo.getSerializedSize();
+        if (willMessage() != null) {
+            s += willMessage().getSerializedSize();
+        }
+        return s;
+    }
 }
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandler.java
index ecb987c9..bbcf1a89 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandler.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v3;
@@ -49,4 +49,26 @@ public final class MQTT3TransientSessionHandler extends 
MQTTTransientSessionHand
     protected IMQTTProtocolHelper helper() {
         return helper;
     }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        super.handlerAdded(ctx);
+        memUsage.addAndGet(estBaseMemSize());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        super.channelInactive(ctx);
+        memUsage.addAndGet(-estBaseMemSize());
+    }
+
+    private int estBaseMemSize() {
+        int s = 368; // base size from JOL
+        s += userSessionId.length();
+        s += clientInfo.getSerializedSize();
+        if (willMessage() != null) {
+            s += willMessage().getSerializedSize();
+        }
+        return s;
+    }
 }
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5PersistentSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5PersistentSessionHandler.java
index 630f5fea..34c77d33 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5PersistentSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5PersistentSessionHandler.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v5;
@@ -78,4 +78,26 @@ public final class MQTT5PersistentSessionHandler extends 
MQTTPersistentSessionHa
             reAuthenticator.onAuth(message);
         }
     }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        super.handlerAdded(ctx);
+        memUsage.addAndGet(estBaseMemSize());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        super.channelInactive(ctx);
+        memUsage.addAndGet(-estBaseMemSize());
+    }
+
+    private int estBaseMemSize() {
+        int s = 408; // base size from JOL
+        s += userSessionId.length();
+        s += clientInfo.getSerializedSize();
+        if (willMessage() != null) {
+            s += willMessage().getSerializedSize();
+        }
+        return s;
+    }
 }
diff --git 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandler.java
 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandler.java
index f29d4786..375f7371 100644
--- 
a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandler.java
+++ 
b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandler.java
@@ -14,7 +14,7 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
- * under the License.    
+ * under the License.
  */
 
 package org.apache.bifromq.mqtt.handler.v5;
@@ -66,4 +66,26 @@ public final class MQTT5TransientSessionHandler extends 
MQTTTransientSessionHand
             reAuthenticator.onAuth(message);
         }
     }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        super.handlerAdded(ctx);
+        memUsage.addAndGet(estBaseMemSize());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        super.channelInactive(ctx);
+        memUsage.addAndGet(-estBaseMemSize());
+    }
+
+    private int estBaseMemSize() {
+        int s = 376; // base size from JOL
+        s += userSessionId.length();
+        s += clientInfo.getSerializedSize();
+        if (willMessage() != null) {
+            s += willMessage().getSerializedSize();
+        }
+        return s;
+    }
 }

Reply via email to