tkaymak opened a new pull request, #38493:
URL: https://github.com/apache/beam/pull/38493
## What

Revives the approved diff from [PR #32385](https://github.com/apache/beam/pull/32385) (`Add portable Mqtt source and sink transforms`) and wires the new SchemaTransforms into Python cross-language wrapper generation. After this lands, Python users can do:

```python
from apache_beam.io import ReadFromMqtt, WriteToMqtt
```

and reach `MqttIO` over xlang, the same way `ReadFromKafka` / `WriteToKafka` work today.

## How

Two commits:

1. **`[mqtt] Add SchemaTransform providers for MqttIO Read/Write`**
   - Decorate `MqttIO.ConnectionConfiguration` with `@DefaultSchema(AutoValueSchema.class)` + `@SchemaFieldDescription` so it round-trips through Beam Schemas.
   - New `MqttReadSchemaTransformProvider` (`beam:schematransform:org.apache.beam:mqtt_read:v1`) and `MqttWriteSchemaTransformProvider` (`beam:schematransform:org.apache.beam:mqtt_write:v1`), both `@AutoService`-registered.
   - New `MqttSchemaTransformProviderTest` covering the read-with-timeout-no-data case and a write-then-read round trip via an embedded ActiveMQ broker.
   - Pull `:sdks:java:io:mqtt` into `:sdks:java:io:expansion-service` so the providers are discoverable by `ExpansionService`.
2. **`[mqtt] Wire MqttIO into Python xlang wrapper generation`**
   - Add name overrides in `sdks/standard_expansion_services.yaml` so the generated wrappers use kafka-style naming (`ReadFromMqtt` / `WriteToMqtt`).
   - Regenerate `sdks/standard_external_transforms.yaml` via `:sdks:python:generateExternalTransformsConfig`.
   - Add an I/Os entry to `CHANGES.md` for 2.74.0.

## Notes vs. PR #32385

- **Generic-typing fix** for the post-#32668 API: `MqttIO.Read<byte[]>` / `MqttIO.Write<byte[]>` instead of the raw types in the original PR.
- **Naming**: `ReadFromMqtt` / `WriteToMqtt` (kafka-style) instead of the auto-derived `MqttRead` / `MqttWrite`, following @Abacn's roadmap [comment](https://github.com/apache/beam/pull/32385#issuecomment-4431628903) about onboarding through `standard_expansion_services.yaml`.
- The regenerated `standard_external_transforms.yaml` differs slightly in shape from the one in the original PR (`fields` is now a list, and Python types use a more compact representation) because `gen_xlang_wrappers.py` has evolved since August 2024. This diff follows current master's format.
- `topic` is now `Optional` in the generated schema because `MqttIO.ConnectionConfiguration#getTopic()` was made `@Nullable` by [PR #32668](https://github.com/apache/beam/pull/32668) (`readWithMetadata`).

## Scope

**Batch only.** The streaming-mode failure that @twosom flagged on the original PR ([comment](https://github.com/apache/beam/pull/32385#issuecomment-2468417402): *batch worked, streaming did not*) was never root-caused. That investigation is intentionally out of scope here and will be addressed in a dedicated follow-up PR.

## Credits

Original work by @ahmedabu98 and @twosom on PR #32385; @damondouglas approved that PR before it went stale and was auto-closed on 2025-10-14. This PR revives that change with the small adjustments above.

## Verification

```bash
GRADLE_USER_HOME=/tmp/.gradle ./gradlew \
  :sdks:java:io:mqtt:test \
  :sdks:java:io:expansion-service:build \
  :validateChanges
```

All three tasks pass locally.

R: @Abacn

Closes the gap from #32385 / addresses #21060 (Python MQTT IO).
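The naming note under *Notes vs. PR #32385* contrasts auto-derived wrapper names with kafka-style overrides. The sketch below is a hypothetical re-creation of that choice, assuming the default name comes from CamelCasing the `mqtt_read` / `mqtt_write` segment of the transform identifier and that an explicit override takes precedence; the real logic lives in `gen_xlang_wrappers.py` and may differ. `NAME_OVERRIDES`, `derive_wrapper_name`, and `wrapper_name` are illustrative names, not Beam APIs.

```python
# Hypothetical sketch of wrapper naming; NOT the actual gen_xlang_wrappers.py code.
from typing import Dict

# Illustrative overrides mirroring the intent of the entries added to
# sdks/standard_expansion_services.yaml (the real YAML layout may differ).
NAME_OVERRIDES: Dict[str, str] = {
    "beam:schematransform:org.apache.beam:mqtt_read:v1": "ReadFromMqtt",
    "beam:schematransform:org.apache.beam:mqtt_write:v1": "WriteToMqtt",
}


def derive_wrapper_name(identifier: str) -> str:
    """Assumed default rule: CamelCase the second-to-last identifier segment."""
    # "beam:schematransform:org.apache.beam:mqtt_read:v1" -> "mqtt_read"
    segment = identifier.split(":")[-2]
    # "mqtt_read" -> "MqttRead"
    return "".join(part.capitalize() for part in segment.split("_"))


def wrapper_name(identifier: str) -> str:
    """An explicit override wins; otherwise fall back to the derived name."""
    return NAME_OVERRIDES.get(identifier, derive_wrapper_name(identifier))


if __name__ == "__main__":
    for urn in NAME_OVERRIDES:
        # Prints "MqttRead -> ReadFromMqtt" and "MqttWrite -> WriteToMqtt".
        print(f"{derive_wrapper_name(urn)} -> {wrapper_name(urn)}")
```

Without the override entries, `wrapper_name` falls back to `MqttRead` / `MqttWrite`, which matches the auto-derived names in the original PR.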

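The note that `topic` is now `Optional` reflects a nullable Java getter surfacing as a nullable schema field. The sketch below is a hypothetical Python-side mirror of that shape using a `typing.NamedTuple`. It models only `server_uri` and `topic`, assumes the field names are snake_cased from `getServerUri()` / `getTopic()`, and assumes `serverUri` stays required; the regenerated `standard_external_transforms.yaml` is the authority on the actual field list. `MqttConnectionConfiguration` and `nullable_fields` are illustrative names.

```python
# Hypothetical mirror of the generated connection-configuration schema.
# Field names and the nullability of server_uri are assumptions.
import typing
from typing import List, NamedTuple, Optional


class MqttConnectionConfiguration(NamedTuple):
    server_uri: str  # assumed required (non-null getter)
    topic: Optional[str] = None  # nullable since PR #32668 (readWithMetadata)
    # Other ConnectionConfiguration fields are omitted from this sketch.


def nullable_fields(cls: type) -> List[str]:
    """Return the names of fields annotated as Optional[...] (i.e. allowing None)."""
    hints = typing.get_type_hints(cls)
    return [name for name, hint in hints.items()
            if type(None) in typing.get_args(hint)]


# A configuration without a topic is representable.
cfg = MqttConnectionConfiguration(server_uri="tcp://localhost:1883")
print(cfg.topic)  # None
print(nullable_fields(MqttConnectionConfiguration))  # ['topic']
```

A `NamedTuple` is used here because Beam's Python SDK can infer row schemas from such classes, mapping `Optional[...]` annotations to nullable fields.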