junaiddshaukat opened a new issue, #38616:
URL: https://github.com/apache/beam/issues/38616

   Tracking issue: #18479
   Depends on: #38465 (skeleton, merged via #38534)
   
   ## Summary
   
   Second sub-issue under the Kafka Streams Runner GSoC 2026 project. The
   skeleton from #38465 currently throws `UnsupportedOperationException`
   for every transform URN. This issue adds:
   
   1. A URN-dispatch framework in `KafkaStreamsPipelineTranslator`: a
      `Map<String, TransformTranslator>` populated at construction time,
      with the pipeline walked via `QueryablePipeline` in topological
      order (the same shape as `FlinkStreamingPortablePipelineTranslator`).
   2. The first concrete translator: **Impulse**
      (`beam:transform:impulse:v1`). Per the design doc §4.1, this uses a
      dedicated bootstrap topic (e.g. `__beam_impulse`) so Kafka Streams
      has a real source to consume from, emits one empty `byte[]` element,
      records in a state store that it has already fired, and advances
      the watermark to `BoundedWindow.TIMESTAMP_MAX_VALUE`.
   3. The translation context starts holding the Kafka Streams `Topology`
      being built and the `Map<String, String>` from PCollection ID to
      processor node name.
   
   After this issue, an Impulse-only pipeline translates and starts a
   Kafka Streams topology. Pipelines containing any other URN still fail
   fast with `No translator registered for URN ...`; the message format
   is unchanged from #38465.
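
   The dispatch shape described above can be sketched in plain Java. This is
   only an illustration under stated assumptions: `Node`, `Context`, and
   `Translator` are hypothetical stand-ins for `PTransformNode`,
   `KafkaStreamsTranslationContext`, and `TransformTranslator`, the input list
   is assumed to be already in topological order, and the exact exception
   message format from #38465 is assumed rather than copied.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** URN dispatch sketch; every type here is a hypothetical stand-in. */
   final class UrnDispatchSketch {

     /** Stand-in for a transform node: id, URN, and one output PCollection id. */
     static final class Node {
       final String id;
       final String urn;
       final String outputPCollectionId;

       Node(String id, String urn, String outputPCollectionId) {
         this.id = id;
         this.urn = urn;
         this.outputPCollectionId = outputPCollectionId;
       }
     }

     /** Stand-in for the translation context: PCollection id to processor name. */
     static final class Context {
       private final Map<String, String> pcollectionIdToProcessorName = new LinkedHashMap<>();

       void registerOutput(String pcollectionId, String processorName) {
         pcollectionIdToProcessorName.put(pcollectionId, processorName);
       }

       String processorFor(String pcollectionId) {
         return pcollectionIdToProcessorName.get(pcollectionId);
       }
     }

     /** Stand-in for the single-method translator interface. */
     @FunctionalInterface
     interface Translator {
       void translate(Node node, Context context);
     }

     static final String IMPULSE_URN = "beam:transform:impulse:v1";

     private final Map<String, Translator> translators = new LinkedHashMap<>();

     UrnDispatchSketch() {
       // Populated once, at construction time. The "-source" suffix is made up.
       translators.put(
           IMPULSE_URN,
           (node, ctx) -> ctx.registerOutput(node.outputPCollectionId, node.id + "-source"));
     }

     /** Translates nodes in the given order, failing fast on unknown URNs. */
     Context translate(List<Node> topologicallyOrdered) {
       Context context = new Context();
       for (Node node : topologicallyOrdered) {
         Translator translator = translators.get(node.urn);
         if (translator == null) {
           // Assumed message format; the real one is defined in #38465.
           throw new UnsupportedOperationException(
               "No translator registered for URN " + node.urn);
         }
         translator.translate(node, context);
       }
       return context;
     }
   }
   ```

   Keeping the lookup in one map means adding a later translator (for example
   stateless ParDo) is a single `put` call, and the fail-fast path stays
   untouched.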
   
   ## Design doc reference
   
   [Portable Kafka Streams Runner for Apache Beam — design doc](https://docs.google.com/document/d/1BBMURhSG4SxPcvvnKMTrmnKCr_jhXL6R4TBDBW7zsy8/edit?usp=sharing) §4.1, §11.5.
   
   ## Scope
   
   - [ ] `KafkaStreamsTranslationContext`: hold `Topology`, plus
         `Map<String, String> pcollectionIdToProcessorName` and accessors.
   - [ ] `KafkaStreamsPipelineTranslator`: `Map<String, TransformTranslator>`
         populated at construction; walk via `QueryablePipeline` /
         topological order.
   - [ ] `TransformTranslator` interface (single method
         `translate(PTransformNode, RunnerApi.Pipeline, KafkaStreamsTranslationContext)`).
   - [ ] `translation/ImpulseTranslator` implementing the dedicated-topic
         pattern from design doc §4.1.
   - [ ] Bootstrap-topic creation hook: either auto-create the topic via
         `AdminClient` or require it to be pre-created (open question
         12.1); pick one and note the choice.
   - [ ] Wire `KafkaStreamsPipelineRunner.run` to actually call
         `KafkaStreams.start()` on the built topology (instead of
         throwing) and return a `PortablePipelineResult` that tracks the
         state of the `KafkaStreams` instance.
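
   For the last item, one possible way to track state is a plain mapping from
   the Kafka Streams client state to a Beam job state. The sketch below is an
   assumption, not something the design doc specifies: the two enums are local
   mirrors of `KafkaStreams.State` and of a subset of `PipelineResult.State`
   (names should be checked against the versions in use), and the chosen
   mapping, in particular `NOT_RUNNING` to `DONE`, is a guess at reasonable
   behavior.

   ```java
   /** Hypothetical state mapping; enums are local mirrors, not the real classes. */
   final class StateMappingSketch {

     /** Local mirror of KafkaStreams.State names (verify against the Kafka version used). */
     enum StreamsState {
       CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR
     }

     /** Local mirror of a subset of Beam's PipelineResult.State names. */
     enum JobState {
       UNKNOWN, RUNNING, DONE, FAILED
     }

     /** Maps a client state to a job state; the mapping itself is an assumption. */
     static JobState toJobState(StreamsState state) {
       switch (state) {
         case CREATED:
           return JobState.UNKNOWN; // not started yet
         case REBALANCING:
         case RUNNING:
         case PENDING_SHUTDOWN:
           return JobState.RUNNING; // still holding resources
         case NOT_RUNNING:
           return JobState.DONE; // clean shutdown
         case PENDING_ERROR:
         case ERROR:
           return JobState.FAILED;
         default:
           throw new IllegalArgumentException("Unhandled state: " + state);
       }
     }

     private StateMappingSketch() {}
   }
   ```

   In the real runner this function would presumably be driven by a state
   listener on the `KafkaStreams` instance, so the result object never has to
   poll.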
   
   ## Acceptance criteria
   
   - [ ] `./gradlew :runners:kafka-streams:check` green.
   - [ ] Unit test using `TopologyTestDriver`: build an Impulse-only
         pipeline proto, translate it, run the resulting topology, assert
         exactly one empty `byte[]` element appears at the output node
         and is not re-emitted on restart.
   - [ ] Unit test asserting any non-Impulse URN still throws the same
         "No translator registered for URN ..." message from #38465.
   - [ ] PreCommit workflow added in #38534 still triggers and passes.
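
   The fire-once behavior that the `TopologyTestDriver` test has to assert can
   be sketched without Kafka Streams. Everything here is a stand-in: a
   `Map<String, Boolean>` plays the role of the state store, the `impulse-fired`
   key is hypothetical, and `Long.MAX_VALUE` is only a placeholder for
   `BoundedWindow.TIMESTAMP_MAX_VALUE`. Restoring the watermark on an
   already-fired restart is an assumption about the intended behavior.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   /** Fire-once Impulse logic with in-memory stand-ins; not the runner's processor. */
   final class ImpulseFireOnceSketch {

     /** Placeholder for BoundedWindow.TIMESTAMP_MAX_VALUE; the real constant lives in Beam. */
     static final long WATERMARK_MAX_PLACEHOLDER = Long.MAX_VALUE;

     /** Hypothetical key under which the "already fired" flag is stored. */
     private static final String FIRED_KEY = "impulse-fired";

     private final Map<String, Boolean> stateStore; // stand-in for a persistent state store
     private final List<byte[]> output = new ArrayList<>();
     private long watermark = Long.MIN_VALUE;

     ImpulseFireOnceSketch(Map<String, Boolean> stateStore) {
       this.stateStore = stateStore;
     }

     /** Called for each record read from the bootstrap topic. */
     void onBootstrapRecord() {
       if (Boolean.TRUE.equals(stateStore.get(FIRED_KEY))) {
         // Already fired, possibly before a restart: emit nothing, but keep the
         // watermark at its maximum (assumed behavior).
         watermark = WATERMARK_MAX_PLACEHOLDER;
         return;
       }
       output.add(new byte[0]); // the single empty element
       stateStore.put(FIRED_KEY, Boolean.TRUE);
       watermark = WATERMARK_MAX_PLACEHOLDER;
     }

     List<byte[]> output() {
       return output;
     }

     long watermark() {
       return watermark;
     }
   }
   ```

   Simulating a restart then amounts to constructing a second instance over
   the same map and checking that its output stays empty, which mirrors what
   the real test would do with a restored state store.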
   
   ## Out of scope (deferred to follow-up sub-issues)
   
   - ExecutableStage / stateless ParDo (next sub-issue).
   - GroupByKey, Combine, Window assignment, Flatten.
   - Watermark manager (per Jan: "watermark manager comes last when GBK
     forces it").
   - Splittable DoFn.
   
   ## Reference implementation
   
   - `runners/flink/2.0/src/main/java/.../FlinkStreamingPortablePipelineTranslator.java`:
     URN dispatch map pattern.
   - `runners/flink/2.0/src/main/java/.../translation/.../ImpulseSourceFunction.java`
     (and surrounding wiring) for the Flink-side analog. Kafka Streams
     needs a different approach because KS requires a real input topic.
   
   cc @je-ik
   

