[ 
https://issues.apache.org/jira/browse/ARTEMIS-3638?focusedWorklogId=712849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-712849
 ]

ASF GitHub Bot logged work on ARTEMIS-3638:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jan/22 13:21
            Start Date: 21/Jan/22 13:21
    Worklog Time Spent: 10m 
      Work Description: gtully commented on a change in pull request #3907:
URL: https://github.com/apache/activemq-artemis/pull/3907#discussion_r789626358



##########
File path: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
##########
@@ -159,89 +176,180 @@ public void act(MqttMessage message) {
                break;
             case UNSUBACK:
             case SUBACK:
+            case PINGREQ: // These are actually handled by the Netty thread 
directly so this packet should never make it here
             case PINGRESP:
             case CONNACK: // The server does not instantiate connections 
therefore any CONNACK received over a connection is an invalid control message.
             default:
                disconnect(true);
          }
       } catch (Exception e) {
-         log.warn("Error processing Control Packet, Disconnecting Client", e);
+         MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
+         if (session.is5()) {
+            sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
+         }
          disconnect(true);
       } finally {
          ReferenceCountUtil.release(message);
       }
    }
 
-   /**
-    * Called during connection.
+   /*
+    * Scaffolding for "enhanced authentication" implementation.
+    *
+    * See:
+    *   
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901217
+    *   
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901256
+    *
+    * Tests for this are in:
+    *   
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets.AuthTests
+    *   
org.apache.activemq.artemis.tests.integration.mqtt5.spec.EnhancedAuthenticationTests
     *
-    * @param connect
+    * This should integrate somehow with our existing SASL implementation for 
challenge/response conversations.
     */
+   void handleAuth(MqttMessage auth) throws Exception {
+      byte[] authenticationData = MQTTUtil.getProperty(byte[].class, 
((MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader()).properties(),
 AUTHENTICATION_DATA);
+      String authenticationMethod = MQTTUtil.getProperty(String.class, 
((MqttReasonCodeAndPropertiesVariableHeader)auth.variableHeader()).properties(),
 AUTHENTICATION_METHOD);
+
+      MqttReasonCodeAndPropertiesVariableHeader header = 
(MqttReasonCodeAndPropertiesVariableHeader) auth.variableHeader();
+      if (header.reasonCode() == MQTTReasonCodes.RE_AUTHENTICATE) {
+
+      } else if (header.reasonCode() == 
MQTTReasonCodes.CONTINUE_AUTHENTICATION) {
+
+      } else if (header.reasonCode() == MQTTReasonCodes.SUCCESS) {
+
+      }
+   }
+
    void handleConnect(MqttConnectMessage connect) throws Exception {
-      final String username = connect.payload().userName();
-      final String password = connect.payload().passwordInBytes() == null ? 
null : new String( connect.payload().passwordInBytes(), CharsetUtil.UTF_8);
-      final String validatedUser = server.validateUser(username, password, 
session.getConnection(), session.getProtocolManager().getSecurityDomain());
-      if (connection.getTransportConnection().getRedirectTo() == null ||
-         !protocolManager.getRedirectHandler().redirect(connection, session, 
connect)) {
-         connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() 
* 1500L;
-
-         String clientId = connect.payload().clientIdentifier();
-         session.getConnectionManager().connect(clientId, username, password, 
connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), 
connect.payload().willTopic(), connect.variableHeader().isWillRetain(), 
connect.variableHeader().willQos(), connect.variableHeader().isCleanSession(), 
validatedUser);
+      if (connection.getTransportConnection().getRedirectTo() == null || 
!protocolManager.getRedirectHandler().redirect(connection, session, connect)) {
+         /* [MQTT-3.1.2-2] Reject unsupported clients. */
+         int packetVersion = connect.variableHeader().version();
+         if (packetVersion != MqttVersion.MQTT_3_1.protocolLevel() &&
+            packetVersion != MqttVersion.MQTT_3_1_1.protocolLevel() &&
+            packetVersion != MqttVersion.MQTT_5.protocolLevel()) {
+
+            {
+               /*
+                * The return code is different here for 3.1.1 vs 5, but since 
the broker supports *both* it's not clear
+                * which one should be chosen. Perhaps an acceptor property 
could be set to decide. Leaving the existing

Review comment:
       This does not need to be configured, but if packetVersion > 3 I would 
use the v5 reason code.

##########
File path: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
##########
@@ -284,11 +358,110 @@ private void sendServerMessage(int messageId, 
ICoreMessage message, int delivery
             payload.writeBytes(bodyBuffer.byteBuf());
             break;
       }
-      session.getProtocolHandler().send(messageId, address, qos, isRetain, 
payload, deliveryCount);
+
+      // [MQTT-3.3.1-2] The DUP flag MUST be set to 0 for all QoS 0 messages.
+      boolean redelivery = qos == 0 ? false : (deliveryCount > 1);
+
+      boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
+      MqttProperties mqttProperties = getPublishProperties(message);
+
+      if (session.is5()) {
+         if (session.getState().getSubscription(message.getAddress()) != null 
&& 
!session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished())
 {
+            isRetain = false;
+         }
+
+         // [MQTT-3.8.3-3] remove property used for no-local implementation
+         message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
+
+         if (session.getState().getClientTopicAliasMaximum() != null) {
+            Integer alias = session.getState().getServerTopicAlias(address);
+            if (alias == null) {
+               alias = session.getState().addServerTopicAlias(address);
+               if (alias != null) {
+                  mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
+               }
+            } else {
+               mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
+               address = "";
+            }
+         }
+      }
+
+      int remainingLength = MQTTUtil.calculateRemainingLength(address, 
mqttProperties, payload);
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, 
redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
+      MqttPublishVariableHeader varHeader = new 
MqttPublishVariableHeader(address, messageId, mqttProperties);
+      MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, 
payload);
+
+      if (session.is5()) {
+         int size = MQTTUtil.calculateMessageSize(publish);

Review comment:
       Maybe calculate the size only if maxSize != 0, to avoid the computation 
when the size is unlimited.

##########
File path: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
##########
@@ -185,45 +243,83 @@ public void handleBuffer(RemotingConnection connection, 
ActiveMQBuffer buffer) {
    @Override
    public void addChannelHandlers(ChannelPipeline pipeline) {
       pipeline.addLast(MqttEncoder.INSTANCE);
-      pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
+      /*
+       * If we use the value from getMaximumPacketSize() here anytime a client 
sends a packet that's too large it
+       * will receive a DISCONNECT with a reason code of 0x81 instead of 0x95 
like it should according to the spec.

Review comment:
       How bad is that? A 0x81 (malformed packet) is not accurate, but isn't it 
the disconnect itself that is important? Doing a second size check, or a size 
check higher up, may be an unnecessary expense if the size can be computed and 
the packet rejected at the Netty layer. Even if the reason code is less 
accurate, it may be a good tradeoff to make. I don't see an MQTT app making a 
decision based on the reason code, but maybe. Or maybe we can fix Netty to use 
0x95 for v5.

##########
File path: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
##########
@@ -185,45 +243,83 @@ public void handleBuffer(RemotingConnection connection, 
ActiveMQBuffer buffer) {
    @Override
    public void addChannelHandlers(ChannelPipeline pipeline) {
       pipeline.addLast(MqttEncoder.INSTANCE);
-      pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
+      /*
+       * If we use the value from getMaximumPacketSize() here anytime a client 
sends a packet that's too large it
+       * will receive a DISCONNECT with a reason code of 0x81 instead of 0x95 
like it should according to the spec.
+       * See 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086:
+       *
+       *   If a Server receives a packet whose size exceeds this limit, this 
is a Protocol Error, the Server uses
+       *   DISCONNECT with Reason Code 0x95 (Packet too large)...
+       *
+       * Therefore we check manually in 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolHandler.handlePublish
+       */
+      pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_PACKET_SIZE));
 
       pipeline.addLast(new MQTTProtocolHandler(server, this));
    }
 
    /**
-    * The protocol handler passes us an 8 byte long array from the transport.  
We sniff these first 8 bytes to see
-    * if they match the first 8 bytes from MQTT Connect packet.  In many other 
protocols the protocol name is the first
-    * thing sent on the wire.  However, in MQTT the protocol name doesn't come 
until later on in the CONNECT packet.
-    *
-    * In order to fully identify MQTT protocol via protocol name, we need up 
to 12 bytes.  However, we can use other
-    * information from the connect packet to infer that the MQTT protocol is 
being used.  This is enough to identify MQTT
-    * and add the Netty codec in the pipeline.  The Netty codec takes care of 
things from here.
-    *
-    * MQTT CONNECT PACKET: See MQTT 3.1.1 Spec for more info.
-    *
-    * Byte 1: Fixed Header Packet Type.  0b0001000 (16) = MQTT Connect
-    * Byte 2-[N]: Remaining length of the Connect Packet (encoded with 1-4 
bytes).
-    *
-    * The next set of bytes represents the UTF8 encoded string MQTT (MQTT 
3.1.1) or MQIsdp (MQTT 3.1)
-    * Byte N: UTF8 MSB must be 0
-    * Byte N+1: UTF8 LSB must be (4(MQTT) or 6(MQIsdp))
-    * Byte N+1: M (first char from the protocol name).
-    *
-    * Max no bytes used in the sequence = 8.
+    * Relevant portions of the specs we support:
+    * MQTT 3.1 - 
https://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
+    * MQTT 3.1.1 - 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028
+    * MQTT 5 - 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033
     */
    @Override
    public boolean isProtocol(byte[] array) {
       ByteBuf buf = Unpooled.wrappedBuffer(array);
 
-      if (!(buf.readByte() == 16 && validateRemainingLength(buf) && 
buf.readByte() == (byte) 0)) return false;
+      // Parse "fixed header"
+      if (!(readByte(buf) == 16 && validateRemainingLength(buf) && 
readByte(buf) == (byte) 0)) {
+         return false;
+      }
+
+      // Start parsing the Protocol Name
+      byte b = readByte(buf); // LSB
+
+      // MQTT 3.1.1 & 5
+      if (b == 4 &&
+         (readByte(buf) != 77 || // M
+         readByte(buf) != 81 ||  // Q
+         readByte(buf) != 84 ||  // T
+         readByte(buf) != 84)) { // T
+         return false;
+      }
+
+      // MQTT 3.1
+      if (b == 6 &&
+         (readByte(buf) != 77 ||  // M
+         readByte(buf) != 81 ||   // Q
+         readByte(buf) != 73 ||   // I
+         readByte(buf) != 115 ||  // s
+         readByte(buf) != 100 ||  // d
+         readByte(buf) != 112)) { // p
+         return false;
+      }
+
+      // Protocol Version
+      b = readByte(buf);

Review comment:
       How does this play with the protocol version checks? If we don't support 
the version we won't see it as MQTT, so we can never do the version checks in 
handleConnect. Or am I missing something?
   
https://github.com/apache/activemq-artemis/pull/3907/files#diff-49bf0ed78bad6e95bceeea1d35d0c2216c0c14a3fdbf23cca0c14adf8736d818L180

##########
File path: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
##########
@@ -212,4 +234,42 @@ public CoreMessageObjectPools getCoreMessageObjectPools() {
       return coreMessageObjectPools;
    }
 
+   public boolean is5() {
+      return five;
+   }
+
+   public void set5(boolean five) {

Review comment:
       Small point, but I am not sure about representing this as a boolean 
named "five"; it may be better to keep the version as an int and check 
version > 3. I imagine there will be an MQTT 6, etc., and the versions will 
build on each other.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 712849)
    Time Spent: 40m  (was: 0.5h)

> Support MQTT 5
> --------------
>
>                 Key: ARTEMIS-3638
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3638
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Justin Bertram
>            Assignee: Justin Bertram
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to