[ https://issues.apache.org/jira/browse/ARTEMIS-966?focusedWorklogId=877621&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-877621 ]
ASF GitHub Bot logged work on ARTEMIS-966: ------------------------------------------ Author: ASF GitHub Bot Created on: 22/Aug/23 19:33 Start Date: 22/Aug/23 19:33 Worklog Time Spent: 10m Work Description: jbertram commented on code in PR #4583: URL: https://github.com/apache/activemq-artemis/pull/4583#discussion_r1302104400 ########## artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionStateManager.java: ########## @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTSessionStateManager { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ActiveMQServer server; + private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<>(); + private final Queue sessionStore; + private static Map<Integer, MQTTSessionStateManager> INSTANCES = new HashMap<>(); + + /* + * Even though there may be multiple instances of MQTTProtocolManager (e.g. for MQTT on different ports) we only want + * one instance of MQTTSessionStateManager per-broker with the understanding that there can be multiple brokers in + * the same JVM. + */ + public static synchronized MQTTSessionStateManager getInstance(ActiveMQServer server) throws Exception { + MQTTSessionStateManager instance = INSTANCES.get(System.identityHashCode(server)); + if (instance == null) { + instance = new MQTTSessionStateManager(server); + INSTANCES.put(System.identityHashCode(server), instance); + } + + return instance; + } + + public static synchronized void removeInstance(ActiveMQServer server) { + INSTANCES.remove(System.identityHashCode(server)); + } + + private MQTTSessionStateManager(ActiveMQServer server) throws Exception { + this.server = server; + sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true); + + // load session data from queue + try (LinkedListIterator<MessageReference> iterator = sessionStore.browserIterator()) { + try { + while (iterator.hasNext()) { + MessageReference ref = iterator.next(); + String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME); + MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage(), this); + sessionStates.put(clientId, sessionState); + } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing + } + } + } + + public void scanSessions() { + List<String> toRemove = new ArrayList(); + for (Map.Entry<String, MQTTSessionState> entry : sessionStates.entrySet()) { + MQTTSessionState state = entry.getValue(); + logger.debug("Inspecting session: {}", state); + int sessionExpiryInterval = state.getClientSessionExpiryInterval(); + if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { + toRemove.add(entry.getKey()); + } + if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { + state.getSession().sendWillMessage(); + } + } + + for (String key : toRemove) { + try { + MQTTSessionState state = removeSessionState(key); + if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { + state.getSession().sendWillMessage(); + } + } catch (Exception e) { + // TODO: make this a real error message + e.printStackTrace(); Review Comment: That `TODO` was copied from `MQTTProtocolManager`. It is still to-do, but not related to this PR. Issue Time Tracking ------------------- Worklog Id: (was: 877621) Time Spent: 50m (was: 40m) > MQTT Session States do not survive a reboot > ------------------------------------------- > > Key: ARTEMIS-966 > URL: https://issues.apache.org/jira/browse/ARTEMIS-966 > Project: ActiveMQ Artemis > Issue Type: Improvement > Components: MQTT > Reporter: Martyn Taylor > Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)