tkaymak opened a new pull request, #38493:
URL: https://github.com/apache/beam/pull/38493

   ## What
   
   Revives the approved diff from [PR 
#32385](https://github.com/apache/beam/pull/32385) (`Add portable Mqtt source 
and sink transforms`) and wires the new SchemaTransforms into Python 
cross-language wrapper generation.
   
   After this lands, Python users can do:
   
   ```python
   from apache_beam.io import ReadFromMqtt, WriteToMqtt
   ```
   
   and reach `MqttIO` over xlang, the same way `ReadFromKafka` / `WriteToKafka` 
work today.
   
   ## How
   
   Two commits:
   
   1. **`[mqtt] Add SchemaTransform providers for MqttIO Read/Write`**
      - Decorate `MqttIO.ConnectionConfiguration` with 
`@DefaultSchema(AutoValueSchema.class)` + `@SchemaFieldDescription` so it 
round-trips through Beam Schemas.
      - New `MqttReadSchemaTransformProvider` 
(`beam:schematransform:org.apache.beam:mqtt_read:v1`) and 
`MqttWriteSchemaTransformProvider` 
(`beam:schematransform:org.apache.beam:mqtt_write:v1`), both 
`@AutoService`-registered.
      - New `MqttSchemaTransformProviderTest` covering the 
read-with-timeout-no-data case and a write-then-read round trip via an embedded 
ActiveMQ broker.
      - Pull `:sdks:java:io:mqtt` into `:sdks:java:io:expansion-service` so the 
providers are discoverable by `ExpansionService`.
   
   2. **`[mqtt] Wire MqttIO into Python xlang wrapper generation`**
      - Add name overrides in `sdks/standard_expansion_services.yaml` so the 
generated wrappers use kafka-style naming (`ReadFromMqtt` / `WriteToMqtt`).
      - Regenerate `sdks/standard_external_transforms.yaml` via 
`:sdks:python:generateExternalTransformsConfig`.
      - Add an I/Os entry to `CHANGES.md` for 2.74.0.
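
The naming override in commit 2 can be pictured with a small, self-contained sketch. This is not the code in `gen_xlang_wrappers.py`: the helper names (`default_wrapper_name`, `wrapper_name`), the `OVERRIDES` mapping, and the regex are hypothetical, and the derivation rule (CamelCase the snake_case URN segment) is an assumption inferred from the auto-derived names `MqttRead` / `MqttWrite` mentioned in this PR.

```python
import re
from typing import Mapping, Optional

# Assumed URN layout, based only on the two identifiers quoted in this PR.
_URN = re.compile(r"^beam:schematransform:org\.apache\.beam:(\w+):(\w+)$")


def default_wrapper_name(identifier: str) -> str:
    """CamelCase the snake_case segment of a SchemaTransform URN (hypothetical rule)."""
    match = _URN.match(identifier)
    if match is None:
        raise ValueError(f"unrecognized identifier: {identifier}")
    return "".join(part.capitalize() for part in match.group(1).split("_"))


def wrapper_name(
    identifier: str, overrides: Optional[Mapping[str, str]] = None
) -> str:
    """Prefer an explicit override; otherwise fall back to the derived name."""
    return (overrides or {}).get(identifier) or default_wrapper_name(identifier)


# Hypothetical in-memory stand-in for the name overrides added to the YAML config.
OVERRIDES = {
    "beam:schematransform:org.apache.beam:mqtt_read:v1": "ReadFromMqtt",
    "beam:schematransform:org.apache.beam:mqtt_write:v1": "WriteToMqtt",
}
```

With these assumptions, `wrapper_name(urn)` yields the auto-derived name and `wrapper_name(urn, OVERRIDES)` yields the kafka-style name, which is the behavior the override entries are meant to achieve.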
   
   ## Notes vs. PR #32385
   
   - **Generic-typing fix** for the post-#32668 API: `MqttIO.Read<byte[]>` / 
`MqttIO.Write<byte[]>` instead of the raw types in the original PR.
   - **Naming**: `ReadFromMqtt` / `WriteToMqtt` (kafka-style) instead of the 
auto-derived `MqttRead` / `MqttWrite`, following @Abacn's roadmap 
[comment](https://github.com/apache/beam/pull/32385#issuecomment-4431628903) 
about onboarding through `standard_expansion_services.yaml`.
   - The shape of the regenerated `standard_external_transforms.yaml` differs 
slightly from the original PR's (`fields` is now a list and Python types have a 
more compact representation) because `gen_xlang_wrappers.py` has evolved since 
2024-08. The regenerated diff in this PR follows current master's format.
   - `topic` is now `Optional` in the generated schema because 
`MqttIO.ConnectionConfiguration#getTopic()` was made `@Nullable` by [PR 
#32668](https://github.com/apache/beam/pull/32668) (`readWithMetadata`).
   
   ## Scope
   
   **Batch only.** The streaming-mode failure that @twosom flagged on the 
original PR 
([comment](https://github.com/apache/beam/pull/32385#issuecomment-2468417402): 
*batch worked, streaming did not*) was never root-caused. That investigation is 
intentionally out of scope here and will be addressed in a dedicated follow-up 
PR.
   
   ## Credits
   
   Original work by @ahmedabu98 and @twosom on PR #32385; @damondouglas 
approved that PR before it went stale and auto-closed on 2025-10-14. This 
revives that change with the small adjustments above.
   
   ## Verification
   
   ```bash
   GRADLE_USER_HOME=/tmp/.gradle ./gradlew \
       :sdks:java:io:mqtt:test \
       :sdks:java:io:expansion-service:build \
       :validateChanges
   ```
   All pass locally.
   
   R: @Abacn
   
   Closes the gap left by #32385 and addresses #21060 (Python MQTT IO).

