junaiddshaukat opened a new issue, #38616:
URL: https://github.com/apache/beam/issues/38616
Tracking issue: #18479
Depends on: #38465 (skeleton, merged via #38534)
## Summary
Second sub-issue under the Kafka Streams Runner GSoC 2026 project. The
skeleton from #38465 currently throws `UnsupportedOperationException`
for every transform URN. This issue adds:
1. A URN-dispatch framework in `KafkaStreamsPipelineTranslator` (a
`Map<String, TransformTranslator>` populated at construction time,
walked via `QueryablePipeline` in topological order — same shape as
`FlinkStreamingPortablePipelineTranslator`).
2. The first concrete translator: **Impulse**
(`beam:transform:impulse:v1`). Per the design doc §4.1, this uses a
dedicated bootstrap topic (e.g. `__beam_impulse`) so Kafka Streams
has a real source to consume from, emits one empty `byte[]` element,
records in a state store that it has already fired, and advances
the watermark to `BoundedWindow.TIMESTAMP_MAX_VALUE`.
3. A translation context that now holds the Kafka Streams `Topology`
being built and the `Map<String, String>` from PCollection ID to
processor node name.
After this issue, an Impulse-only pipeline translates and starts a
Kafka Streams topology. Pipelines containing any other URN still fail
fast with `No translator registered for URN ...`; the message format
is unchanged from #38465.
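
To make the dispatch shape in item 1 concrete, here is a minimal, dependency-free sketch. It is not the proposed implementation: `Node`, `Context`, and `Translator` are hypothetical stand-ins for `PTransformNode`, `KafkaStreamsTranslationContext`, and `TransformTranslator`, the input list is assumed to be in topological order already (the real translator would get that order from `QueryablePipeline`), and the text appended after `URN` in the error message is a guess, since the issue abbreviates the format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of URN dispatch. All nested types are hypothetical stand-ins, not Beam classes. */
final class UrnDispatchSketch {

  /** Stand-in for a transform node: an id plus the URN of its spec. */
  static final class Node {
    final String id;
    final String urn;

    Node(String id, String urn) {
      this.id = id;
      this.urn = urn;
    }
  }

  /** Stand-in for the translation context: records which node ids were translated. */
  static final class Context {
    final List<String> translatedIds = new ArrayList<>();
  }

  /** Stand-in for the single-method translator interface (pipeline argument omitted). */
  interface Translator {
    void translate(Node node, Context context);
  }

  private final Map<String, Translator> translators = new LinkedHashMap<>();

  UrnDispatchSketch() {
    // Populated once at construction time, as the issue describes.
    translators.put("beam:transform:impulse:v1", (node, ctx) -> ctx.translatedIds.add(node.id));
  }

  /** Translates nodes in the given order, which is assumed to be topological. */
  Context translate(List<Node> topologicallyOrdered) {
    Context context = new Context();
    for (Node node : topologicallyOrdered) {
      Translator translator = translators.get(node.urn);
      if (translator == null) {
        // Fail fast for unknown URNs. The suffix after "URN" is an assumption.
        throw new UnsupportedOperationException("No translator registered for URN " + node.urn);
      }
      translator.translate(node, context);
    }
    return context;
  }

  public static void main(String[] args) {
    UrnDispatchSketch sketch = new UrnDispatchSketch();
    Context ok = sketch.translate(List.of(new Node("impulse-1", "beam:transform:impulse:v1")));
    System.out.println("translated: " + ok.translatedIds);
    try {
      sketch.translate(List.of(new Node("pardo-1", "beam:transform:pardo:v1")));
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Keeping the lookup in a plain map means each later sub-issue can register its translator with one `put` call, while unregistered URNs keep failing at translation time rather than at runtime.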
## Design doc reference
[Portable Kafka Streams Runner for Apache Beam — design
doc](https://docs.google.com/document/d/1BBMURhSG4SxPcvvnKMTrmnKCr_jhXL6R4TBDBW7zsy8/edit?usp=sharing)
§4.1, §11.5.
## Scope
- [ ] `KafkaStreamsTranslationContext`: hold `Topology`, plus
`Map<String, String> pcollectionIdToProcessorName` and accessors.
- [ ] `KafkaStreamsPipelineTranslator`: `Map<String, TransformTranslator>`
populated at construction; walk via `QueryablePipeline` /
topological order.
- [ ] `TransformTranslator` interface (single method
`translate(PTransformNode, RunnerApi.Pipeline,
KafkaStreamsTranslationContext)`).
- [ ] `translation/ImpulseTranslator` implementing the dedicated-topic
pattern from design doc §4.1.
- [ ] Bootstrap-topic creation hook (auto-create via `AdminClient` or
require the topic to be pre-created; this is open question 12.1, so
pick one and note the choice).
- [ ] Wire `KafkaStreamsPipelineRunner.run` to actually call
`KafkaStreams.start()` on the built topology (instead of
throwing) and return a `PortablePipelineResult` that tracks the
`KafkaStreams` instance state.
## Acceptance criteria
- [ ] `./gradlew :runners:kafka-streams:check` green.
- [ ] Unit test using `TopologyTestDriver`: build an Impulse-only
pipeline proto, translate it, run the resulting topology, assert
exactly one empty `byte[]` element appears at the output node
and is not re-emitted on restart.
- [ ] Unit test asserting any non-Impulse URN still throws the same
"No translator registered for URN ..." message from #38465.
- [ ] PreCommit workflow added in #38534 still triggers and passes.
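
The fire-once behaviour that the `TopologyTestDriver` test is meant to check can be illustrated without Kafka. The sketch below is only an illustration under stated assumptions: a plain `Map` stands in for the Kafka Streams state store, `onBootstrapRecord` is a hypothetical name for whatever handles a record from the bootstrap topic, a "restart" is modelled as a new instance sharing the same map, and watermark advancement is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of fire-once Impulse logic. The map is a stand-in for a durable state store. */
final class ImpulseFireOnceSketch {

  // Hypothetical key under which the "already fired" flag is kept.
  private static final String FIRED_KEY = "fired";

  private final Map<String, Boolean> store;
  private final List<byte[]> output = new ArrayList<>();

  ImpulseFireOnceSketch(Map<String, Boolean> durableStore) {
    this.store = durableStore;
  }

  /** Handles one record from the bootstrap topic (hypothetical hook name). */
  void onBootstrapRecord() {
    if (Boolean.TRUE.equals(store.get(FIRED_KEY))) {
      return; // Already fired, possibly in an earlier run: emit nothing.
    }
    output.add(new byte[0]); // The single empty element.
    store.put(FIRED_KEY, Boolean.TRUE); // Remember that the impulse has fired.
  }

  List<byte[]> output() {
    return output;
  }

  public static void main(String[] args) {
    Map<String, Boolean> durable = new HashMap<>();

    ImpulseFireOnceSketch first = new ImpulseFireOnceSketch(durable);
    first.onBootstrapRecord();
    first.onBootstrapRecord(); // A duplicate bootstrap record must not re-emit.

    // Simulated restart: new instance, same store contents.
    ImpulseFireOnceSketch restarted = new ImpulseFireOnceSketch(durable);
    restarted.onBootstrapRecord();

    // prints: 1 0
    System.out.println(first.output().size() + " " + restarted.output().size());
  }
}
```

A real test would make the same two assertions against the translated topology: one empty `byte[]` on first run, and nothing after the driver is recreated over the same state.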
## Out of scope (deferred to follow-up sub-issues)
- ExecutableStage / stateless ParDo (next sub-issue).
- GroupByKey, Combine, Window assignment, Flatten.
- Watermark manager (per Jan: "watermark manager comes last when GBK
forces it").
- Splittable DoFn.
## Reference implementation
- `runners/flink/2.0/src/main/java/.../FlinkStreamingPortablePipelineTranslator.java`:
URN dispatch map pattern.
- `runners/flink/2.0/src/main/java/.../translation/.../ImpulseSourceFunction.java`
(and surrounding wiring) for the Flink-side analog. Kafka Streams
needs a different approach because KS requires a real input topic.
cc @je-ik