This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new 30090ecd9 MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480) (#1483)
30090ecd9 is described below

commit 30090ecd9d29fac606208da990f96ad288f389fa
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 10 11:02:23 2026 +0100

    MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480) (#1483)
    
    * MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480)
    
    * Add MqttUserProperty model class (Scala/Java API)
    * Add userProperties field to MqttMessage with withUserProperties builder
    * Map incoming Paho user properties to MqttUserProperty on messageArrived
    * Map outgoing MqttUserProperty to Paho MqttProperties on publish
    * Add ACL entries and tests (Scala + Java) for user property round-trip
    
    * fix compile issues in Scala 2.12
    
    * add Java API methods to mqttv5 classes (#1486)
    
    * add Java API methods to mqttv5 classes
    
    * Update model.scala
    
    * Update model.scala
    
    ---------
    
    Co-authored-by: anton-curanz-mw <[email protected]>
---
 mqtt/src/test/travis/acl                           |  2 +
 .../connectors/mqttv5/impl/MqttFlowStage.scala     | 17 ++++-
 .../pekko/stream/connectors/mqttv5/model.scala     | 84 +++++++++++++++++++---
 .../src/test/java/docs/javadsl/MqttSourceTest.java | 38 ++++++++++
 .../test/scala/docs/scaladsl/MqttSourceSpec.scala  | 21 ++++++
 5 files changed, 152 insertions(+), 10 deletions(-)

diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl
index 8bf95d38d..104dfb743 100644
--- a/mqtt/src/test/travis/acl
+++ b/mqtt/src/test/travis/acl
@@ -23,6 +23,7 @@ topic v5/coffee/level
 topic v5/source-spec/will
 topic v5/source-spec/manualacks
 topic v5/source-spec/pendingacks
+topic v5/source-spec/user-props
 topic v5/sink-spec/topic1
 topic v5/sink-spec/topic2
 topic v5/sink-spec/topic3
@@ -32,6 +33,7 @@ topic v5/source-test/topic2
 topic v5/source-test/will
 topic v5/source-test/manualacks
 topic v5/source-test/pendingacks
+topic v5/source-test/user-props
 topic v5/flow-spec/topic-ack
 topic v5/flow-test/topic-ack
 topic v5/typed-flow-spec/topic1
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
index f60baa4a6..4ebb5f5bc 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
@@ -31,11 +31,13 @@ import org.apache.pekko.stream._
 import org.apache.pekko.stream.connectors.mqttv5.AuthSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
 import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
 import org.apache.pekko.stream.stage._
 import org.apache.pekko.util.ByteString
+import org.apache.pekko.util.ccompat.JavaConverters._
 import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions
 import org.eclipse.paho.mqttv5.client.IMqttAsyncClient
 import org.eclipse.paho.mqttv5.client.IMqttToken
@@ -46,6 +48,7 @@ import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse
 import org.eclipse.paho.mqttv5.common.MqttException
 import org.eclipse.paho.mqttv5.common.packet.MqttProperties
 import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode
+import org.eclipse.paho.mqttv5.common.packet.UserProperty
 import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage }
 
 /**
@@ -235,8 +238,14 @@ abstract class MqttFlowStageLogic[I](
     new MqttCallback {
       override def messageArrived(topic: String, pahoMessage: 
PahoMqttMessage): Unit = {
         backpressurePahoClient.acquire()
+        val userProps =
+          Option(pahoMessage.getProperties)
+            .map(_.getUserProperties.asScala.map(p => 
MqttUserProperty(p.getKey, p.getValue)).toList)
+            .getOrElse(Nil)
         val message = new MqttMessageWithAck {
-          override val message: MqttMessage = MqttMessage(topic, 
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+          override val message: MqttMessage =
+            MqttMessage(topic, 
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+              .withUserProperties(userProps)
 
           override def ack(): Future[Done] = {
             val promise = Promise[Done]()
@@ -404,6 +413,12 @@ abstract class MqttFlowStageLogic[I](
     pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value)
     pahoMsg.setRetained(msg.retained)
 
+    if (msg.userProperties.nonEmpty) {
+      val pahoProps = new MqttProperties()
+      pahoProps.setUserProperties(msg.userProperties.map(p => new 
UserProperty(p.key, p.value)).toList.asJava)
+      pahoMsg.setProperties(pahoProps)
+    }
+
     mqttClient.publish(
       msg.topic,
       pahoMsg,
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
index 106488b16..d04d2c77d 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -14,12 +14,46 @@
 package org.apache.pekko.stream.connectors.mqttv5
 
 import org.apache.pekko
+import pekko.util.ccompat.JavaConverters._
+import pekko.util.OptionConverters._
+
+import scala.collection.immutable
+
+final class MqttUserProperty private (val key: String, val value: String) {
+
+  /** Java API */
+  def getKey(): String = key
+
+  /** Java API */
+  def getValue(): String = value
+
+  override def toString = s"MqttUserProperty(key=$key,value=$value)"
+
+  override def equals(other: Any): Boolean = other match {
+    case that: MqttUserProperty =>
+      java.util.Objects.equals(this.key, that.key) &&
+      java.util.Objects.equals(this.value, that.value)
+    case _ => false
+  }
+
+  override def hashCode(): Int = java.util.Objects.hash(key, value)
+}
+
+object MqttUserProperty {
+
+  /** Scala API */
+  def apply(key: String, value: String): MqttUserProperty = new 
MqttUserProperty(key, value)
+
+  /** Java API */
+  def create(key: String, value: String): MqttUserProperty = new 
MqttUserProperty(key, value)
+}
 
 final class MqttMessage private (
     val topic: String,
     val payload: org.apache.pekko.util.ByteString,
     val qos: Option[MqttQoS],
-    val retained: Boolean
+    val retained: Boolean,
+    val userProperties: Seq[MqttUserProperty]
 ) {
 
   def withTopic(value: String): MqttMessage = copy(topic = value)
@@ -28,26 +62,56 @@ final class MqttMessage private (
   def withQos(value: MqttQoS): MqttMessage = copy(qos = Option(value))
   def withRetained(value: Boolean): MqttMessage = if (retained == value) this 
else copy(retained = value)
 
-  private def copy(topic: String = topic,
+  /** Scala API */
+  def withUserProperties(value: immutable.Seq[MqttUserProperty]): MqttMessage =
+    copy(userProperties = value)
+
+  /** Java API */
+  def withUserProperties(value: java.util.List[MqttUserProperty]): MqttMessage 
=
+    copy(userProperties = value.asScala.toSeq)
+
+  /**
+   * Java API. Returns the user properties.
+   * Modifying the returned list will not change the user properties of this 
message.
+   */
+  def getUserProperties(): java.util.List[MqttUserProperty] = 
userProperties.asJava
+
+  /** Java API */
+  def isRetained(): Boolean = retained
+
+  /** Java API */
+  def getQoS(): java.util.Optional[MqttQoS] = qos.toJava
+
+  /** Java API */
+  def getPayload(): org.apache.pekko.util.ByteString = payload
+
+  /** Java API */
+  def getTopic(): String = topic
+
+  private def copy(
+      topic: String = topic,
       payload: pekko.util.ByteString = payload,
       qos: Option[MqttQoS] = qos,
-      retained: Boolean = retained): MqttMessage =
-    new MqttMessage(topic = topic, payload = payload, qos = qos, retained = 
retained)
+      retained: Boolean = retained,
+      userProperties: Seq[MqttUserProperty] = userProperties): MqttMessage =
+    new MqttMessage(topic = topic, payload = payload, qos = qos, retained = 
retained, userProperties = userProperties)
 
   override def toString =
-    
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained)"""
+    
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained,userProperties=${userProperties.mkString(
+        "[", ", ", "]")})"""
 
   override def equals(other: Any): Boolean = other match {
     case that: MqttMessage =>
       java.util.Objects.equals(this.topic, that.topic) &&
       java.util.Objects.equals(this.payload, that.payload) &&
       java.util.Objects.equals(this.qos, that.qos) &&
-      java.util.Objects.equals(this.retained, that.retained)
+      java.util.Objects.equals(this.retained, that.retained) &&
+      java.util.Objects.equals(this.userProperties, that.userProperties)
     case _ => false
   }
 
   override def hashCode(): Int =
-    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained))
+    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained), 
userProperties)
 }
 
 object MqttMessage {
@@ -59,7 +123,8 @@ object MqttMessage {
     topic,
     payload,
     qos = None,
-    retained = false)
+    retained = false,
+    userProperties = Seq.empty)
 
   /** Java API */
   def create(
@@ -68,5 +133,6 @@ object MqttMessage {
     topic,
     payload,
     qos = None,
-    retained = false)
+    retained = false,
+    userProperties = Seq.empty)
 }
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index 3d769ba31..57d014fe3 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -23,6 +23,7 @@ import 
org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings;
 import org.apache.pekko.stream.connectors.mqttv5.MqttMessage;
 import org.apache.pekko.stream.connectors.mqttv5.MqttQoS;
 import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty;
 import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck;
 import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSink;
 import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSource;
@@ -338,4 +339,41 @@ public class MqttSourceTest {
                 MqttMessage.create(willTopic, ByteString.fromString("ohi")),
                 elem.toCompletableFuture().get(3, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void receiveUserProperties() throws Exception {
+        final String topic = "v5/source-test/user-props";
+        MqttConnectionSettings connectionSettings =
+                MqttConnectionSettings.create("tcp://localhost:1883", 
"test-java-user-props", new MemoryPersistence());
+
+        Source<MqttMessage, CompletionStage<Done>> source =
+                MqttSource.atMostOnce(
+                        
connectionSettings.withClientId("source-test/user-props-source"),
+                        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
+                        bufferSize);
+
+        Pair<CompletionStage<Done>, CompletionStage<MqttMessage>> result =
+                source.toMat(Sink.head(), Keep.both()).run(system);
+
+        result.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+        List<MqttUserProperty> userPropsToSend = Arrays.asList(
+                MqttUserProperty.create("x-trace-id", "abc123"),
+                MqttUserProperty.create("x-tenant", "acme"));
+        MqttMessage msg = MqttMessage.create(topic, 
ByteString.fromString("test"))
+                .withUserProperties(userPropsToSend);
+        Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+                MqttSink.create(
+                        
connectionSettings.withClientId("source-test/user-props-sink"),
+                        MqttQoS.atLeastOnce());
+        Source.single(msg).runWith(mqttSink, system);
+
+        MqttMessage received = result.second().toCompletableFuture().get(5, 
TimeUnit.SECONDS);
+        List<MqttUserProperty> userProps = received.getUserProperties();
+        assertEquals(2, userProps.size());
+        assertEquals("x-trace-id", userProps.get(0).getKey());
+        assertEquals("abc123", userProps.get(0).getValue());
+        assertEquals("x-tenant", userProps.get(1).getKey());
+        assertEquals("acme", userProps.get(1).getValue());
+    }
 }
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index 5ae7b1b3a..1f685e32d 100644
--- a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -20,6 +20,7 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
+import scala.collection.immutable
 
 import org.apache.pekko.Done
 import org.apache.pekko.NotUsed
@@ -28,6 +29,7 @@ import 
org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
 import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
 import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource
@@ -492,4 +494,23 @@ class MqttSourceSpec extends 
MqttSpecBase("MqttSourceSpec") {
       Await.result(proxyKs2, timeout).shutdown()
     }
   }
+
+  "receive user properties from a message" in {
+    val topic = "v5/source-spec/user-props"
+    val expectedProps = immutable.Seq(
+      MqttUserProperty("x-trace-id", "abc123"),
+      MqttUserProperty("x-tenant", "acme"))
+
+    val (subscribed, result) = MqttSource
+      .atMostOnce(sourceSettings, MqttSubscriptions(topic, 
MqttQoS.AtLeastOnce), 8)
+      .toMat(Sink.head)(Keep.both)
+      .run()
+
+    Await.ready(subscribed, timeout)
+
+    val msg = MqttMessage(topic, 
ByteString("test")).withUserProperties(expectedProps)
+    Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+    result.futureValue.userProperties shouldBe expectedProps
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to