This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 001093e5e0b [mqtt] Add portable MqttIO Read/Write transforms for batch 
and streaming (revives #32385) (#38493)
001093e5e0b is described below

commit 001093e5e0bdeb0a360b3b0043f4dd2729550b0d
Author: Tobias Kaymak <[email protected]>
AuthorDate: Sat Jun 13 20:55:05 2026 +0200

    [mqtt] Add portable MqttIO Read/Write transforms for batch and streaming 
(revives #32385) (#38493)
    
    * [mqtt] Add SchemaTransform providers for MqttIO Read/Write
    
    Adds MqttReadSchemaTransformProvider and MqttWriteSchemaTransformProvider
    so MqttIO can be used through the portable SchemaTransform API and exposed
    as cross-language transforms. Decorates MqttIO.ConnectionConfiguration with
    @DefaultSchema(AutoValueSchema.class) and @SchemaFieldDescription so the
    config round-trips through Beam Schemas.
    
    Both batch and streaming are supported on the read side: omitting
    maxNumRecords/maxReadTimeSeconds yields an unbounded (streaming) read,
    while setting either bounds it to a batch read. The provider descriptions
    document this and note that streaming requires a portable streaming runner
    (e.g. Prism, Flink, Dataflow); the legacy local Python DirectRunner does
    not execute portable streaming cross-language reads.
    
    Tests cover read-with-timeout-no-data, an unbounded streaming read
    (publish/collect/cancel), and a write-then-read round trip against an
    embedded ActiveMQ broker.
    
    Revives the approved diff from PR #32385 (ahmedabu98, twosom) and adapts
    it to the post-#32668 generic API (MqttIO.Read<T> / MqttIO.Write<T>).
    
    * [mqtt] Add messaging expansion service and wire MqttIO into Python xlang
    
    Adds a new :sdks:java:io:messaging-expansion-service module that serves
    messaging IOs (MQTT for now, with room for JMS/Solace later) instead of
    adding MqttIO to the shared :sdks:java:io:expansion-service, per review
    feedback from @Abacn and @chamikaramj.
    
    Registers MqttIO's SchemaTransforms in standard_expansion_services.yaml
    under the new service with Kafka-style names (ReadFromMqtt / WriteToMqtt),
    skipping the core SchemaTransforms it bundles transitively (those are
    generated from the Java IO expansion service). Regenerates
    standard_external_transforms.yaml so the generated Python wrappers are
    served by the messaging expansion service, and registers the new target in
    the generateExternalTransformsConfig task and the xlang wrapper-validation
    list.
    
    The CHANGES.md announcement is deferred to the follow-up PR that sets up
    the Xlang Messaging PostCommit, per review feedback.
    
    * [expansion-service] Remove obsolete upToDateWhen workaround
    
    outputs.upToDateWhen { false } in the shadowJar block was a workaround for
    a corrupted Gradle cache and is no longer needed (review feedback on
    PR #38493).
---
 sdks/java/io/expansion-service/build.gradle        |   1 -
 .../io/messaging-expansion-service/build.gradle    |  52 +++++
 .../java/org/apache/beam/sdk/io/mqtt/MqttIO.java   |   7 +
 .../io/mqtt/MqttReadSchemaTransformProvider.java   | 150 ++++++++++++
 .../io/mqtt/MqttWriteSchemaTransformProvider.java  | 132 +++++++++++
 .../io/mqtt/MqttSchemaTransformProviderTest.java   | 252 +++++++++++++++++++++
 sdks/python/build.gradle                           |   1 +
 sdks/python/test-suites/xlang/build.gradle         |   3 +-
 sdks/standard_expansion_services.yaml              |  15 ++
 sdks/standard_external_transforms.yaml             |  57 ++++-
 settings.gradle.kts                                |   1 +
 11 files changed, 668 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/expansion-service/build.gradle 
b/sdks/java/io/expansion-service/build.gradle
index 60ef89ed223..70a3fce538b 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -72,7 +72,6 @@ shadowJar {
     attributes(["Multi-Release": true])
   }
   mergeServiceFiles()
-  outputs.upToDateWhen { false }
 }
 
 description = "Apache Beam :: SDKs :: Java :: IO :: Expansion Service"
diff --git a/sdks/java/io/messaging-expansion-service/build.gradle 
b/sdks/java/io/messaging-expansion-service/build.gradle
new file mode 100644
index 00000000000..6cdf67b86d6
--- /dev/null
+++ b/sdks/java/io/messaging-expansion-service/build.gradle
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Standalone expansion service for messaging IOs. The runnable entry point is Beam's generic
+// ExpansionService; the transforms it can expand come from the IO modules declared below.
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+// NOTE(review): this module appears to contain no sources of its own and exists to produce a
+// bundled (shadow) jar; the exact effect of validateShadowJar/shadowClosure is defined by
+// BeamModulePlugin - confirm there before changing them.
+applyJavaNature(
+  automaticModuleName: 'org.apache.beam.sdk.io.messaging.expansion.service',
+  exportJavadoc: false,
+  validateShadowJar: false,
+  shadowClosure: {},
+)
+
+// Marks the bundled jar as Multi-Release and merges META-INF/services files, so that
+// ServiceLoader registrations from every bundled jar (e.g. the @AutoService
+// SchemaTransformProviders in :sdks:java:io:mqtt) are kept in the single output jar.
+shadowJar {
+  manifest {
+    attributes(["Multi-Release": true])
+  }
+  mergeServiceFiles()
+}
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Messaging Expansion 
Service"
+ext.summary = "Expansion service serving messaging IOs (e.g. MQTT)"
+
+// These projects are only needed on the runtime/bundled classpath. Presumably no code in this
+// module references them directly, hence permitUnusedDeclared (BEAM-11761).
+dependencies {
+  implementation project(":sdks:java:expansion-service")
+  permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+  implementation project(":sdks:java:io:mqtt")
+  permitUnusedDeclared project(":sdks:java:io:mqtt") // BEAM-11761
+  runtimeOnly library.java.slf4j_jdk14
+}
+
+// Runs the expansion service locally on the port given by the Gradle project property
+// `constructionService.port` (e.g. -PconstructionService.port=1234), defaulting to 8097.
+task runExpansionService (type: JavaExec) {
+  mainClass = "org.apache.beam.sdk.expansion.service.ExpansionService"
+  classpath = sourceSets.test.runtimeClasspath
+  args = [project.findProperty("constructionService.port") ?: "8097"]
+}
diff --git 
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 78876eb6534..72449c0697a 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -37,7 +37,10 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -205,13 +208,17 @@ public class MqttIO {
   private MqttIO() {}
 
   /** A POJO describing a MQTT connection. */
+  @DefaultSchema(AutoValueSchema.class)
   @AutoValue
   public abstract static class ConnectionConfiguration implements Serializable 
{
 
+    @SchemaFieldDescription("The MQTT broker URI.")
     abstract String getServerUri();
 
+    @SchemaFieldDescription("The MQTT topic pattern.")
     abstract @Nullable String getTopic();
 
+    @SchemaFieldDescription("The client ID prefix, which is used to construct 
a unique client ID.")
     abstract @Nullable String getClientId();
 
     abstract @Nullable String getUsername();
diff --git 
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..b83d9bba9f4
--- /dev/null
+++ 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static 
org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.joda.time.Duration;
+
+@AutoService(SchemaTransformProvider.class)
+public class MqttReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<ReadConfiguration> {
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class ReadConfiguration implements Serializable {
+    public static Builder builder() {
+      return new 
AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder();
+    }
+
+    @SchemaFieldDescription("Configuration options to set up the MQTT 
connection.")
+    public abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @SchemaFieldDescription(
+        "The max number of records to receive. Setting this will result in a 
bounded PCollection.")
+    @Nullable
+    public abstract Long getMaxNumRecords();
+
+    @SchemaFieldDescription(
+        "The maximum time for this source to read messages. Setting this will 
result in a bounded PCollection.")
+    @Nullable
+    public abstract Long getMaxReadTimeSeconds();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setConnectionConfiguration(
+          ConnectionConfiguration connectionConfiguration);
+
+      public abstract Builder setMaxNumRecords(Long maxNumRecords);
+
+      public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds);
+
+      public abstract ReadConfiguration build();
+    }
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mqtt_read:v1";
+  }
+
+  @Override
+  public String description() {
+    return "Reads messages from an MQTT broker and outputs each payload as a 
single `bytes` "
+        + "field.\n"
+        + "\n"
+        + "By default the read is unbounded (streaming): it keeps consuming 
messages from the "
+        + "subscribed topic until the pipeline is stopped. Setting 
`maxNumRecords` and/or "
+        + "`maxReadTimeSeconds` bounds the read, producing a bounded (batch) 
PCollection.\n"
+        + "\n"
+        + "Note: streaming reads require a runner that supports portable 
streaming (e.g. Prism, "
+        + "Flink, or Dataflow). The legacy local Python DirectRunner does not 
execute portable "
+        + "streaming cross-language reads.";
+  }
+
+  @Override
+  protected SchemaTransform from(ReadConfiguration configuration) {
+    return new MqttReadSchemaTransform(configuration);
+  }
+
+  private static class MqttReadSchemaTransform extends SchemaTransform {
+    private final ReadConfiguration config;
+
+    MqttReadSchemaTransform(ReadConfiguration configuration) {
+      this.config = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      Preconditions.checkState(
+          input.getAll().isEmpty(),
+          "Expected zero input PCollections for this source, but found: %s",
+          input.getAll().keySet());
+
+      MqttIO.Read<byte[]> readTransform =
+          
MqttIO.read().withConnectionConfiguration(config.getConnectionConfiguration());
+
+      Long maxRecords = config.getMaxNumRecords();
+      Long maxReadTime = config.getMaxReadTimeSeconds();
+      if (maxRecords != null) {
+        readTransform = readTransform.withMaxNumRecords(maxRecords);
+      }
+      if (maxReadTime != null) {
+        readTransform = 
readTransform.withMaxReadTime(Duration.standardSeconds(maxReadTime));
+      }
+
+      Schema outputSchema = 
Schema.builder().addByteArrayField("bytes").build();
+
+      PCollection<Row> outputRows =
+          input
+              .getPipeline()
+              .apply(readTransform)
+              .apply(
+                  "Wrap in Beam Rows",
+                  ParDo.of(
+                      new DoFn<byte[], Row>() {
+                        @ProcessElement
+                        public void processElement(
+                            @Element byte[] data, OutputReceiver<Row> 
outputReceiver) {
+                          outputReceiver.output(
+                              
Row.withSchema(outputSchema).addValue(data).build());
+                        }
+                      }))
+              .setRowSchema(outputSchema);
+
+      return PCollectionRowTuple.of("output", outputRows);
+    }
+  }
+}
diff --git 
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..95ee00c8c3a
--- /dev/null
+++ 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration;
+import static 
org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Provides a {@link SchemaTransform} that publishes the single {@code bytes} field of each input
+ * row as one MQTT message.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class MqttWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<WriteConfiguration> {
+  /** User-facing options of the MQTT write SchemaTransform. */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class WriteConfiguration implements Serializable {
+    /** Creates an empty builder. */
+    public static Builder builder() {
+      return new AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder();
+    }
+
+    @SchemaFieldDescription("Configuration options to set up the MQTT connection.")
+    public abstract ConnectionConfiguration getConnectionConfiguration();
+
+    @SchemaFieldDescription(
+        "Whether or not the publish message should be retained by the messaging engine. "
+            + "When a subscriber connects, it gets the latest retained message. "
+            + "Defaults to `False`, which will clear the retained message from the server.")
+    @Nullable
+    public abstract Boolean getRetained();
+
+    /** Builder for {@link WriteConfiguration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setConnectionConfiguration(
+          ConnectionConfiguration connectionConfiguration);
+
+      public abstract Builder setRetained(Boolean retained);
+
+      public abstract WriteConfiguration build();
+    }
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mqtt_write:v1";
+  }
+
+  @Override
+  public String description() {
+    // Paragraphs separated by a blank line.
+    return String.join(
+        "\n",
+        "Publishes messages to an MQTT broker. Expects an input PCollection of rows with a "
+            + "single `bytes` field, each of which is published as one MQTT message.",
+        "",
+        "Works with both bounded (batch) and unbounded (streaming) input PCollections.");
+  }
+
+  @Override
+  protected SchemaTransform from(WriteConfiguration configuration) {
+    return new MqttWriteSchemaTransform(configuration);
+  }
+
+  /** Unwraps field 0 of each row into the raw payload, failing on a null value. */
+  private static class ExtractBytesFn extends DoFn<Row, byte[]> {
+    @ProcessElement
+    public void processElement(@Element Row row, OutputReceiver<byte[]> outputReceiver) {
+      byte[] payload = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(row.getBytes(0));
+      outputReceiver.output(payload);
+    }
+  }
+
+  /** Expands into a bytes-extraction ParDo followed by {@link MqttIO.Write}. */
+  private static class MqttWriteSchemaTransform extends SchemaTransform {
+    private final WriteConfiguration config;
+
+    MqttWriteSchemaTransform(WriteConfiguration configuration) {
+      this.config = configuration;
+    }
+
+    /**
+     * Throws {@link IllegalStateException} unless {@code schema} has exactly one field whose type
+     * equals {@link Schema.FieldType#BYTES}.
+     */
+    private static void checkSingleBytesField(Schema schema) {
+      boolean isSingleBytesField =
+          schema.getFieldCount() == 1
+              && schema.getField(0).getType().equals(Schema.FieldType.BYTES);
+      Preconditions.checkState(
+          isSingleBytesField,
+          "Expected only one Schema field containing bytes, but instead received: %s",
+          schema);
+    }
+
+    /**
+     * Publishes every input row.
+     *
+     * @param input must contain exactly one PCollection of single-bytes-field rows
+     * @return an empty tuple: this transform is a sink
+     */
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> inputRows = input.getSinglePCollection();
+      checkSingleBytesField(inputRows.getSchema());
+
+      MqttIO.Write<byte[]> sink =
+          MqttIO.write().withConnectionConfiguration(config.getConnectionConfiguration());
+      Boolean retainFlag = config.getRetained();
+      if (retainFlag != null) {
+        // Only override MqttIO's own default when the user set the option explicitly.
+        sink = sink.withRetained(retainFlag);
+      }
+
+      PCollection<byte[]> payloads =
+          inputRows.apply("Extract bytes", ParDo.of(new ExtractBytesFn()));
+      payloads.apply(sink);
+
+      return PCollectionRowTuple.empty(inputRows.getPipeline());
+    }
+  }
+}
diff --git 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java
 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..60bdd1104db
--- /dev/null
+++ 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.common.NetworkTestHelper;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link MqttReadSchemaTransformProvider} and {@link MqttWriteSchemaTransformProvider}
+ * against an embedded ActiveMQ broker exposing an MQTT connector.
+ */
+public class MqttSchemaTransformProviderTest {
+  private static final Logger LOG = LoggerFactory.getLogger(MqttSchemaTransformProviderTest.class);
+
+  /** Collects the bytes field of every output row into a shared queue (DirectRunner is in-JVM). */
+  private static final ConcurrentLinkedQueue<String> STREAMING_RECEIVED =
+      new ConcurrentLinkedQueue<>();
+
+  private BrokerService brokerService;
+
+  private int port;
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Starts a non-persistent ActiveMQ broker with an MQTT connector on a free local port. */
+  @Before
+  public void startBroker() throws Exception {
+    port = NetworkTestHelper.getAvailableLocalPort();
+    LOG.info("Starting ActiveMQ brokerService on {}", port);
+    brokerService = new BrokerService();
+    brokerService.setDeleteAllMessagesOnStartup(true);
+    // Use memory persistence for the test: it's faster and doesn't pollute the test folder with
+    // KahaDB files.
+    brokerService.setPersistent(false);
+    brokerService.addConnector("mqtt://localhost:" + port);
+    brokerService.start();
+    brokerService.waitUntilStarted();
+  }
+
+  /** A time-bounded read on a topic that never receives data must terminate on its own. */
+  @Test(timeout = 30 * 1000)
+  public void testReceiveWithTimeoutAndNoData() {
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC")
+                    .withClientId("READ_PIPELINE"))
+            .setMaxReadTimeSeconds(2L)
+            .build();
+
+    PCollectionRowTuple.empty(pipeline)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration));
+
+    // should stop before the test timeout
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Records the UTF-8 decoded {@code bytes} field of each row into {@link #STREAMING_RECEIVED}. */
+  private static class CollectBytesFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      byte[] bytes = row.getBytes("bytes");
+      if (bytes != null) {
+        STREAMING_RECEIVED.add(new String(bytes, StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  /**
+   * Reads in streaming mode: when neither {@code maxNumRecords} nor {@code maxReadTimeSeconds} is
+   * set the SchemaTransform produces an unbounded PCollection. Verifies that messages published
+   * after the reader subscribes flow through continuously, then cancels the running pipeline.
+   */
+  @Test(timeout = 90 * 1000)
+  public void testReadUnboundedStreaming() throws Exception {
+    STREAMING_RECEIVED.clear();
+    final String topicName = "STREAM_READ_TOPIC";
+
+    // No bound -> unbounded (streaming) read.
+    MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration =
+        MqttReadSchemaTransformProvider.ReadConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topicName)
+                    .withClientId("STREAM_READ_PIPELINE"))
+            .build();
+
+    // Use a local pipeline so run() does not block (the read never terminates on its own).
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--blockOnRun=false").withoutStrictParsing().create();
+    Pipeline p = Pipeline.create(options);
+    PCollectionRowTuple.empty(p)
+        .apply(new MqttReadSchemaTransformProvider().from(readConfiguration))
+        .get("output")
+        .apply(ParDo.of(new CollectBytesFn()));
+
+    // Stop flag shared with the publisher thread. An AtomicBoolean (unlike a plain boolean[])
+    // guarantees the publisher observes the update and actually stops.
+    final java.util.concurrent.atomic.AtomicBoolean keepPublishing =
+        new java.util.concurrent.atomic.AtomicBoolean(true);
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection publishConnection = client.blockingConnection();
+    publishConnection.connect();
+    // Publish a steady stream of messages until the reader has consumed enough.
+    Thread publisher =
+        new Thread(
+            () -> {
+              int i = 0;
+              try {
+                while (keepPublishing.get()) {
+                  publishConnection.publish(
+                      topicName,
+                      ("stream-" + i).getBytes(StandardCharsets.UTF_8),
+                      QoS.AT_LEAST_ONCE,
+                      false);
+                  i++;
+                  Thread.sleep(200);
+                }
+              } catch (Exception e) {
+                // Expected when the connection is closed during teardown.
+                LOG.debug("Publisher stopped", e);
+              }
+            });
+
+    int expected = 10;
+    PipelineResult result = p.run();
+    try {
+      publisher.start();
+      // Wait until the unbounded read delivers a meaningful number of records.
+      long deadline = System.currentTimeMillis() + 60 * 1000;
+      while (STREAMING_RECEIVED.size() < expected && System.currentTimeMillis() < deadline) {
+        Thread.sleep(500);
+      }
+    } finally {
+      // Always tear down, even if waiting was interrupted, so the unbounded pipeline and the
+      // publisher do not outlive the test.
+      keepPublishing.set(false);
+      publisher.join(10 * 1000);
+      try {
+        publishConnection.disconnect();
+      } finally {
+        result.cancel();
+      }
+    }
+
+    assertTrue(
+        "expected at least " + expected + " streamed records, got " + STREAMING_RECEIVED.size(),
+        STREAMING_RECEIVED.size() >= expected);
+    for (String received : STREAMING_RECEIVED) {
+      assertTrue("unexpected payload: " + received, received.startsWith("stream-"));
+    }
+  }
+
+  /** Writes rows through the SchemaTransform and checks a plain MQTT subscriber sees them all. */
+  @Test(timeout = 60 * 1000)
+  public void testWrite() throws Exception {
+    final int numberOfTestMessages = 200;
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection connection = client.blockingConnection();
+    connection.connect();
+    connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
+
+    final Set<String> messages = new ConcurrentSkipListSet<>();
+
+    Thread subscriber =
+        new Thread(
+            () -> {
+              try {
+                for (int i = 0; i < numberOfTestMessages; i++) {
+                  Message message = connection.receive();
+                  String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+                  messages.add(payload);
+                  message.ack();
+                  LOG.info("message: {}", payload);
+                }
+              } catch (Exception e) {
+                LOG.error("Can't receive message", e);
+              }
+            });
+    // Daemon so a subscriber still blocked in receive() cannot keep the JVM alive.
+    subscriber.setDaemon(true);
+    subscriber.start();
+
+    ArrayList<byte[]> data = new ArrayList<>();
+    for (int i = 0; i < numberOfTestMessages; i++) {
+      data.add(("Test " + i).getBytes(StandardCharsets.UTF_8));
+    }
+
+    MqttWriteSchemaTransformProvider.WriteConfiguration writeConfiguration =
+        MqttWriteSchemaTransformProvider.WriteConfiguration.builder()
+            .setConnectionConfiguration(
+                MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "WRITE_TOPIC"))
+            .build();
+    Schema dataSchema = Schema.builder().addByteArrayField("bytes").build();
+
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                MapElements.into(TypeDescriptors.rows())
+                    .via(d -> Row.withSchema(dataSchema).addValue(d).build()))
+            .setRowSchema(dataSchema);
+    PCollectionRowTuple.of("input", inputRows)
+        .apply(new MqttWriteSchemaTransformProvider().from(writeConfiguration));
+    pipeline.run().waitUntilFinish();
+    // Bounded wait: if messages are lost, fail on the assertions below instead of hanging.
+    subscriber.join(30 * 1000);
+
+    connection.disconnect();
+
+    assertEquals(numberOfTestMessages, messages.size());
+    for (int i = 0; i < numberOfTestMessages; i++) {
+      assertTrue(messages.contains("Test " + i));
+    }
+  }
+
+  /** Stops the broker started in {@link #startBroker()}, if any. */
+  @After
+  public void stopBroker() throws Exception {
+    if (brokerService != null) {
+      brokerService.stop();
+      brokerService.waitUntilStopped();
+      brokerService = null;
+    }
+  }
+}
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index b39b12f198e..9e2fe232c42 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -73,6 +73,7 @@ tasks.register("generateExternalTransformsConfig") {
   // Need to build all expansion services listed in 
sdks/standard_expansion_services.yaml
   dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
   dependsOn ":sdks:java:io:expansion-service:build"
+  dependsOn ":sdks:java:io:messaging-expansion-service:build"
   // Keep this in-sync with pyproject.toml
   def PyYaml = "'pyyaml>=3.12,<7.0.0'"
 
diff --git a/sdks/python/test-suites/xlang/build.gradle 
b/sdks/python/test-suites/xlang/build.gradle
index 3065ad8377e..1cbbaa0db53 100644
--- a/sdks/python/test-suites/xlang/build.gradle
+++ b/sdks/python/test-suites/xlang/build.gradle
@@ -25,6 +25,7 @@ project.evaluationDependsOn(":sdks:python")
 // relevant fields as done here, then add it to `xlangTasks`.
 def gcpExpansionPath = 
project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath()
 def ioExpansionPath = 
project.project(':sdks:java:io:expansion-service').getPath()
+def messagingExpansionPath = 
project.project(':sdks:java:io:messaging-expansion-service').getPath()
 // Properties that are common across runners.
 // Used to launch the expansion service, collect the right tests, and cleanup 
afterwards
 def gcpXlang = new CrossLanguageTask().tap {
@@ -42,7 +43,7 @@ def ioXlang = new CrossLanguageTask().tap {
 }
 
 // This list should include all expansion service targets in 
sdks/python/standard_expansion_services.yaml
-def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath]
+def servicesToGenerateFrom = [ioExpansionPath, messagingExpansionPath, 
gcpExpansionPath]
 def xlangWrapperValidation = new CrossLanguageTask().tap {
     name = "xlangWrapperValidation"
     expansionProjectPaths = servicesToGenerateFrom
diff --git a/sdks/standard_expansion_services.yaml 
b/sdks/standard_expansion_services.yaml
index 531caca5a37..79c7e06280d 100644
--- a/sdks/standard_expansion_services.yaml
+++ b/sdks/standard_expansion_services.yaml
@@ -53,6 +53,21 @@
     - 'beam:schematransform:org.apache.beam:iceberg_read:v1'
     - 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
 
+- gradle_target: 'sdks:java:io:messaging-expansion-service:shadowJar'
+  destinations:
+    python: 'apache_beam/io'
+  transforms:
+    'beam:schematransform:org.apache.beam:mqtt_write:v1':
+      name: 'WriteToMqtt'
+    'beam:schematransform:org.apache.beam:mqtt_read:v1':
+      name: 'ReadFromMqtt'
+  skip_transforms:
+    # Core SchemaTransforms bundled via :sdks:java:expansion-service; already
+    # generated from the Java IO expansion service above.
+    - 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+    - 'beam:schematransform:org.apache.beam:tfrecord_read:v1'
+    - 'beam:schematransform:org.apache.beam:tfrecord_write:v1'
+
 # TODO(ahmedabu98): Enable this service in a future PR
 #- gradle_target: 
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
 #  destinations:
diff --git a/sdks/standard_external_transforms.yaml 
b/sdks/standard_external_transforms.yaml
index b50402a64d5..b9802f11b6c 100644
--- a/sdks/standard_external_transforms.yaml
+++ b/sdks/standard_external_transforms.yaml
@@ -19,7 +19,7 @@
 # configuration in /sdks/standard_expansion_services.yaml.
 # Refer to gen_xlang_wrappers.py for more info.
 #
-# Last updated on: 2026-05-06
+# Last updated on: 2026-06-11
 
 - default_service: sdks:java:io:expansion-service:shadowJar
   description: ''
@@ -180,3 +180,58 @@
     type: str
   identifier: beam:schematransform:org.apache.beam:tfrecord_write:v1
   name: TfrecordWrite
+- default_service: sdks:java:io:messaging-expansion-service:shadowJar
+  description: 'Reads messages from an MQTT broker and outputs each payload as 
a single
+    `bytes` field.
+
+
+    By default the read is unbounded (streaming): it keeps consuming messages from
+    the subscribed topic until the pipeline is stopped. Setting `max_num_records`
+    and/or `max_read_time_seconds` bounds the read, producing a bounded (batch)
+    PCollection.
+
+
+    Note: streaming reads require a runner that supports portable streaming 
(e.g.
+    Prism, Flink, or Dataflow). The legacy local Python DirectRunner does not 
execute
+    portable streaming cross-language reads.'
+  destinations:
+    python: apache_beam/io
+  fields:
+  - description: Configuration options to set up the MQTT connection.
+    name: connection_configuration
+    nullable: false
+    type: Row(client_id=typing.Optional[str], password=typing.Optional[str], 
server_uri=<class
+      'str'>, topic=typing.Optional[str], username=typing.Optional[str])
+  - description: The max number of records to receive. Setting this will 
result in
+      a bounded PCollection.
+    name: max_num_records
+    nullable: true
+    type: int64
+  - description: The maximum time for this source to read messages. Setting 
this will
+      result in a bounded PCollection.
+    name: max_read_time_seconds
+    nullable: true
+    type: int64
+  identifier: beam:schematransform:org.apache.beam:mqtt_read:v1
+  name: ReadFromMqtt
+- default_service: sdks:java:io:messaging-expansion-service:shadowJar
+  description: 'Publishes messages to an MQTT broker. Expects an input PCollection
+    of rows with a single `bytes` field; the `bytes` value of each row is published
+    as one MQTT message.
+
+
+    Works with both bounded (batch) and unbounded (streaming) input 
PCollections.'
+  destinations:
+    python: apache_beam/io
+  fields:
+  - description: Configuration options to set up the MQTT connection.
+    name: connection_configuration
+    nullable: false
+    type: Row(client_id=typing.Optional[str], password=typing.Optional[str], 
server_uri=<class
+      'str'>, topic=typing.Optional[str], username=typing.Optional[str])
+  - description: Whether or not the published message should be retained by the
+      messaging engine. When a subscriber connects, it gets the latest retained
+      message. Defaults to `False`, in which case the message is not stored as the
+      topic's retained message and any existing retained message is left unchanged.
+    name: retained
+    nullable: true
+    type: boolean
+  identifier: beam:schematransform:org.apache.beam:mqtt_write:v1
+  name: WriteToMqtt
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 443d9c56775..3d4346661a4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -236,6 +236,7 @@ 
include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-8")
 include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-9")
 include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common")
 include(":sdks:java:io:expansion-service")
+include(":sdks:java:io:messaging-expansion-service")
 include(":sdks:java:io:file-based-io-tests")
 include(":sdks:java:io:bigquery-io-perf-tests")
 include(":sdks:java:io:cdap")


Reply via email to