Copilot commented on code in PR #1152:
URL: https://github.com/apache/pekko-connectors/pull/1152#discussion_r2777253364


##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -978,7 +981,11 @@ object MqttCodec {
             : Vector[(Either[DecodeError, String], ControlPacketFlags)] =
           if (remainingLen > 0) {
             val packetLenAtTopicFilter = v.len
-            val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
+            // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+            // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
+            // after receiving the message from the broker. See also:
+            // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
+            val topicFilter = (v.decodeString(), ControlPacketFlags((v.getByte << 1) & 0xFF))

Review Comment:
   `decodeSubscribe` shifts the raw subscribe-options byte left by 1 and then 
masks with `0xFF`. If bit 7 is set in the incoming options byte (reserved per 
MQTT 3.1.1), the left shift moves it out of the low 8 bits and the mask drops 
it, allowing an invalid options byte (e.g. 0x80) to be treated as QoS 0. 
Validate reserved bits (bits 2-7) on the original byte before 
shifting/extracting QoS.
   ```suggestion
               val topic = v.decodeString()
               val subscribeOptions = v.getByte & 0xFF
               // Per MQTT 3.1.1, bits 2-7 of the Subscribe options byte are reserved and must be zero.
               // If any reserved bits are set, treat the flags as QoSReserved so that the existing validation rejects the message.
               val controlPacketFlags =
                 if ((subscribeOptions & 0xFC) != 0) ControlPacketFlags.QoSReserved
                 else ControlPacketFlags((subscribeOptions << 1) & 0xFF)
               val topicFilter = (topic, controlPacketFlags)
   ```
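
   To make the dropped-bit problem concrete, here is a minimal standalone sketch. The object and method names (`SubscribeOptionsDecode`, `naive`, `validated`) are hypothetical and not part of the Pekko codec; the flag value 6 for the reserved QoS assumes the (0, 2, 4) layout described in the diff comment.

   ```scala
   object SubscribeOptionsDecode {
     // Assumption: Pekko-style flags keep the QoS in bits 2-1, so 0, 2 and 4 are valid and 6 is reserved.
     val QoSReserved: Int = 0x06

     // Shift-then-mask as in the diff: bit 7 is shifted out of the low 8 bits and silently dropped.
     def naive(optionsByte: Byte): Int = (optionsByte << 1) & 0xFF

     // Check the reserved bits (7-2) on the original byte before shifting.
     def validated(optionsByte: Byte): Int = {
       val options = optionsByte & 0xFF
       if ((options & 0xFC) != 0) QoSReserved else (options << 1) & 0xFF
     }
   }
   ```

   With this sketch, `naive(0x80.toByte)` yields 0 (indistinguishable from QoS 0), while `validated(0x80.toByte)` yields the reserved value 6.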



##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -978,7 +981,11 @@ object MqttCodec {
             : Vector[(Either[DecodeError, String], ControlPacketFlags)] =
           if (remainingLen > 0) {
             val packetLenAtTopicFilter = v.len
-            val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
+            // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+            // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
+            // after receiving the message from the broker. See also:

Review Comment:
   The comment says "after receiving the message from the broker", but 
`decodeSubscribe` can be used in server mode (where SUBSCRIBE is received from 
a client). Consider rephrasing to be direction-agnostic (e.g., "when decoding a 
SUBSCRIBE packet").
   ```suggestion
               // However for the MQTT SUBSCRIBE message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
               // when decoding a SUBSCRIBE packet. See also:
   ```



##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -712,7 +711,11 @@ object MqttCodec {
       v.topicFilters.foreach {
         case (topicFilter, topicFilterFlags) =>
           topicFilter.encode(packetBsb)
-          packetBsb.putByte(topicFilterFlags.underlying.toByte)
+          // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+          // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right

Review Comment:
   The comment mentions "MQTT Subcribe"; this is a typo and should be "MQTT 
Subscribe".
   ```suggestion
             // However for the MQTT Subscribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
   ```



##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -712,7 +711,11 @@ object MqttCodec {
       v.topicFilters.foreach {
         case (topicFilter, topicFilterFlags) =>
           topicFilter.encode(packetBsb)
-          packetBsb.putByte(topicFilterFlags.underlying.toByte)
+          // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+          // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
+          // before sending the message to the broker. See also:
+          // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
+          packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)

Review Comment:
   When encoding SUBSCRIBE topic filter options, consider masking/validating 
the value so only the QoS bits (1-0) are sent and reserved bits remain 0. As 
written, passing any `ControlPacketFlags` with non-QoS bits set could produce a 
protocol-invalid SUBSCRIBE options byte after the shift.
   ```suggestion
             // However for the MQTT Subscribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
             // before sending the message to the broker. See also:
             // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
             // Mask to only allow QoS bits (1 and 2) before shifting so reserved bits remain 0 in the SUBSCRIBE options byte.
             val qosBits = topicFilterFlags.underlying & 0x06
             val subscribeOptions = (qosBits >> 1) & 0x03
             packetBsb.putByte(subscribeOptions.toByte)
   ```
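
   As a rough illustration of why the mask matters on the encode side, the sketch below compares the unmasked and masked shift. The names (`SubscribeOptionsEncode`, `unmasked`, `masked`) are hypothetical, and the input 0x0B is an assumed flag combination with non-QoS bits 3 and 0 set alongside QoS 1.

   ```scala
   object SubscribeOptionsEncode {
     // Plain right shift as in the diff: any non-QoS flag bit leaks into the options byte.
     def unmasked(flags: Int): Byte = (flags >> 1).toByte

     // Keep only the Pekko QoS bits (2 and 1), then move them to wire positions 1 and 0.
     def masked(flags: Int): Byte = ((flags & 0x06) >> 1).toByte
   }
   ```

   For the assumed input 0x0B, `unmasked` produces 0x05, which sets reserved bit 2 on the wire, whereas `masked` produces 0x01 (QoS 1 only).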



##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -978,7 +981,11 @@ object MqttCodec {
             : Vector[(Either[DecodeError, String], ControlPacketFlags)] =
           if (remainingLen > 0) {
             val packetLenAtTopicFilter = v.len
-            val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
+            // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+            // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left

Review Comment:
   The comment mentions "MQTT Subcribe"; this is a typo and should be "MQTT 
Subscribe".
   ```suggestion
               // However for the MQTT Subscribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
   ```



##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -819,7 +822,7 @@ object MqttCodec {
                 v.decodeSubAck(l)
               case (ControlPacketType.UNSUBSCRIBE, ControlPacketFlags.ReservedUnsubscribe) =>
                 v.decodeUnsubscribe(l)
-              case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck) =>
+              case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedGeneral) =>
                 v.decodeUnsubAck()

Review Comment:
   Current codec tests largely assert encode/decode round-trips, which would 
still pass even if both sides made the same (spec-incorrect) transformation. 
Consider adding assertions against the on-wire bytes for (1) SUBSCRIBE topic 
filter options QoS bits and reserved bits and (2) UNSUBACK fixed-header flags 
being 0, plus a decode test that rejects SUBSCRIBE option bytes with reserved 
bits (including bit 7) set.
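
   A minimal sketch of the difference between a round-trip check and a wire-byte check follows. It does not use the real `MqttCodec` API; `WireByteCheck` and all of its methods are hypothetical stand-ins for the per-topic-filter options byte only, assuming the (0, 2, 4) flag layout.

   ```scala
   object WireByteCheck {
     // Hypothetical spec-incorrect pair: both sides use the Pekko flag value unshifted.
     def encodeWrong(flags: Int): Byte = flags.toByte
     def decodeWrong(b: Byte): Int = b & 0xFF

     // Spec-aligned pair: QoS travels in bits 1-0 of the options byte.
     def encode(flags: Int): Byte = ((flags & 0x06) >> 1).toByte

     // Rejects any byte above 2, which covers both set reserved bits (7-2) and QoS 3.
     def decode(b: Byte): Option[Int] = {
       val options = b & 0xFF
       if (options > 2) None else Some(options << 1)
     }
   }
   ```

   Here `decodeWrong(encodeWrong(2))` returns 2, so a round-trip assertion passes, yet `encodeWrong(2)` emits 0x02 on the wire where QoS 1 should be 0x01. Only an assertion on the encoded byte, such as `encode(2) == 1`, together with a rejection case like `decode(0x80.toByte) == None`, would expose the mismatch.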



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

