This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 30090ecd9 MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480) (#1483)
30090ecd9 is described below
commit 30090ecd9d29fac606208da990f96ad288f389fa
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 10 11:02:23 2026 +0100
MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480) (#1483)
* MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480)
* Add MqttUserProperty model class (Scala/Java API)
* Add userProperties field to MqttMessage with withUserProperties builder
* Map incoming Paho user properties to MqttUserProperty on messageArrived
* Map outgoing MqttUserProperty to Paho MqttProperties on publish
* Add ACL entries and tests (Scala + Java) for user property round-trip
* fix compile issues in Scala 2.12
* add Java API methods to mqttv5 classes (#1486)
* add Java API methods to mqttv5 classes
* Update model.scala
* Update model.scala
---------
Co-authored-by: anton-curanz-mw <[email protected]>
---
mqtt/src/test/travis/acl | 2 +
.../connectors/mqttv5/impl/MqttFlowStage.scala | 17 ++++-
.../pekko/stream/connectors/mqttv5/model.scala | 84 +++++++++++++++++++---
.../src/test/java/docs/javadsl/MqttSourceTest.java | 38 ++++++++++
.../test/scala/docs/scaladsl/MqttSourceSpec.scala | 21 ++++++
5 files changed, 152 insertions(+), 10 deletions(-)
diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl
index 8bf95d38d..104dfb743 100644
--- a/mqtt/src/test/travis/acl
+++ b/mqtt/src/test/travis/acl
@@ -23,6 +23,7 @@ topic v5/coffee/level
topic v5/source-spec/will
topic v5/source-spec/manualacks
topic v5/source-spec/pendingacks
+topic v5/source-spec/user-props
topic v5/sink-spec/topic1
topic v5/sink-spec/topic2
topic v5/sink-spec/topic3
@@ -32,6 +33,7 @@ topic v5/source-test/topic2
topic v5/source-test/will
topic v5/source-test/manualacks
topic v5/source-test/pendingacks
+topic v5/source-test/user-props
topic v5/flow-spec/topic-ack
topic v5/flow-test/topic-ack
topic v5/typed-flow-spec/topic1
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
index f60baa4a6..4ebb5f5bc 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
@@ -31,11 +31,13 @@ import org.apache.pekko.stream._
import org.apache.pekko.stream.connectors.mqttv5.AuthSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
import org.apache.pekko.stream.stage._
import org.apache.pekko.util.ByteString
+import org.apache.pekko.util.ccompat.JavaConverters._
import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient
import org.eclipse.paho.mqttv5.client.IMqttToken
@@ -46,6 +48,7 @@ import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse
import org.eclipse.paho.mqttv5.common.MqttException
import org.eclipse.paho.mqttv5.common.packet.MqttProperties
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode
+import org.eclipse.paho.mqttv5.common.packet.UserProperty
import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage }
/**
@@ -235,8 +238,14 @@ abstract class MqttFlowStageLogic[I](
new MqttCallback {
override def messageArrived(topic: String, pahoMessage:
PahoMqttMessage): Unit = {
backpressurePahoClient.acquire()
+ val userProps =
+ Option(pahoMessage.getProperties)
+ .map(_.getUserProperties.asScala.map(p =>
MqttUserProperty(p.getKey, p.getValue)).toList)
+ .getOrElse(Nil)
val message = new MqttMessageWithAck {
- override val message: MqttMessage = MqttMessage(topic,
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+ override val message: MqttMessage =
+ MqttMessage(topic,
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+ .withUserProperties(userProps)
override def ack(): Future[Done] = {
val promise = Promise[Done]()
@@ -404,6 +413,12 @@ abstract class MqttFlowStageLogic[I](
pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value)
pahoMsg.setRetained(msg.retained)
+ if (msg.userProperties.nonEmpty) {
+ val pahoProps = new MqttProperties()
+ pahoProps.setUserProperties(msg.userProperties.map(p => new
UserProperty(p.key, p.value)).toList.asJava)
+ pahoMsg.setProperties(pahoProps)
+ }
+
mqttClient.publish(
msg.topic,
pahoMsg,
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
index 106488b16..d04d2c77d 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -14,12 +14,46 @@
package org.apache.pekko.stream.connectors.mqttv5
import org.apache.pekko
+import pekko.util.ccompat.JavaConverters._
+import pekko.util.OptionConverters._
+
+import scala.collection.immutable
+
+final class MqttUserProperty private (val key: String, val value: String) {
+
+ /** Java API */
+ def getKey(): String = key
+
+ /** Java API */
+ def getValue(): String = value
+
+ override def toString = s"MqttUserProperty(key=$key,value=$value)"
+
+ override def equals(other: Any): Boolean = other match {
+ case that: MqttUserProperty =>
+ java.util.Objects.equals(this.key, that.key) &&
+ java.util.Objects.equals(this.value, that.value)
+ case _ => false
+ }
+
+ override def hashCode(): Int = java.util.Objects.hash(key, value)
+}
+
+object MqttUserProperty {
+
+ /** Scala API */
+ def apply(key: String, value: String): MqttUserProperty = new
MqttUserProperty(key, value)
+
+ /** Java API */
+ def create(key: String, value: String): MqttUserProperty = new
MqttUserProperty(key, value)
+}
final class MqttMessage private (
val topic: String,
val payload: org.apache.pekko.util.ByteString,
val qos: Option[MqttQoS],
- val retained: Boolean
+ val retained: Boolean,
+ val userProperties: Seq[MqttUserProperty]
) {
def withTopic(value: String): MqttMessage = copy(topic = value)
@@ -28,26 +62,56 @@ final class MqttMessage private (
def withQos(value: MqttQoS): MqttMessage = copy(qos = Option(value))
def withRetained(value: Boolean): MqttMessage = if (retained == value) this
else copy(retained = value)
- private def copy(topic: String = topic,
+ /** Scala API */
+ def withUserProperties(value: immutable.Seq[MqttUserProperty]): MqttMessage =
+ copy(userProperties = value)
+
+ /** Java API */
+ def withUserProperties(value: java.util.List[MqttUserProperty]): MqttMessage
=
+ copy(userProperties = value.asScala.toSeq)
+
+ /**
+ * Java API. Returns the user properties.
+ * Modifying the returned list will not change the user properties of this
message.
+ */
+ def getUserProperties(): java.util.List[MqttUserProperty] =
userProperties.asJava
+
+ /** Java API */
+ def isRetained(): Boolean = retained
+
+ /** Java API */
+ def getQoS(): java.util.Optional[MqttQoS] = qos.toJava
+
+ /** Java API */
+ def getPayload(): org.apache.pekko.util.ByteString = payload
+
+ /** Java API */
+ def getTopic(): String = topic
+
+ private def copy(
+ topic: String = topic,
payload: pekko.util.ByteString = payload,
qos: Option[MqttQoS] = qos,
- retained: Boolean = retained): MqttMessage =
- new MqttMessage(topic = topic, payload = payload, qos = qos, retained =
retained)
+ retained: Boolean = retained,
+ userProperties: Seq[MqttUserProperty] = userProperties): MqttMessage =
+ new MqttMessage(topic = topic, payload = payload, qos = qos, retained =
retained, userProperties = userProperties)
override def toString =
-
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained)"""
+
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained,userProperties=${userProperties.mkString(
+ "[", ", ", "]")})"""
override def equals(other: Any): Boolean = other match {
case that: MqttMessage =>
java.util.Objects.equals(this.topic, that.topic) &&
java.util.Objects.equals(this.payload, that.payload) &&
java.util.Objects.equals(this.qos, that.qos) &&
- java.util.Objects.equals(this.retained, that.retained)
+ java.util.Objects.equals(this.retained, that.retained) &&
+ java.util.Objects.equals(this.userProperties, that.userProperties)
case _ => false
}
override def hashCode(): Int =
- java.util.Objects.hash(topic, payload, qos, Boolean.box(retained))
+ java.util.Objects.hash(topic, payload, qos, Boolean.box(retained),
userProperties)
}
object MqttMessage {
@@ -59,7 +123,8 @@ object MqttMessage {
topic,
payload,
qos = None,
- retained = false)
+ retained = false,
+ userProperties = Seq.empty)
/** Java API */
def create(
@@ -68,5 +133,6 @@ object MqttMessage {
topic,
payload,
qos = None,
- retained = false)
+ retained = false,
+ userProperties = Seq.empty)
}
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index 3d769ba31..57d014fe3 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -23,6 +23,7 @@ import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings;
import org.apache.pekko.stream.connectors.mqttv5.MqttMessage;
import org.apache.pekko.stream.connectors.mqttv5.MqttQoS;
import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty;
import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck;
import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSink;
import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSource;
@@ -338,4 +339,41 @@ public class MqttSourceTest {
MqttMessage.create(willTopic, ByteString.fromString("ohi")),
elem.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
+
+ @Test
+ public void receiveUserProperties() throws Exception {
+ final String topic = "v5/source-test/user-props";
+ MqttConnectionSettings connectionSettings =
+ MqttConnectionSettings.create("tcp://localhost:1883",
"test-java-user-props", new MemoryPersistence());
+
+ Source<MqttMessage, CompletionStage<Done>> source =
+ MqttSource.atMostOnce(
+
connectionSettings.withClientId("source-test/user-props-source"),
+ MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
+ bufferSize);
+
+ Pair<CompletionStage<Done>, CompletionStage<MqttMessage>> result =
+ source.toMat(Sink.head(), Keep.both()).run(system);
+
+ result.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ List<MqttUserProperty> userPropsToSend = Arrays.asList(
+ MqttUserProperty.create("x-trace-id", "abc123"),
+ MqttUserProperty.create("x-tenant", "acme"));
+ MqttMessage msg = MqttMessage.create(topic,
ByteString.fromString("test"))
+ .withUserProperties(userPropsToSend);
+ Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+ MqttSink.create(
+
connectionSettings.withClientId("source-test/user-props-sink"),
+ MqttQoS.atLeastOnce());
+ Source.single(msg).runWith(mqttSink, system);
+
+ MqttMessage received = result.second().toCompletableFuture().get(5,
TimeUnit.SECONDS);
+ List<MqttUserProperty> userProps = received.getUserProperties();
+ assertEquals(2, userProps.size());
+ assertEquals("x-trace-id", userProps.get(0).getKey());
+ assertEquals("abc123", userProps.get(0).getValue());
+ assertEquals("x-tenant", userProps.get(1).getKey());
+ assertEquals("acme", userProps.get(1).getValue());
+ }
}
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index 5ae7b1b3a..1f685e32d 100644
--- a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -20,6 +20,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
+import scala.collection.immutable
import org.apache.pekko.Done
import org.apache.pekko.NotUsed
@@ -28,6 +29,7 @@ import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource
@@ -492,4 +494,23 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") {
Await.result(proxyKs2, timeout).shutdown()
}
}
+
+ "receive user properties from a message" in {
+ val topic = "v5/source-spec/user-props"
+ val expectedProps = immutable.Seq(
+ MqttUserProperty("x-trace-id", "abc123"),
+ MqttUserProperty("x-tenant", "acme"))
+
+ val (subscribed, result) = MqttSource
+ .atMostOnce(sourceSettings, MqttSubscriptions(topic,
MqttQoS.AtLeastOnce), 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+
+ val msg = MqttMessage(topic,
ByteString("test")).withUserProperties(expectedProps)
+ Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+ result.futureValue.userProperties shouldBe expectedProps
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]