This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 7c12ec698 MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480)
7c12ec698 is described below
commit 7c12ec698c3e0b3b0c8358dda39cb3610521b37b
Author: anton-curanz-mw <[email protected]>
AuthorDate: Fri Mar 6 18:20:46 2026 +0100
MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480)
* Add MqttUserProperty model class (Scala/Java API)
* Add userProperties field to MqttMessage with withUserProperties builder
* Map incoming Paho user properties to MqttUserProperty on messageArrived
* Map outgoing MqttUserProperty to Paho MqttProperties on publish
* Add ACL entries and tests (Scala + Java) for user property round-trip
---
mqtt/src/test/travis/acl | 2 +
.../connectors/mqttv5/impl/MqttFlowStage.scala | 17 ++++++-
.../pekko/stream/connectors/mqttv5/model.scala | 58 ++++++++++++++++++----
.../src/test/java/docs/javadsl/MqttSourceTest.java | 40 +++++++++++++++
.../test/scala/docs/scaladsl/MqttSourceSpec.scala | 20 ++++++++
5 files changed, 127 insertions(+), 10 deletions(-)
diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl
index 8bf95d38d..104dfb743 100644
--- a/mqtt/src/test/travis/acl
+++ b/mqtt/src/test/travis/acl
@@ -23,6 +23,7 @@ topic v5/coffee/level
topic v5/source-spec/will
topic v5/source-spec/manualacks
topic v5/source-spec/pendingacks
+topic v5/source-spec/user-props
topic v5/sink-spec/topic1
topic v5/sink-spec/topic2
topic v5/sink-spec/topic3
@@ -32,6 +33,7 @@ topic v5/source-test/topic2
topic v5/source-test/will
topic v5/source-test/manualacks
topic v5/source-test/pendingacks
+topic v5/source-test/user-props
topic v5/flow-spec/topic-ack
topic v5/flow-test/topic-ack
topic v5/typed-flow-spec/topic1
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
index f60baa4a6..8b50124b6 100644
---
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.Promise
+import scala.jdk.CollectionConverters._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
@@ -31,6 +32,7 @@ import org.apache.pekko.stream._
import org.apache.pekko.stream.connectors.mqttv5.AuthSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
@@ -46,6 +48,7 @@ import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse
import org.eclipse.paho.mqttv5.common.MqttException
import org.eclipse.paho.mqttv5.common.packet.MqttProperties
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode
+import org.eclipse.paho.mqttv5.common.packet.UserProperty
import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage }
/**
@@ -235,8 +238,14 @@ abstract class MqttFlowStageLogic[I](
new MqttCallback {
override def messageArrived(topic: String, pahoMessage:
PahoMqttMessage): Unit = {
backpressurePahoClient.acquire()
+ val userProps: Seq[MqttUserProperty] =
+ Option(pahoMessage.getProperties)
+ .map(_.getUserProperties.asScala.map(p =>
MqttUserProperty(p.getKey, p.getValue)).toList)
+ .getOrElse(Nil)
val message = new MqttMessageWithAck {
- override val message: MqttMessage = MqttMessage(topic,
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+ override val message: MqttMessage =
+ MqttMessage(topic,
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+ .withUserProperties(userProps)
override def ack(): Future[Done] = {
val promise = Promise[Done]()
@@ -404,6 +413,12 @@ abstract class MqttFlowStageLogic[I](
pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value)
pahoMsg.setRetained(msg.retained)
+ if (msg.userProperties.nonEmpty) {
+ val pahoProps = new MqttProperties()
+ pahoProps.setUserProperties(msg.userProperties.map(p => new
UserProperty(p.key, p.value)).toList.asJava)
+ pahoMsg.setProperties(pahoProps)
+ }
+
mqttClient.publish(
msg.topic,
pahoMsg,
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
index 106488b16..6c8d594c0 100644
---
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -15,11 +15,37 @@ package org.apache.pekko.stream.connectors.mqttv5
import org.apache.pekko
+import scala.collection.immutable
+import scala.jdk.CollectionConverters._
+
+final class MqttUserProperty private (val key: String, val value: String) {
+ override def toString = s"MqttUserProperty(key=$key,value=$value)"
+
+ override def equals(other: Any): Boolean = other match {
+ case that: MqttUserProperty =>
+ java.util.Objects.equals(this.key, that.key) &&
+ java.util.Objects.equals(this.value, that.value)
+ case _ => false
+ }
+
+ override def hashCode(): Int = java.util.Objects.hash(key, value)
+}
+
+object MqttUserProperty {
+
+ /** Scala API */
+ def apply(key: String, value: String): MqttUserProperty = new
MqttUserProperty(key, value)
+
+ /** Java API */
+ def create(key: String, value: String): MqttUserProperty = new
MqttUserProperty(key, value)
+}
+
final class MqttMessage private (
val topic: String,
val payload: org.apache.pekko.util.ByteString,
val qos: Option[MqttQoS],
- val retained: Boolean
+ val retained: Boolean,
+ val userProperties: Array[MqttUserProperty]
) {
def withTopic(value: String): MqttMessage = copy(topic = value)
@@ -28,26 +54,38 @@ final class MqttMessage private (
def withQos(value: MqttQoS): MqttMessage = copy(qos = Option(value))
def withRetained(value: Boolean): MqttMessage = if (retained == value) this
else copy(retained = value)
- private def copy(topic: String = topic,
+ /** Scala API */
+ def withUserProperties(value: immutable.Seq[MqttUserProperty]): MqttMessage =
+ copy(userProperties = value.toArray)
+
+ /** Java API */
+ def withUserProperties(value: java.util.List[MqttUserProperty]): MqttMessage
=
+ copy(userProperties = value.asScala.toArray)
+
+ private def copy(
+ topic: String = topic,
payload: pekko.util.ByteString = payload,
qos: Option[MqttQoS] = qos,
- retained: Boolean = retained): MqttMessage =
- new MqttMessage(topic = topic, payload = payload, qos = qos, retained =
retained)
+ retained: Boolean = retained,
+ userProperties: Array[MqttUserProperty] = userProperties): MqttMessage =
+ new MqttMessage(topic = topic, payload = payload, qos = qos, retained =
retained, userProperties = userProperties)
override def toString =
-
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained)"""
+
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained,userProperties=${userProperties.mkString(
+ "[", ", ", "]")})"""
override def equals(other: Any): Boolean = other match {
case that: MqttMessage =>
java.util.Objects.equals(this.topic, that.topic) &&
java.util.Objects.equals(this.payload, that.payload) &&
java.util.Objects.equals(this.qos, that.qos) &&
- java.util.Objects.equals(this.retained, that.retained)
+ java.util.Objects.equals(this.retained, that.retained) &&
+ java.util.Objects.equals(this.userProperties.toSeq,
that.userProperties.toSeq)
case _ => false
}
override def hashCode(): Int =
- java.util.Objects.hash(topic, payload, qos, Boolean.box(retained))
+ java.util.Objects.hash(topic, payload, qos, Boolean.box(retained),
userProperties.toSeq)
}
object MqttMessage {
@@ -59,7 +97,8 @@ object MqttMessage {
topic,
payload,
qos = None,
- retained = false)
+ retained = false,
+ userProperties = Array.empty)
/** Java API */
def create(
@@ -68,5 +107,6 @@ object MqttMessage {
topic,
payload,
qos = None,
- retained = false)
+ retained = false,
+ userProperties = Array.empty)
}
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index 395a88aed..bae3e3771 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -23,6 +23,7 @@ import
org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings;
import org.apache.pekko.stream.connectors.mqttv5.MqttMessage;
import org.apache.pekko.stream.connectors.mqttv5.MqttQoS;
import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty;
import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck;
import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSink;
import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSource;
@@ -50,6 +51,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import scala.jdk.javaapi.CollectionConverters;
+
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -338,4 +341,41 @@ public class MqttSourceTest {
MqttMessage.create(willTopic, ByteString.fromString("ohi")),
elem.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
+
+ @Test
+ public void receiveUserProperties() throws Exception {
+ final String topic = "v5/source-test/user-props";
+ MqttConnectionSettings connectionSettings =
+ MqttConnectionSettings.create("tcp://localhost:1883",
"test-java-user-props", new MemoryPersistence());
+
+ Source<MqttMessage, CompletionStage<Done>> source =
+ MqttSource.atMostOnce(
+
connectionSettings.withClientId("source-test/user-props-source"),
+ MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
+ bufferSize);
+
+ Pair<CompletionStage<Done>, CompletionStage<MqttMessage>> result =
+ source.toMat(Sink.head(), Keep.both()).run(system);
+
+ result.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ List<MqttUserProperty> userPropsToSend = Arrays.asList(
+ MqttUserProperty.create("x-trace-id", "abc123"),
+ MqttUserProperty.create("x-tenant", "acme"));
+ MqttMessage msg = MqttMessage.create(topic,
ByteString.fromString("test"))
+ .withUserProperties(userPropsToSend);
+ Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+ MqttSink.create(
+
connectionSettings.withClientId("source-test/user-props-sink"),
+ MqttQoS.atLeastOnce());
+ Source.single(msg).runWith(mqttSink, system);
+
+ MqttMessage received = result.second().toCompletableFuture().get(5,
TimeUnit.SECONDS);
+ List<MqttUserProperty> userProps =
Arrays.asList(received.userProperties());
+ assertEquals(2, userProps.size());
+ assertEquals("x-trace-id", userProps.get(0).key());
+ assertEquals("abc123", userProps.get(0).value());
+ assertEquals("x-tenant", userProps.get(1).key());
+ assertEquals("acme", userProps.get(1).value());
+ }
}
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index bedd05ab3..7b875d171 100644
--- a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -28,6 +28,7 @@ import
org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink
import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource
@@ -492,4 +493,23 @@ class MqttSourceSpec extends
MqttSpecBase("MqttSourceSpec") {
Await.result(proxyKs2, timeout).shutdown()
}
}
+
+ "receive user properties from a message" in {
+ val topic = "v5/source-spec/user-props"
+ val expectedProps = Seq(
+ MqttUserProperty("x-trace-id", "abc123"),
+ MqttUserProperty("x-tenant", "acme"))
+
+ val (subscribed, result) = MqttSource
+ .atMostOnce(sourceSettings, MqttSubscriptions(topic,
MqttQoS.AtLeastOnce), 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+
+ val msg = MqttMessage(topic,
ByteString("test")).withUserProperties(expectedProps)
+ Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+ result.futureValue.userProperties shouldBe expectedProps
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]