Copilot commented on code in PR #1152:
URL: https://github.com/apache/pekko-connectors/pull/1152#discussion_r2777253364
##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -978,7 +981,11 @@ object MqttCodec {
: Vector[(Either[DecodeError, String], ControlPacketFlags)] =
if (remainingLen > 0) {
val packetLenAtTopicFilter = v.len
- val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
+ // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+ // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
+ // after receiving the message from the broker. See also:
+ // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
+ val topicFilter = (v.decodeString(), ControlPacketFlags((v.getByte << 1) & 0xFF))
Review Comment:
`decodeSubscribe` shifts the raw subscribe-options byte left by 1 and then
masks with `0xFF`. If bit 7 is set in the incoming options byte (reserved per
MQTT 3.1.1), the left shift moves it out of the low 8 bits and the mask drops
it, allowing an invalid options byte (e.g. 0x80) to be treated as QoS 0.
Validate reserved bits (bits 2-7) on the original byte before
shifting/extracting QoS.
```suggestion
val topic = v.decodeString()
val subscribeOptions = v.getByte & 0xFF
// Per MQTT 3.1.1, bits 2-7 of the Subscribe options byte are reserved and must be zero.
// If any reserved bits are set, treat the flags as QoSReserved so that the existing validation rejects the message.
val controlPacketFlags =
  if ((subscribeOptions & 0xFC) != 0) ControlPacketFlags.QoSReserved
  else ControlPacketFlags((subscribeOptions << 1) & 0xFF)
val topicFilter = (topic, controlPacketFlags)
```
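The dropped bit can be reproduced without the codec. Below is a minimal standalone sketch, not the PR's code: `naiveDecode` and `checkedDecode` are hypothetical helpers, and the local `QoSReserved` constant (assumed to be `3 << 1`) only stands in for `ControlPacketFlags.QoSReserved`.

```scala
object SubscribeOptionsDecodeSketch {
  // Stand-in for ControlPacketFlags.QoSReserved; the value 3 << 1 is an assumption of this sketch.
  val QoSReserved: Int = 3 << 1

  // Mirrors the expression in the diff: shift first, then mask.
  // Bit 7 of the raw byte is shifted past the low 8 bits and lost.
  def naiveDecode(raw: Byte): Int = (raw << 1) & 0xFF

  // Checks the reserved bits (2-7) on the original byte before shifting.
  def checkedDecode(raw: Byte): Int = {
    val options = raw & 0xFF
    if ((options & 0xFC) != 0) QoSReserved
    else (options << 1) & 0xFF
  }
}
```

With these helpers, `naiveDecode(0x80.toByte)` yields 0 (indistinguishable from QoS 0), while `checkedDecode(0x80.toByte)` yields the reserved value, so a downstream check on the reserved QoS can reject the packet.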
##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -978,7 +981,11 @@ object MqttCodec {
: Vector[(Either[DecodeError, String], ControlPacketFlags)] =
if (remainingLen > 0) {
val packetLenAtTopicFilter = v.len
- val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
+ // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+ // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
+ // after receiving the message from the broker. See also:
Review Comment:
The comment says "after receiving the message from the broker", but
`decodeSubscribe` can be used in server mode (where SUBSCRIBE is received from
a client). Consider rephrasing to be direction-agnostic (e.g., "when decoding a
SUBSCRIBE packet").
```suggestion
// However for the MQTT SUBSCRIBE message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
// when decoding a SUBSCRIBE packet. See also:
```
##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -712,7 +711,11 @@ object MqttCodec {
v.topicFilters.foreach {
case (topicFilter, topicFilterFlags) =>
topicFilter.encode(packetBsb)
- packetBsb.putByte(topicFilterFlags.underlying.toByte)
+ // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+ // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
Review Comment:
The comment mentions "MQTT Subcribe"; this is a typo and should be "MQTT
Subscribe".
```suggestion
// However for the MQTT Subscribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
```
##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -712,7 +711,11 @@ object MqttCodec {
v.topicFilters.foreach {
case (topicFilter, topicFilterFlags) =>
topicFilter.encode(packetBsb)
- packetBsb.putByte(topicFilterFlags.underlying.toByte)
+ // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+ // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
+ // before sending the message to the broker. See also:
+ // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
+ packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)
Review Comment:
When encoding SUBSCRIBE topic filter options, consider masking/validating
the value so only the QoS bits (1-0) are sent and reserved bits remain 0. As
written, passing any `ControlPacketFlags` with non-QoS bits set could produce a
protocol-invalid SUBSCRIBE options byte after the shift.
```suggestion
// However for the MQTT Subscribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the right
// before sending the message to the broker. See also:
// https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
// Mask to only allow QoS bits (1 and 2) before shifting so reserved bits remain 0 in the SUBSCRIBE options byte.
val qosBits = topicFilterFlags.underlying & 0x06
val subscribeOptions = (qosBits >> 1) & 0x03
packetBsb.putByte(subscribeOptions.toByte)
```
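To illustrate why the mask matters, here is a small standalone sketch. `unmaskedOptions` and `maskedOptions` are hypothetical helpers working on the raw `Int` that `ControlPacketFlags.underlying` is assumed to hold; they are not part of the codec.

```scala
object SubscribeOptionsEncodeSketch {
  // Mirrors the expression in the diff: shift only, no mask.
  def unmaskedOptions(flags: Int): Byte = (flags >> 1).toByte

  // Keeps only the QoS bits (1 and 2) of the flags value before shifting,
  // so bits 2-7 of the resulting options byte are always 0.
  def maskedOptions(flags: Int): Byte = (((flags & 0x06) >> 1) & 0x03).toByte
}
```

For a flags value of `0x0A` (QoS 1 plus a stray bit 3), `unmaskedOptions` produces `0x05`, which sets reserved bit 2 on the wire, whereas `maskedOptions` produces `0x01`. Silently masking hides the bad input, so rejecting such flags before encoding may be the better choice if callers can construct arbitrary `ControlPacketFlags`.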
##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -978,7 +981,11 @@ object MqttCodec {
: Vector[(Either[DecodeError, String], ControlPacketFlags)] =
if (remainingLen > 0) {
val packetLenAtTopicFilter = v.len
- val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
+ // Pekko QoS constants have been defined as (0, 2, 4), which for most MQTT messages correctly maps to bits 1 and 2.
+ // However for the MQTT Subcribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
Review Comment:
The comment mentions "MQTT Subcribe"; this is a typo and should be "MQTT
Subscribe".
```suggestion
// However for the MQTT Subscribe message, the QoS is encoded in bits 1 and 0. So here we need to shift to the left
```
##########
mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:
##########
@@ -819,7 +822,7 @@ object MqttCodec {
v.decodeSubAck(l)
case (ControlPacketType.UNSUBSCRIBE, ControlPacketFlags.ReservedUnsubscribe) =>
v.decodeUnsubscribe(l)
- case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck) =>
+ case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedGeneral) =>
v.decodeUnsubAck()
Review Comment:
Current codec tests largely assert encode/decode round-trips, which would
still pass even if both sides made the same (spec-incorrect) transformation.
Consider adding assertions against the on-wire bytes for (1) SUBSCRIBE topic
filter options QoS bits and reserved bits and (2) UNSUBACK fixed-header flags
being 0, plus a decode test that rejects SUBSCRIBE option bytes with reserved
bits (including bit 7) set.
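One way to get spec-derived expectations is to build the expected bytes by hand from the MQTT 3.1.1 packet layout and compare them with what the codec emits. A rough sketch follows; `expectedSubscribe` and `expectedUnsubAck` are hypothetical test helpers (not existing project code), and the sketch assumes a single topic filter and a remaining length below 128.

```scala
import java.nio.charset.StandardCharsets

object OnWireExpectationsSketch {
  // Expected SUBSCRIBE bytes: fixed header 0x82 (type 8, flags 0010), remaining length,
  // packet identifier, length-prefixed UTF-8 topic filter, then the options byte (QoS in bits 1-0).
  def expectedSubscribe(packetId: Int, topic: String, qos: Int): Array[Byte] = {
    val topicBytes = topic.getBytes(StandardCharsets.UTF_8)
    val remaining = 2 + 2 + topicBytes.length + 1
    require(remaining < 128, "sketch only handles a one-byte remaining length")
    require(qos >= 0 && qos <= 2, "QoS must be 0, 1 or 2")
    Array[Int](
      0x82, remaining,
      (packetId >> 8) & 0xFF, packetId & 0xFF,
      (topicBytes.length >> 8) & 0xFF, topicBytes.length & 0xFF
    ).map(_.toByte) ++ topicBytes ++ Array(qos.toByte)
  }

  // Expected UNSUBACK bytes: fixed header 0xB0 (type 11, flags 0000), remaining length 2, packet identifier.
  def expectedUnsubAck(packetId: Int): Array[Byte] =
    Array(0xB0, 0x02, (packetId >> 8) & 0xFF, packetId & 0xFF).map(_.toByte)
}
```

A codec test could then assert that the encoded SUBSCRIBE and UNSUBACK packets equal these arrays byte for byte, which would catch a transformation that is wrong on both the encode and decode side.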
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]