[ https://issues.apache.org/jira/browse/ARTEMIS-3638?focusedWorklogId=712849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-712849 ]
ASF GitHub Bot logged work on ARTEMIS-3638: ------------------------------------------- Author: ASF GitHub Bot Created on: 21/Jan/22 13:21 Start Date: 21/Jan/22 13:21 Worklog Time Spent: 10m Work Description: gtully commented on a change in pull request #3907: URL: https://github.com/apache/activemq-artemis/pull/3907#discussion_r789626358 ########## File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java ########## @@ -159,89 +176,180 @@ public void act(MqttMessage message) { break; case UNSUBACK: case SUBACK: + case PINGREQ: // These are actually handled by the Netty thread directly so this packet should never make it here case PINGRESP: case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message. default: disconnect(true); } } catch (Exception e) { - log.warn("Error processing Control Packet, Disconnecting Client", e); + MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e); + if (session.is5()) { + sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR); + } disconnect(true); } finally { ReferenceCountUtil.release(message); } } - /** - * Called during connection. + /* + * Scaffolding for "enhanced authentication" implementation. + * + * See: + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901217 + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901256 + * + * Tests for this are in: + * org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets.AuthTests + * org.apache.activemq.artemis.tests.integration.mqtt5.spec.EnhancedAuthenticationTests * - * @param connect + * This should integrate somehow with our existing SASL implementation for challenge/response conversations. 
*/ + void handleAuth(MqttMessage auth) throws Exception { + byte[] authenticationData = MQTTUtil.getProperty(byte[].class, ((MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader()).properties(), AUTHENTICATION_DATA); + String authenticationMethod = MQTTUtil.getProperty(String.class, ((MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader()).properties(), AUTHENTICATION_METHOD); + + MqttReasonCodeAndPropertiesVariableHeader header = (MqttReasonCodeAndPropertiesVariableHeader) auth.variableHeader(); + if (header.reasonCode() == MQTTReasonCodes.RE_AUTHENTICATE) { + + } else if (header.reasonCode() == MQTTReasonCodes.CONTINUE_AUTHENTICATION) { + + } else if (header.reasonCode() == MQTTReasonCodes.SUCCESS) { + + } + } + void handleConnect(MqttConnectMessage connect) throws Exception { - final String username = connect.payload().userName(); - final String password = connect.payload().passwordInBytes() == null ? null : new String( connect.payload().passwordInBytes(), CharsetUtil.UTF_8); - final String validatedUser = server.validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain()); - if (connection.getTransportConnection().getRedirectTo() == null || - !protocolManager.getRedirectHandler().redirect(connection, session, connect)) { - connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L; - - String clientId = connect.payload().clientIdentifier(); - session.getConnectionManager().connect(clientId, username, password, connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession(), validatedUser); + if (connection.getTransportConnection().getRedirectTo() == null || !protocolManager.getRedirectHandler().redirect(connection, session, connect)) { + /* [MQTT-3.1.2-2] Reject unsupported clients. 
*/ + int packetVersion = connect.variableHeader().version(); + if (packetVersion != MqttVersion.MQTT_3_1.protocolLevel() && + packetVersion != MqttVersion.MQTT_3_1_1.protocolLevel() && + packetVersion != MqttVersion.MQTT_5.protocolLevel()) { + + { + /* + * The return code is different here for 3.1.1 vs 5, but since the broker supports *both* it's not clear + * which one should be chosen. Perhaps an acceptor property could be set to decide. Leaving the existing Review comment: this does not need to be configured, but if packetVersion > 3 I would use the v5 reason code ########## File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ########## @@ -284,11 +358,110 @@ private void sendServerMessage(int messageId, ICoreMessage message, int delivery payload.writeBytes(bodyBuffer.byteBuf()); break; } - session.getProtocolHandler().send(messageId, address, qos, isRetain, payload, deliveryCount); + + // [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages. + boolean redelivery = qos == 0 ? 
false : (deliveryCount > 1); + + boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY); + MqttProperties mqttProperties = getPublishProperties(message); + + if (session.is5()) { + if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) { + isRetain = false; + } + + // [MQTT-3.8.3-3] remove property used for no-local implementation + message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); + + if (session.getState().getClientTopicAliasMaximum() != null) { + Integer alias = session.getState().getServerTopicAlias(address); + if (alias == null) { + alias = session.getState().addServerTopicAlias(address); + if (alias != null) { + mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias)); + } + } else { + mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias)); + address = ""; + } + } + } + + int remainingLength = MQTTUtil.calculateRemainingLength(address, mqttProperties, payload); + MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength); + MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(address, messageId, mqttProperties); + MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload); + + if (session.is5()) { + int size = MQTTUtil.calculateMessageSize(publish); Review comment: maybe calculate size if maxSize != 0 to avoid the computation if unlimited ########## File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ########## @@ -185,45 +243,83 @@ public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) { @Override public void addChannelHandlers(ChannelPipeline pipeline) { pipeline.addLast(MqttEncoder.INSTANCE); - pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE)); 
+ /* + * If we use the value from getMaximumPacketSize() here anytime a client sends a packet that's too large it + * will receive a DISCONNECT with a reason code of 0x81 instead of 0x95 like it should according to the spec. Review comment: how bad is that? A 0x81 malformed is not accurate but it is the disconnect that is important? Doing a second size check or a size check higher up may be an unnecessary expense if it can be computed and rejected at the netty layer, even if the reason code is less accurate it may be a good tradeoff to make. I don't see an mqtt app making some decision based on the reason code, but maybe. Or maybe we can fix netty to use 0x95 for v5 ########## File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ########## @@ -185,45 +243,83 @@ public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) { @Override public void addChannelHandlers(ChannelPipeline pipeline) { pipeline.addLast(MqttEncoder.INSTANCE); - pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE)); + /* + * If we use the value from getMaximumPacketSize() here anytime a client sends a packet that's too large it + * will receive a DISCONNECT with a reason code of 0x81 instead of 0x95 like it should according to the spec. + * See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086: + * + * If a Server receives a packet whose size exceeds this limit, this is a Protocol Error, the Server uses + * DISCONNECT with Reason Code 0x95 (Packet too large)... + * + * Therefore we check manually in org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolHandler.handlePublish + */ + pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_PACKET_SIZE)); pipeline.addLast(new MQTTProtocolHandler(server, this)); } /** - * The protocol handler passes us an 8 byte long array from the transport. 
We sniff these first 8 bytes to see - * if they match the first 8 bytes from MQTT Connect packet. In many other protocols the protocol name is the first - * thing sent on the wire. However, in MQTT the protocol name doesn't come until later on in the CONNECT packet. - * - * In order to fully identify MQTT protocol via protocol name, we need up to 12 bytes. However, we can use other - * information from the connect packet to infer that the MQTT protocol is being used. This is enough to identify MQTT - * and add the Netty codec in the pipeline. The Netty codec takes care of things from here. - * - * MQTT CONNECT PACKET: See MQTT 3.1.1 Spec for more info. - * - * Byte 1: Fixed Header Packet Type. 0b0001000 (16) = MQTT Connect - * Byte 2-[N]: Remaining length of the Connect Packet (encoded with 1-4 bytes). - * - * The next set of bytes represents the UTF8 encoded string MQTT (MQTT 3.1.1) or MQIsdp (MQTT 3.1) - * Byte N: UTF8 MSB must be 0 - * Byte N+1: UTF8 LSB must be (4(MQTT) or 6(MQIsdp)) - * Byte N+1: M (first char from the protocol name). - * - * Max no bytes used in the sequence = 8. 
+ * Relevant portions of the specs we support: + * MQTT 3.1 - https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect + * MQTT 3.1.1 - http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028 + * MQTT 5 - https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033 */ @Override public boolean isProtocol(byte[] array) { ByteBuf buf = Unpooled.wrappedBuffer(array); - if (!(buf.readByte() == 16 && validateRemainingLength(buf) && buf.readByte() == (byte) 0)) return false; + // Parse "fixed header" + if (!(readByte(buf) == 16 && validateRemainingLength(buf) && readByte(buf) == (byte) 0)) { + return false; + } + + // Start parsing the Protocol Name + byte b = readByte(buf); // LSB + + // MQTT 3.1.1 & 5 + if (b == 4 && + (readByte(buf) != 77 || // M + readByte(buf) != 81 || // Q + readByte(buf) != 84 || // T + readByte(buf) != 84)) { // T + return false; + } + + // MQTT 3.1 + if (b == 6 && + (readByte(buf) != 77 || // M + readByte(buf) != 81 || // Q + readByte(buf) != 73 || // I + readByte(buf) != 115 || // s + readByte(buf) != 100 || // d + readByte(buf) != 112)) { // p + return false; + } + + // Protocol Version + b = readByte(buf); Review comment: How does this play with the protocol version checks? If we don't support the version we won't see it as MQTT, so we can never do the version checks in handleConnect. Or am I missing something? 
https://github.com/apache/activemq-artemis/pull/3907/files#diff-49bf0ed78bad6e95bceeea1d35d0c2216c0c14a3fdbf23cca0c14adf8736d818L180 ########## File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java ########## @@ -212,4 +234,42 @@ public CoreMessageObjectPools getCoreMessageObjectPools() { return coreMessageObjectPools; } + public boolean is5() { + return five; + } + + public void set5(boolean five) { Review comment: Small point, but I am not sure about `five` being a boolean; it may be better to keep the version as an int and check int > 3. I imagine there will be MQTT 6 etc., and they will build on each other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 712849) Time Spent: 40m (was: 0.5h) > Support MQTT 5 > -------------- > > Key: ARTEMIS-3638 > URL: https://issues.apache.org/jira/browse/ARTEMIS-3638 > Project: ActiveMQ Artemis > Issue Type: New Feature > Reporter: Justin Bertram > Assignee: Justin Bertram > Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.1#820001)