[ https://issues.apache.org/jira/browse/ARTEMIS-966?focusedWorklogId=877562&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-877562 ]
ASF GitHub Bot logged work on ARTEMIS-966: ------------------------------------------ Author: ASF GitHub Bot Created on: 22/Aug/23 16:40 Start Date: 22/Aug/23 16:40 Worklog Time Spent: 10m Work Description: gemmellr commented on code in PR #4583: URL: https://github.com/apache/activemq-artemis/pull/4583#discussion_r1301861592 ########## artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java: ########## @@ -79,20 +78,31 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ private final MQTTRoutingHandler routingHandler; + private MQTTSessionStateManager sessionStateManager; + MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, - List<BaseInterceptor> outgoingInterceptors) { + List<BaseInterceptor> outgoingInterceptors) throws Exception { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); server.getManagementService().addNotificationListener(this); routingHandler = new MQTTRoutingHandler(server); + sessionStateManager = MQTTSessionStateManager.getInstance(server); + server.registerActivateCallback(new CleaningActivateCallback() { + @Override + public void deActivate() { + MQTTSessionStateManager.removeInstance(server); + sessionStateManager = null; + } + }); } public int getDefaultMqttSessionExpiryInterval() { return defaultMqttSessionExpiryInterval; } public MQTTProtocolManager setDefaultMqttSessionExpiryInterval(int sessionExpiryInterval) { + System.out.println("setDefaultMqttSessionExpiryInterval: " + sessionExpiryInterval); Review Comment: Looks like a leftover...delete or use logging? ########## artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java: ########## @@ -91,8 +98,50 @@ public class MQTTSessionState { private Map<String, Integer> serverTopicAliases; - public MQTTSessionState(String clientId) { + public MQTTSessionState(String clientId, MQTTSessionStateManager stateManager) { this.clientId = clientId; + this.stateManager = stateManager; + } + + /** + * This constructor deserializes session data from a message. The format is as follows. + * + * - byte: version + * - int: subscription count + * + * There may be 0 or more subscriptions. The subscription format is as follows. + * + * - String: topic name + * - int: QoS + * - boolean: no-local + * - boolean: retain as published + * - int: retain handling + * - int (nullable): subscription identifier Review Comment: Curious, what is the case when a durable subscription wouldnt have an id? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java: ########## @@ -92,7 +92,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase { public static Collection<Object[]> getParams() { return Arrays.asList(new Object[][] { {TCP}, - {WS} +// {WS} Review Comment: Leftover? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java: ########## @@ -82,6 +83,69 @@ public void testTimestamp() throws Exception { context.close(); } + @Test(timeout = DEFAULT_TIMEOUT) + public void testResumeSubscriptionsAfterRestart() throws Exception { + final int SUBSCRIPTION_COUNT = 100; + List<String> topicNames = new ArrayList<>(SUBSCRIPTION_COUNT); + for (int i = 0; i < SUBSCRIPTION_COUNT; i++) { + topicNames.add(RandomUtil.randomString()); Review Comment: trace logs etc might be easier to read if just using simple 'getTestName + i' names. ########## artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java: ########## @@ -372,7 +374,7 @@ public static void logMessage(MQTTSessionState state, MqttMessage message, boole break; } - logger.trace(log.toString()); + logger.info(log.toString()); Review Comment: Seems a bit spammy, leftover debug change? EDIT Also the whole method is wrapped in an 'isTraceEnabled' gate. ########## artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionStateManager.java: ########## @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTSessionStateManager { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ActiveMQServer server; + private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<>(); + private final Queue sessionStore; + private static Map<Integer, MQTTSessionStateManager> INSTANCES = new HashMap<>(); + + /* + * Even though there may be multiple instances of MQTTProtocolManager (e.g. for MQTT on different ports) we only want + * one instance of MQTTSessionStateManager per-broker with the understanding that there can be multiple brokers in + * the same JVM. + */ + public static synchronized MQTTSessionStateManager getInstance(ActiveMQServer server) throws Exception { + MQTTSessionStateManager instance = INSTANCES.get(System.identityHashCode(server)); + if (instance == null) { + instance = new MQTTSessionStateManager(server); + INSTANCES.put(System.identityHashCode(server), instance); + } + + return instance; + } + + public static synchronized void removeInstance(ActiveMQServer server) { + INSTANCES.remove(System.identityHashCode(server)); + } + + private MQTTSessionStateManager(ActiveMQServer server) throws Exception { + this.server = server; + sessionStore = server.createQueue(new QueueConfiguration(MQTTUtil.MQTT_SESSION_STORE).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setDurable(true).setInternal(true).setAutoCreateAddress(true), true); + + // load session data from queue + try (LinkedListIterator<MessageReference> iterator = sessionStore.browserIterator()) { + try { + while (iterator.hasNext()) { + MessageReference ref = iterator.next(); + String clientId = ref.getMessage().getStringProperty(Message.HDR_LAST_VALUE_NAME); + MQTTSessionState sessionState = new MQTTSessionState((CoreMessage) ref.getMessage(), this); + sessionStates.put(clientId, sessionState); + } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing + } + } + } + + public void scanSessions() { + List<String> toRemove = new ArrayList(); + for (Map.Entry<String, MQTTSessionState> entry : sessionStates.entrySet()) { + MQTTSessionState state = entry.getValue(); + logger.debug("Inspecting session: {}", state); + int sessionExpiryInterval = state.getClientSessionExpiryInterval(); + if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) { + toRemove.add(entry.getKey()); + } + if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) { + state.getSession().sendWillMessage(); + } + } + + for (String key : toRemove) { + try { + MQTTSessionState state = removeSessionState(key); + if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) { + state.getSession().sendWillMessage(); + } + } catch (Exception e) { + // TODO: make this a real error message + e.printStackTrace(); Review Comment: still TODO? Issue Time Tracking ------------------- Worklog Id: (was: 877562) Time Spent: 20m (was: 10m) > MQTT Session States do not survive a reboot > ------------------------------------------- > > Key: ARTEMIS-966 > URL: https://issues.apache.org/jira/browse/ARTEMIS-966 > Project: ActiveMQ Artemis > Issue Type: Improvement > Components: MQTT > Reporter: Martyn Taylor > Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)