This is an automated email from the ASF dual-hosted git repository.

je-ik pushed a commit to branch feat/18479-kafka-streams-runner-skeleton
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to 
refs/heads/feat/18479-kafka-streams-runner-skeleton by this push:
     new ff554f7438d [GSoC 2026] Kafka Streams runner — ExecutableStage 
(stateless ParDo) translator (#38764)
ff554f7438d is described below

commit ff554f7438d658aa887f59ca78942340473a4ebd
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Sun Jun 7 17:13:55 2026 +0500

    [GSoC 2026] Kafka Streams runner — ExecutableStage (stateless ParDo) 
translator (#38764)
    
    * Add ExecutableStage (stateless ParDo) translator with SDK-harness bridge
---
 runners/kafka-streams/build.gradle                 |   1 +
 .../translation/ExecutableStageProcessor.java      | 217 +++++++++++++++++++++
 .../translation/ExecutableStageTranslator.java     |  96 +++++++++
 .../kafka/streams/translation/KStreamsPayload.java |   9 +-
 .../KafkaStreamsExecutableStageContextFactory.java |  66 +++++++
 .../KafkaStreamsPipelineTranslator.java            |  19 +-
 .../translation/ExecutableStageTranslatorTest.java | 140 +++++++++++++
 .../KafkaStreamsPipelineTranslatorTest.java        |  28 ++-
 .../streams/translation/SharedTestCollector.java   |  92 +++++++++
 9 files changed, 657 insertions(+), 11 deletions(-)

diff --git a/runners/kafka-streams/build.gradle 
b/runners/kafka-streams/build.gradle
index 3f34a3ca76b..52b320dd70a 100644
--- a/runners/kafka-streams/build.gradle
+++ b/runners/kafka-streams/build.gradle
@@ -61,6 +61,7 @@ dependencies {
   permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version"
 
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
+  testImplementation project(":sdks:java:harness")
   testImplementation library.java.hamcrest
   testImplementation library.java.junit
   testImplementation library.java.mockito_core
diff --git 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
new file mode 100644
index 00000000000..7417088bb67
--- /dev/null
+++ 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.WindowedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} that executes a fused {@link 
ExecutableStage} (stateless user
+ * code such as ParDo) in the Beam SDK harness over the Fn API.
+ *
+ * <p>For each {@link KStreamsPayload#isData() data} payload it unwraps the 
{@link WindowedValue}
+ * and feeds it to the harness through the stage's main input {@link 
FnDataReceiver}. Harness
+ * outputs are collected on the harness threads into {@link #pendingOutputs} 
and then flushed
+ * downstream on the Kafka Streams processing thread when the bundle closes — 
Kafka Streams' {@link
+ * ProcessorContext#forward} must only be called from the processing thread, 
so outputs are never
+ * forwarded directly from a harness callback.
+ *
+ * <p>A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle 
boundary: the open
+ * bundle (if any) is closed (flushing outputs), and the watermark is then 
forwarded downstream so
+ * that subsequent stages observe it after all data of the bundle.
+ *
+ * <p>This is the Kafka Streams analogue of Flink's {@code 
ExecutableStageDoFnOperator} and Spark's
+ * {@code SparkExecutableStageFunction}. State, timers, and side inputs are 
out of scope for this
+ * first version: the stage is executed with {@link 
StateRequestHandler#unsupported()} and no timer
+ * receivers.
+ */
+class ExecutableStageProcessor
+    implements Processor<byte[], KStreamsPayload<byte[]>, byte[], 
KStreamsPayload<byte[]>> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExecutableStageProcessor.class);
+
+  private final RunnerApi.ExecutableStagePayload stagePayload;
+  private final JobInfo jobInfo;
+
+  // pendingOutputs is enqueued by SDK harness threads (inside the 
OutputReceiverFactory callback)
+  // and drained by the Kafka Streams processing thread on bundle close; needs 
to be thread-safe.
+  private final Queue<WindowedValue<byte[]>> pendingOutputs = new 
ConcurrentLinkedQueue<>();
+
+  private @Nullable ProcessorContext<byte[], KStreamsPayload<byte[]>> context;
+  private @Nullable ExecutableStageContext stageContext;
+  private @Nullable StageBundleFactory stageBundleFactory;
+  private @Nullable RemoteBundle currentBundle;
+
+  ExecutableStageProcessor(RunnerApi.ExecutableStagePayload stagePayload, 
JobInfo jobInfo) {
+    this.stagePayload = stagePayload;
+    this.jobInfo = jobInfo;
+  }
+
+  @Override
+  public void init(ProcessorContext<byte[], KStreamsPayload<byte[]>> context) {
+    this.context = context;
+    ExecutableStage executableStage = 
ExecutableStage.fromPayload(stagePayload);
+    this.stageContext = 
KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
+    this.stageBundleFactory = 
stageContext.getStageBundleFactory(executableStage);
+  }
+
+  @Override
+  public void process(Record<byte[], KStreamsPayload<byte[]>> record) {
+    KStreamsPayload<byte[]> payload = record.value();
+    if (payload.isWatermark()) {
+      // NOTE: flushing the bundle on every received watermark is provisional. 
Once the
+      // WatermarkManager lands, a stage will receive watermarks from multiple 
parent instances and
+      // the output watermark becomes min() across them — the bundle should 
flush / the output
+      // watermark advance only when that minimum actually moves forward, not 
on every received
+      // watermark. Tracked in #38743.
+      closeBundleAndFlush(record);
+      forwardWatermark(record, payload.getWatermarkMillis());
+      return;
+    }
+    try {
+      ensureBundleOpen();
+      mainInputReceiver().accept(payload.getData());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to process element through SDK 
harness", e);
+    }
+  }
+
+  private void ensureBundleOpen() throws Exception {
+    if (currentBundle != null) {
+      return;
+    }
+    StageBundleFactory factory = checkInitialized(stageBundleFactory);
+    OutputReceiverFactory outputReceiverFactory =
+        new OutputReceiverFactory() {
+          @Override
+          public <OutputT> FnDataReceiver<OutputT> create(String 
pCollectionId) {
+            // Outputs are queued here on harness threads and drained on the 
processing thread
+            // after the bundle closes.
+            return receivedElement -> {
+              if (receivedElement != null) {
+                pendingOutputs.add((WindowedValue<byte[]>) receivedElement);
+              }
+            };
+          }
+        };
+    currentBundle =
+        factory.getBundle(
+            outputReceiverFactory,
+            StateRequestHandler.unsupported(),
+            BundleProgressHandler.ignored());
+  }
+
+  private FnDataReceiver<WindowedValue<?>> mainInputReceiver() {
+    RemoteBundle bundle = checkInitialized(currentBundle);
+    @SuppressWarnings("unchecked")
+    FnDataReceiver<WindowedValue<?>> receiver =
+        (FnDataReceiver<WindowedValue<?>>)
+            (FnDataReceiver<?>) 
Iterables.getOnlyElement(bundle.getInputReceivers().values());
+    return receiver;
+  }
+
+  private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> 
record) {
+    RemoteBundle bundle = currentBundle;
+    if (bundle == null) {
+      return;
+    }
+    try {
+      // close() blocks until the harness finishes the bundle and all outputs 
have been delivered
+      // to the output receiver (and hence enqueued in pendingOutputs).
+      bundle.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to close SDK harness bundle", e);
+    } finally {
+      currentBundle = null;
+    }
+    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = 
checkInitialized(context);
+    // The harness has finished the bundle (close() returned) so no further 
enqueues happen.
+    // ConcurrentLinkedQueue's weakly-consistent iterator is therefore safe to 
drain via forEach.
+    pendingOutputs.forEach(
+        output ->
+            ctx.forward(
+                new Record<byte[], KStreamsPayload<byte[]>>(
+                    record.key(), KStreamsPayload.data(output), 
record.timestamp())));
+    pendingOutputs.clear();
+  }
+
+  private void forwardWatermark(
+      Record<byte[], KStreamsPayload<byte[]>> record, long watermarkMillis) {
+    // TODO(#38743 / WatermarkManager): a watermark must reach every parallel 
instance of every
+    // downstream processor, but ctx.forward routes to one downstream 
partition per Kafka Streams'
+    // partitioning. The simplest correct approach is to fan the watermark out 
to all downstream
+    // partitions; that wiring lands with the WatermarkManager sub-issue (per 
Jan on PR #38764).
+    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = 
checkInitialized(context);
+    ctx.forward(
+        new Record<byte[], KStreamsPayload<byte[]>>(
+            record.key(), KStreamsPayload.watermark(watermarkMillis), 
record.timestamp()));
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (currentBundle != null) {
+        currentBundle.close();
+        currentBundle = null;
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing in-flight SDK harness bundle", e);
+    }
+    try {
+      if (stageBundleFactory != null) {
+        stageBundleFactory.close();
+        stageBundleFactory = null;
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing stage bundle factory", e);
+    }
+    try {
+      if (stageContext != null) {
+        stageContext.close();
+        stageContext = null;
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing executable stage context", e);
+    }
+  }
+
+  private static <T> T checkInitialized(@Nullable T value) {
+    if (value == null) {
+      throw new IllegalStateException("ExecutableStageProcessor used before 
init()");
+    }
+    return value;
+  }
+}
diff --git 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
new file mode 100644
index 00000000000..dc56d57f57c
--- /dev/null
+++ 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.Topology;
+
+/**
+ * Translates the {@code beam:runner:executable_stage:v1} URN.
+ *
+ * <p>Adds an {@link ExecutableStageProcessor} node to the topology, wired to 
the processor that
+ * produces the stage's input PCollection (resolved through {@link
+ * KafkaStreamsTranslationContext#getProcessorNameForPCollection}). The 
processor runs the fused
+ * user code in the SDK harness; its single output PCollection is registered 
so downstream
+ * translators can attach to this node.
+ *
+ * <p>Multi-output stages (additional outputs / side inputs / state / timers) 
are out of scope for
+ * this first version and are rejected so the limitation fails fast rather 
than silently dropping
+ * outputs.
+ */
+class ExecutableStageTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, 
KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = 
pipeline.getComponents().getTransformsOrThrow(transformId);
+
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Failed to parse ExecutableStagePayload for transform " + 
transformId, e);
+    }
+
+    // Fail fast on stage features that are not yet supported, so users get a 
clear message rather
+    // than a silent miss further down the harness/topology path.
+    if (stagePayload.getSideInputsCount() > 0) {
+      throw new UnsupportedOperationException(
+          "ExecutableStage "
+              + transformId
+              + " has side inputs; side inputs are not yet supported by the 
Kafka Streams runner.");
+    }
+    if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() 
> 0) {
+      throw new UnsupportedOperationException(
+          "ExecutableStage "
+              + transformId
+              + " uses user state or timers; stateful ParDo is not yet 
supported by the Kafka"
+              + " Streams runner.");
+    }
+    if (transform.getOutputsMap().size() > 1) {
+      // Multi-output stages (DoFns with side outputs, etc.) are a planned 
follow-up — they need
+      // an output-tag dispatch in the processor + per-output PCollection 
routing. The current
+      // rejection just fails loudly until that's wired in.
+      throw new UnsupportedOperationException(
+          "ExecutableStage "
+              + transformId
+              + " has "
+              + transform.getOutputsMap().size()
+              + " outputs; multi-output stages are not yet supported by the 
Kafka Streams runner.");
+    }
+
+    // The payload distinguishes the main input from side inputs, so reading 
it from the payload
+    // is unambiguous even before we add side-input support.
+    String inputPCollectionId = stagePayload.getInput();
+    String parentProcessor = 
context.getProcessorNameForPCollection(inputPCollectionId);
+
+    Topology topology = context.getTopology();
+    topology.addProcessor(
+        transformId,
+        () -> new ExecutableStageProcessor(stagePayload, context.getJobInfo()),
+        parentProcessor);
+
+    if (!transform.getOutputsMap().isEmpty()) {
+      String outputPCollectionId = 
Iterables.getOnlyElement(transform.getOutputsMap().values());
+      context.registerPCollectionProducer(outputPCollectionId, transformId);
+    }
+  }
+}
diff --git 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
index 47c94eea6ef..53e47b1216b 100644
--- 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
+++ 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.kafka.streams.translation;
 
 import java.util.Objects;
 import org.apache.beam.sdk.values.WindowedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
@@ -121,6 +122,12 @@ public final class KStreamsPayload<T> {
 
   @Override
   public String toString() {
-    return kind == Kind.DATA ? "Data{" + data + "}" : "Watermark{" + watermarkMillis + "}";
+    // Render the kind, plus only the field that is meaningful for it: the wrapped data for DATA
+    // payloads, the watermark millis otherwise.
+    MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+    helper.add("kind", kind);
+    return (kind == Kind.DATA
+            ? helper.add("data", data)
+            : helper.add("watermarkMillis", watermarkMillis))
+        .toString();
   }
 }
diff --git 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
new file mode 100644
index 00000000000..d376ea0ba45
--- /dev/null
+++ 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import 
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/**
+ * Provides one {@link ExecutableStageContext.Factory} per job for the Kafka Streams runner.
+ *
+ * <p>Mirrors {@code FlinkExecutableStageContextFactory}: a singleton that hands out reference-
+ * counted {@link DefaultExecutableStageContext}s keyed by job id, so the SDK harness environment
+ * for a job is created once and shared across the {@link ImpulseProcessor}/executable-stage
+ * processors that run within the same JVM instance.
+ *
+ * <p>The per-job factory entry is kept for the lifetime of the JVM, as in the Flink runner, so all
+ * processors of a job keep resolving to the same reference-counted factory and environment.
+ */
+public class KafkaStreamsExecutableStageContextFactory implements ExecutableStageContext.Factory {
+
+  private static final KafkaStreamsExecutableStageContextFactory INSTANCE =
+      new KafkaStreamsExecutableStageContextFactory();
+
+  private final ConcurrentMap<String, ExecutableStageContext.Factory> jobFactories =
+      new ConcurrentHashMap<>();
+
+  private KafkaStreamsExecutableStageContextFactory() {}
+
+  /** Returns the process-wide singleton. */
+  public static KafkaStreamsExecutableStageContextFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns a reference-counted context for {@code jobInfo}'s job, creating the job's factory on
+   * first use. Callers must close the returned context to release their reference.
+   */
+  @Override
+  public ExecutableStageContext get(JobInfo jobInfo) {
+    ExecutableStageContext.Factory jobFactory =
+        jobFactories.computeIfAbsent(
+            jobInfo.jobId(),
+            jobId ->
+                ReferenceCountingExecutableStageContextFactory.create(
+                    DefaultExecutableStageContext::create,
+                    // Always release synchronously. This predicate only selects the release mode
+                    // and is consulted when a reference is released, not only when the count
+                    // reaches zero. It must therefore not remove the per-job entry: doing so
+                    // while other processors still hold contexts would make the next get() build
+                    // a second environment for the same job. The lambda captures nothing, so it
+                    // stays serializable as SerializableFunction requires.
+                    (caller) -> true));
+    return jobFactory.get(jobInfo);
+  }
+}
diff --git 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
index eb856714614..5042f542616 100644
--- 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
+++ 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
@@ -22,6 +22,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
 import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.sdk.util.construction.graph.PipelineNode;
 import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -43,6 +45,7 @@ public class KafkaStreamsPipelineTranslator {
     this(
         ImmutableMap.<String, PTransformTranslator>builder()
             .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new 
ImpulseTranslator())
+            .put(ExecutableStage.URN, new ExecutableStageTranslator())
             .build());
   }
 
@@ -55,9 +58,21 @@ public class KafkaStreamsPipelineTranslator {
     return KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions);
   }
 
-  /** Returns the pipeline to translate (placeholder for future fusion / expansion steps). */
+  /**
+   * Returns the pipeline with stateless user code fused into {@code ExecutableStage} nodes.
+   *
+   * <p>A pipeline that already contains at least one {@code ExecutableStage} transform is treated
+   * as fused and returned as-is. Otherwise it is run through the greedy fuser, which leaves
+   * runner-executed primitives that have their own translator (e.g. Impulse) intact and fuses
+   * everything else.
+   */
   public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) {
-    return pipeline;
+    for (RunnerApi.PTransform transform : pipeline.getComponents().getTransformsMap().values()) {
+      if (ExecutableStage.URN.equals(transform.getSpec().getUrn())) {
+        // Fused stages are already present; fusing again is neither needed nor attempted.
+        return pipeline;
+      }
+    }
+    return GreedyPipelineFuser.fuse(pipeline).toPipeline();
   }
 
   /**
diff --git 
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
 
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
new file mode 100644
index 00000000000..98ec45ae288
--- /dev/null
+++ 
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.Test;
+
+/**
+ * End-to-end test for {@link ExecutableStageTranslator}: builds an {@code Impulse -> ParDo}
+ * pipeline with the high-level Beam Java SDK, fuses + translates it, and runs the resulting Kafka
+ * Streams topology under {@link TopologyTestDriver}. The fused ParDo executes in an in-process
+ * (EMBEDDED) Java SDK harness, so the {@link DoFn}'s {@code @ProcessElement} body runs for real —
+ * no Docker, no broker.
+ *
+ * <p>Because the ParDo's output PCollection has no downstream consumer, it is not a stage output
+ * and is never forwarded out of the harness — that is the documented behaviour. The test verifies
+ * the bridge works by having the DoFn record into a {@link SharedTestCollector} as a side effect
+ * and asserting the recorded input from the test thread.
+ */
+public class ExecutableStageTranslatorTest {
+
+  private static final String JOB_ID = "kafka-streams-executable-stage-test";
+  private static final String APPLICATION_ID = "ks-executable-stage-test";
+
+  /**
+   * Records the length of every input element seen by the harness so the test can verify the DoFn
+   * ran. {@link SharedTestCollector} carries its identity via a UUID stored on the instance itself,
+   * so it survives any serialization the runner may perform on the DoFn.
+   */
+  private static class RecordingFn extends DoFn<byte[], byte[]> {
+    private final SharedTestCollector<Integer> collector;
+
+    RecordingFn(SharedTestCollector<Integer> collector) {
+      this.collector = collector;
+    }
+
+    @ProcessElement
+    public void processElement(@Element byte[] input, OutputReceiver<byte[]> out) {
+      collector.record(input.length);
+      // Still emit something so the output codepath of the harness is exercised, even though no
+      // downstream consumer means the runner never observes the value.
+      out.output(new byte[] {1});
+    }
+  }
+
+  /**
+   * Builds Impulse -> ParDo, prepares (fuses) and translates it, drives the topology under {@link
+   * TopologyTestDriver}, and asserts the DoFn saw exactly one zero-length element.
+   */
+  @Test
+  public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() throws Exception {
+    try (SharedTestCollector<Integer> collector = SharedTestCollector.create()) {
+      Pipeline pipeline = Pipeline.create(pipelineOptions());
+      pipeline
+          .apply("impulse", Impulse.create())
+          .apply("pardo", ParDo.of(new RecordingFn(collector)));
+
+      // Portable proto form of the SDK pipeline; prepareForTranslation below fuses the ParDo into
+      // an ExecutableStage before the translators are dispatched.
+      RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+
+      KafkaStreamsPipelineOptions options =
+          pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
+      KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+      JobInfo jobInfo =
+          JobInfo.create(
+              JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options));
+      KafkaStreamsTranslationContext context =
+          translator.createTranslationContext(jobInfo, options);
+
+      translator.translate(context, translator.prepareForTranslation(pipelineProto));
+
+      Topology topology = context.getTopology();
+      try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) {
+        // NOTE(review): presumably each wall-clock advance fires a punctuation in the Impulse
+        // processor (first emitting the impulse element, then a watermark that closes the
+        // harness bundle) — confirm against ImpulseProcessor before changing the count.
+        driver.advanceWallClockTime(Duration.ofSeconds(1));
+        driver.advanceWallClockTime(Duration.ofSeconds(1));
+      }
+
+      List<Integer> recorded = collector.recorded();
+      // Impulse emits exactly one empty byte[] in the GlobalWindow, so the DoFn must run exactly
+      // once and see a zero-length input.
+      assertThat(recorded.size(), is(1));
+      assertThat(recorded.get(0), is(0));
+    }
+  }
+
+  /**
+   * Options for building the test pipeline: a crashing runner (the pipeline is never run through
+   * Beam's runner API here) and the EMBEDDED environment so the SDK harness runs in-process.
+   */
+  private static PipelineOptions pipelineOptions() {
+    // NOTE(review): applicationId is set both via fromArgs and setApplicationId below; one of the
+    // two is likely redundant.
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create();
+    options.setRunner(CrashingRunner.class);
+    options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID);
+    options
+        .as(PortablePipelineOptions.class)
+        .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+    return options;
+  }
+
+  /** Streams config for {@link TopologyTestDriver} with byte-array default serdes. */
+  private static Properties streamsConfig() {
+    Properties props = new Properties();
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+    // No broker is contacted in this test; the address is presumably only needed to satisfy
+    // StreamsConfig validation.
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(
+        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+    props.put(
+        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+    return props;
+  }
+}
diff --git 
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
 
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
index 44cc00bcebb..13baa551ebb 100644
--- 
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
+++ 
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
@@ -18,16 +18,19 @@
 package org.apache.beam.runners.kafka.streams.translation;
 
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.util.construction.PTransformTranslation;
 import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
 import org.apache.kafka.streams.TopologyDescription;
 import org.junit.Test;
 
@@ -57,10 +60,11 @@ public class KafkaStreamsPipelineTranslatorTest {
                             .build()))
             .build();
 
+    // translate() directly — this test pins the URN-rejection contract on the 
dispatch loop
+    // itself, independent of the fuser/validator that prepareForTranslation 
runs.
     UnsupportedOperationException ex =
         assertThrows(
-            UnsupportedOperationException.class,
-            () -> translator.translate(context, 
translator.prepareForTranslation(pipeline)));
+            UnsupportedOperationException.class, () -> 
translator.translate(context, pipeline));
 
     assertThat(ex.getMessage(), containsString("No translator registered for 
URN"));
     assertThat(ex.getMessage(), 
containsString(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN));
@@ -73,15 +77,23 @@ public class KafkaStreamsPipelineTranslatorTest {
     KafkaStreamsPipelineTranslator translator = new 
KafkaStreamsPipelineTranslator();
     KafkaStreamsTranslationContext context = newContext();
 
-    RunnerApi.Pipeline pipeline = singleImpulsePipeline();
+    // Build the pipeline through the SDK so the resulting RunnerApi.Pipeline 
carries the coders
+    // and windowing strategies that PipelineValidator requires (run inside 
the fuser).
+    Pipeline sdkPipeline =
+        Pipeline.create(
+            PipelineOptionsFactory.fromArgs(
+                    "--applicationId=ks-translator-test",
+                    "--runner=" + CrashingRunner.class.getName())
+                .create());
+    sdkPipeline.apply("impulse", Impulse.create());
+    RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(sdkPipeline);
+
     translator.translate(context, translator.prepareForTranslation(pipeline));
 
     TopologyDescription description = context.getTopology().describe();
     String describeText = description.toString();
-
-    assertThat(describeText, containsString("impulse-source"));
-    assertThat(describeText, containsString("impulse"));
-    assertThat(context.getProcessorNameForPCollection(OUTPUT_PCOLLECTION_ID), 
is("impulse"));
+    assertThat(describeText, containsString("Source:"));
+    assertThat(describeText, containsString("Processor:"));
   }
 
   @Test
diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
new file mode 100644
index 00000000000..0986b0f2dc1
--- /dev/null
+++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test-only side-effect sink whose recorded elements survive Java serialization of the collector.
+ *
+ * <p>An ExecutableStage that contains a user {@link org.apache.beam.sdk.transforms.DoFn} runs the
+ * DoFn in the SDK harness even when its output PCollection has no downstream consumer; the work
+ * is still performed for its side effects. The natural unit test for that has the DoFn write into
+ * a side-effect container and asserts on the container's contents from the test thread.
+ *
+ * <p>A container referenced directly by the {@code DoFn} (for example a static {@code List} or
+ * {@code AtomicReference}) only works while the runner never serializes the {@code DoFn}. The
+ * EMBEDDED environment may serialize user code, now or in a future version. Writes made through a
+ * deserialized clone of such a container would then land in the clone and silently never reach
+ * the test.
+ *
+ * <p>This class avoids that by keeping the storage in the static {@link #REGISTRY}, keyed by a
+ * {@link UUID} that is the instance's only state. Instances are therefore cheap to serialize, and
+ * every clone carries the same {@code UUID} and so shares the same backing list. Because the
+ * registry is a static field, the sharing only holds while clones live in the same JVM and class
+ * loader as the test that created the collector.
+ *
+ * <p>Implements {@link AutoCloseable} so tests can use try-with-resources to drop this instance's
+ * entry from {@link #REGISTRY} once they have read the recorded elements. Without {@code close}, a
+ * long-lived JVM running many tests accumulates one orphaned entry per collector.
+ *
+ * <p>{@link #record} and {@link #recorded} may be called concurrently from different threads.
+ *
+ * @param <T> element type
+ */
+final class SharedTestCollector<T> implements Serializable, AutoCloseable {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Backing storage per collector id; an entry is created by the first {@code record} call. */
+  private static final Map<UUID, List<Object>> REGISTRY = new ConcurrentHashMap<>();
+
+  /** Identity shared by this instance and every serialized copy of it. */
+  private final UUID id;
+
+  SharedTestCollector() {
+    this.id = UUID.randomUUID();
+  }
+
+  /** Creates an empty collector with a freshly generated id. */
+  static <T> SharedTestCollector<T> create() {
+    return new SharedTestCollector<>();
+  }
+
+  /** Appends {@code element} to the shared backing list. Safe to call from any thread. */
+  void record(T element) {
+    List<Object> sink =
+        REGISTRY.computeIfAbsent(id, unused -> Collections.synchronizedList(new ArrayList<>()));
+    sink.add(element);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of everything recorded so far, in recording order. Returns an
+   * empty list if nothing has been recorded yet or the collector has been closed.
+   */
+  @SuppressWarnings("unchecked") // only values of type T are added, via record(T)
+  List<T> recorded() {
+    List<Object> backing = REGISTRY.get(id);
+    if (backing != null) {
+      List<T> snapshot;
+      // Copy while holding the list's own lock, as required for synchronizedList traversal.
+      synchronized (backing) {
+        snapshot = new ArrayList<>((List<T>) (List<?>) backing);
+      }
+      return Collections.unmodifiableList(snapshot);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Drops this collector's entry from {@link #REGISTRY}. Afterwards, {@link #recorded()} returns an
+   * empty list until {@link #record(Object)} is called again, which re-creates the entry. Tests
+   * normally call this once, via try-with-resources, so the registry does not keep orphaned
+   * entries.
+   */
+  @Override
+  public void close() {
+    REGISTRY.remove(id);
+  }
+}


Reply via email to