This is an automated email from the ASF dual-hosted git repository.
je-ik pushed a commit to branch feat/18479-kafka-streams-runner-skeleton
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to
refs/heads/feat/18479-kafka-streams-runner-skeleton by this push:
new ff554f7438d [GSoC 2026] Kafka Streams runner — ExecutableStage
(stateless ParDo) translator (#38764)
ff554f7438d is described below
commit ff554f7438d658aa887f59ca78942340473a4ebd
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Sun Jun 7 17:13:55 2026 +0500
[GSoC 2026] Kafka Streams runner — ExecutableStage (stateless ParDo)
translator (#38764)
* Add ExecutableStage (stateless ParDo) translator with SDK-harness bridge
---
runners/kafka-streams/build.gradle | 1 +
.../translation/ExecutableStageProcessor.java | 217 +++++++++++++++++++++
.../translation/ExecutableStageTranslator.java | 96 +++++++++
.../kafka/streams/translation/KStreamsPayload.java | 9 +-
.../KafkaStreamsExecutableStageContextFactory.java | 66 +++++++
.../KafkaStreamsPipelineTranslator.java | 19 +-
.../translation/ExecutableStageTranslatorTest.java | 140 +++++++++++++
.../KafkaStreamsPipelineTranslatorTest.java | 28 ++-
.../streams/translation/SharedTestCollector.java | 92 +++++++++
9 files changed, 657 insertions(+), 11 deletions(-)
diff --git a/runners/kafka-streams/build.gradle
b/runners/kafka-streams/build.gradle
index 3f34a3ca76b..52b320dd70a 100644
--- a/runners/kafka-streams/build.gradle
+++ b/runners/kafka-streams/build.gradle
@@ -61,6 +61,7 @@ dependencies {
permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version"
testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
+ testImplementation project(":sdks:java:harness")
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
new file mode 100644
index 00000000000..7417088bb67
--- /dev/null
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.values.WindowedValue;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Streams {@link Processor} that executes a fused {@link ExecutableStage} (stateless user
+ * code such as ParDo) in the Beam SDK harness over the Fn API.
+ *
+ * <p>For each {@link KStreamsPayload#isData() data} payload it unwraps the {@link WindowedValue}
+ * and feeds it to the harness through the stage's main input {@link FnDataReceiver}, opening a
+ * bundle lazily on the first element. Harness outputs are collected on the harness threads into
+ * {@link #pendingOutputs} and then flushed downstream on the Kafka Streams processing thread when
+ * the bundle closes — Kafka Streams' {@link ProcessorContext#forward} must only be called from the
+ * processing thread, so outputs are never forwarded directly from a harness callback.
+ *
+ * <p>A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle boundary: the open
+ * bundle (if any) is closed (flushing outputs), and the watermark is then forwarded downstream so
+ * that subsequent stages observe it after all data of the bundle. Flushed outputs carry the key
+ * and timestamp of the watermark record that closed their bundle.
+ *
+ * <p>If closing a bundle fails, its partially collected outputs are discarded so they cannot be
+ * emitted together with a later bundle. Outputs of a bundle that is still open when {@link
+ * #close()} runs are likewise discarded: {@code close()} only releases harness resources, and each
+ * resource is closed at most once.
+ *
+ * <p>This is the Kafka Streams analogue of Flink's {@code ExecutableStageDoFnOperator} and Spark's
+ * {@code SparkExecutableStageFunction}. State, timers, and side inputs are out of scope for this
+ * first version: the stage is executed with {@link StateRequestHandler#unsupported()} and no timer
+ * receivers.
+ */
+class ExecutableStageProcessor
+    implements Processor<byte[], KStreamsPayload<byte[]>, byte[], KStreamsPayload<byte[]>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class);
+
+  private final RunnerApi.ExecutableStagePayload stagePayload;
+  private final JobInfo jobInfo;
+
+  // pendingOutputs is enqueued by SDK harness threads (inside the OutputReceiverFactory callback)
+  // and drained by the Kafka Streams processing thread on bundle close; needs to be thread-safe.
+  private final Queue<WindowedValue<byte[]>> pendingOutputs = new ConcurrentLinkedQueue<>();
+
+  private @Nullable ProcessorContext<byte[], KStreamsPayload<byte[]>> context;
+  private @Nullable ExecutableStageContext stageContext;
+  private @Nullable StageBundleFactory stageBundleFactory;
+  private @Nullable RemoteBundle currentBundle;
+
+  ExecutableStageProcessor(RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo) {
+    this.stagePayload = stagePayload;
+    this.jobInfo = jobInfo;
+  }
+
+  /** Acquires the per-job stage context and a bundle factory for this processor's stage. */
+  @Override
+  public void init(ProcessorContext<byte[], KStreamsPayload<byte[]>> context) {
+    this.context = context;
+    ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
+    ExecutableStageContext sharedContext =
+        KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo);
+    this.stageContext = sharedContext;
+    this.stageBundleFactory = sharedContext.getStageBundleFactory(executableStage);
+  }
+
+  /**
+   * Feeds a data payload into the current bundle, or closes the bundle and forwards the watermark
+   * for a watermark payload.
+   */
+  @Override
+  public void process(Record<byte[], KStreamsPayload<byte[]>> record) {
+    KStreamsPayload<byte[]> payload = record.value();
+    if (payload.isWatermark()) {
+      // NOTE: flushing the bundle on every received watermark is provisional. Once the
+      // WatermarkManager lands, a stage will receive watermarks from multiple parent instances and
+      // the output watermark becomes min() across them — the bundle should flush / the output
+      // watermark advance only when that minimum actually moves forward, not on every received
+      // watermark. Tracked in #38743.
+      closeBundleAndFlush(record);
+      forwardWatermark(record, payload.getWatermarkMillis());
+      return;
+    }
+    try {
+      ensureBundleOpen();
+      mainInputReceiver().accept(payload.getData());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to process element through SDK harness", e);
+    }
+  }
+
+  /** Opens a new harness bundle unless one is already in flight. */
+  private void ensureBundleOpen() throws Exception {
+    if (currentBundle != null) {
+      return;
+    }
+    StageBundleFactory factory = checkInitialized(stageBundleFactory);
+    OutputReceiverFactory outputReceiverFactory =
+        new OutputReceiverFactory() {
+          @Override
+          @SuppressWarnings("unchecked") // erased cast of harness outputs, see below
+          public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
+            // Outputs are queued here on harness threads and drained on the processing thread
+            // after the bundle closes. NOTE(review): the cast assumes every stage output is a
+            // byte[]-valued WindowedValue; being unchecked, a mismatch would only surface where
+            // the value is read downstream.
+            return receivedElement -> {
+              if (receivedElement != null) {
+                pendingOutputs.add((WindowedValue<byte[]>) receivedElement);
+              }
+            };
+          }
+        };
+    currentBundle =
+        factory.getBundle(
+            outputReceiverFactory,
+            StateRequestHandler.unsupported(),
+            BundleProgressHandler.ignored());
+  }
+
+  /** Returns the receiver for the stage's single main input in the current bundle. */
+  private FnDataReceiver<WindowedValue<?>> mainInputReceiver() {
+    RemoteBundle bundle = checkInitialized(currentBundle);
+    @SuppressWarnings("unchecked")
+    FnDataReceiver<WindowedValue<?>> receiver =
+        (FnDataReceiver<WindowedValue<?>>)
+            (FnDataReceiver<?>) Iterables.getOnlyElement(bundle.getInputReceivers().values());
+    return receiver;
+  }
+
+  /**
+   * Closes the in-flight bundle (if any) and forwards everything it produced, using the key and
+   * timestamp of {@code record}.
+   */
+  private void closeBundleAndFlush(Record<byte[], KStreamsPayload<byte[]>> record) {
+    RemoteBundle bundle = currentBundle;
+    if (bundle == null) {
+      return;
+    }
+    currentBundle = null;
+    try {
+      // close() blocks until the harness finishes the bundle and all outputs have been delivered
+      // to the output receiver (and hence enqueued in pendingOutputs).
+      bundle.close();
+    } catch (Exception e) {
+      // Drop the failed bundle's partial outputs so they are not emitted with a later bundle.
+      pendingOutputs.clear();
+      throw new RuntimeException("Failed to close SDK harness bundle", e);
+    }
+    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = checkInitialized(context);
+    // The harness has finished the bundle (close() returned), so nothing enqueues concurrently.
+    // poll() removes each output as it is forwarded, so if forward() throws part-way the outputs
+    // already emitted are not emitted a second time on the next flush.
+    WindowedValue<byte[]> output;
+    while ((output = pendingOutputs.poll()) != null) {
+      ctx.forward(
+          new Record<byte[], KStreamsPayload<byte[]>>(
+              record.key(), KStreamsPayload.data(output), record.timestamp()));
+    }
+  }
+
+  /** Forwards a watermark payload downstream using the key and timestamp of {@code record}. */
+  private void forwardWatermark(
+      Record<byte[], KStreamsPayload<byte[]>> record, long watermarkMillis) {
+    // TODO(#38743 / WatermarkManager): a watermark must reach every parallel instance of every
+    // downstream processor, but ctx.forward routes to one downstream partition per Kafka Streams'
+    // partitioning. The simplest correct approach is to fan the watermark out to all downstream
+    // partitions; that wiring lands with the WatermarkManager sub-issue (per Jan on PR #38764).
+    ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = checkInitialized(context);
+    ctx.forward(
+        new Record<byte[], KStreamsPayload<byte[]>>(
+            record.key(), KStreamsPayload.watermark(watermarkMillis), record.timestamp()));
+  }
+
+  /**
+   * Releases the in-flight bundle, the bundle factory and the stage context. Each field is cleared
+   * before its resource is closed, so a repeated call (or a call after a failed close) never
+   * closes — and thus never releases a reference on — the same resource twice. Failures are logged,
+   * not rethrown, so later resources are still released.
+   */
+  @Override
+  public void close() {
+    RemoteBundle bundle = currentBundle;
+    currentBundle = null;
+    try {
+      if (bundle != null) {
+        bundle.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing in-flight SDK harness bundle", e);
+    } finally {
+      // Outputs of an in-flight bundle are not forwarded from close(); drop them.
+      pendingOutputs.clear();
+    }
+    StageBundleFactory factory = stageBundleFactory;
+    stageBundleFactory = null;
+    try {
+      if (factory != null) {
+        factory.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing stage bundle factory", e);
+    }
+    ExecutableStageContext sharedContext = stageContext;
+    stageContext = null;
+    try {
+      if (sharedContext != null) {
+        sharedContext.close();
+      }
+    } catch (Exception e) {
+      LOG.warn("Error closing executable stage context", e);
+    }
+  }
+
+  /** Returns {@code value}, or throws if the processor has not been initialized yet. */
+  private static <T> T checkInitialized(@Nullable T value) {
+    if (value == null) {
+      throw new IllegalStateException("ExecutableStageProcessor used before init()");
+    }
+    return value;
+  }
+}
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
new file mode 100644
index 00000000000..dc56d57f57c
--- /dev/null
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.streams.Topology;
+
+/**
+ * Translator for the {@code beam:runner:executable_stage:v1} URN.
+ *
+ * <p>Each fused stage becomes one {@link ExecutableStageProcessor} node whose parent is the
+ * processor producing the stage's main input PCollection (looked up via {@link
+ * KafkaStreamsTranslationContext#getProcessorNameForPCollection}). The node runs the stage's fused
+ * user code in the SDK harness. When the stage has an output PCollection, that single output is
+ * registered with the context so downstream translators can attach to the node.
+ *
+ * <p>Side inputs, user state, timers and multi-output stages are not supported yet. Stages using
+ * any of them are rejected with {@link UnsupportedOperationException} at translation time, so the
+ * limitation fails fast instead of silently dropping outputs.
+ */
+class ExecutableStageTranslator implements PTransformTranslator {
+
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
+    RunnerApi.ExecutableStagePayload stagePayload = parseStagePayload(transformId, transform);
+    rejectUnsupported(transformId, transform, stagePayload);
+
+    // The payload names the main input separately from side inputs, so this lookup stays
+    // unambiguous once side-input support is added.
+    String parentProcessor = context.getProcessorNameForPCollection(stagePayload.getInput());
+
+    Topology topology = context.getTopology();
+    topology.addProcessor(
+        transformId,
+        () -> new ExecutableStageProcessor(stagePayload, context.getJobInfo()),
+        parentProcessor);
+
+    if (transform.getOutputsMap().isEmpty()) {
+      return;
+    }
+    context.registerPCollectionProducer(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), transformId);
+  }
+
+  /** Decodes the {@link RunnerApi.ExecutableStagePayload} carried in the transform's spec. */
+  private static RunnerApi.ExecutableStagePayload parseStagePayload(
+      String transformId, RunnerApi.PTransform transform) {
+    try {
+      return RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Failed to parse ExecutableStagePayload for transform " + transformId, e);
+    }
+  }
+
+  /**
+   * Throws {@link UnsupportedOperationException} for stage features this runner cannot execute
+   * yet: side inputs, then user state / timers, then more than one output.
+   */
+  private static void rejectUnsupported(
+      String transformId,
+      RunnerApi.PTransform transform,
+      RunnerApi.ExecutableStagePayload stagePayload) {
+    String stageLabel = "ExecutableStage " + transformId;
+    if (stagePayload.getSideInputsCount() > 0) {
+      throw new UnsupportedOperationException(
+          stageLabel
+              + " has side inputs; side inputs are not yet supported by the Kafka Streams runner.");
+    }
+    boolean stateful = stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0;
+    if (stateful) {
+      throw new UnsupportedOperationException(
+          stageLabel
+              + " uses user state or timers; stateful ParDo is not yet supported by the Kafka"
+              + " Streams runner.");
+    }
+    int outputCount = transform.getOutputsMap().size();
+    if (outputCount > 1) {
+      // Multi-output stages (DoFns with side outputs, etc.) need output-tag dispatch in the
+      // processor plus per-output PCollection routing; reject them loudly until that exists.
+      throw new UnsupportedOperationException(
+          stageLabel
+              + " has "
+              + outputCount
+              + " outputs; multi-output stages are not yet supported by the Kafka Streams runner.");
+    }
+  }
+}
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
index 47c94eea6ef..53e47b1216b 100644
---
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.kafka.streams.translation;
import java.util.Objects;
import org.apache.beam.sdk.values.WindowedValue;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -121,6 +122,12 @@ public final class KStreamsPayload<T> {
+  /** Returns a labelled debug string: the payload kind plus either its data or its watermark. */
@Override
public String toString() {
- return kind == Kind.DATA ? "Data{" + data + "}" : "Watermark{" +
watermarkMillis + "}";
+    // Only the field that is meaningful for this payload's kind is added to the helper.
+ MoreObjects.ToStringHelper helper =
MoreObjects.toStringHelper(this).add("kind", kind);
+ if (kind == Kind.DATA) {
+ helper.add("data", data);
+ } else {
+ helper.add("watermarkMillis", watermarkMillis);
+ }
+ return helper.toString();
}
}
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
new file mode 100644
index 00000000000..d376ea0ba45
--- /dev/null
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext;
+import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
+import
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/**
+ * Provides one {@link ExecutableStageContext.Factory} per job for the Kafka Streams runner.
+ *
+ * <p>Mirrors {@code FlinkExecutableStageContextFactory}: a singleton that hands out reference-
+ * counted {@link DefaultExecutableStageContext}s keyed by job id, so the SDK harness environment
+ * for a job is created once and shared across the {@link ImpulseProcessor}/executable-stage
+ * processors that run within the same JVM instance.
+ *
+ * <p>Per-job factories are never evicted from {@link #jobFactories}, as in the Flink and Spark
+ * runners. Evicting one while processors of that job still hold contexts from it would make the
+ * next {@link #get} build a second factory, and therefore a second SDK harness environment, for
+ * the same job.
+ */
+public class KafkaStreamsExecutableStageContextFactory implements ExecutableStageContext.Factory {
+
+  private static final KafkaStreamsExecutableStageContextFactory INSTANCE =
+      new KafkaStreamsExecutableStageContextFactory();
+
+  /** Reference-counting context factory per job id, created lazily on first {@link #get}. */
+  private final ConcurrentMap<String, ExecutableStageContext.Factory> jobFactories =
+      new ConcurrentHashMap<>();
+
+  private KafkaStreamsExecutableStageContextFactory() {}
+
+  /** Returns the JVM-wide singleton. */
+  public static KafkaStreamsExecutableStageContextFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns a context for {@code jobInfo}'s job from that job's reference-counting factory,
+   * creating the factory on first use.
+   */
+  @Override
+  public ExecutableStageContext get(JobInfo jobInfo) {
+    ExecutableStageContext.Factory jobFactory =
+        jobFactories.computeIfAbsent(
+            jobInfo.jobId(),
+            k ->
+                ReferenceCountingExecutableStageContextFactory.create(
+                    DefaultExecutableStageContext::create,
+                    // Release contexts synchronously. This predicate must stay side-effect free:
+                    // it is consulted when a release is scheduled (and, per
+                    // ReferenceCountingExecutableStageContextFactory, only when an environment
+                    // cache TTL is configured), not at the moment the reference count reaches
+                    // zero, so it cannot be used to evict this job's map entry.
+                    // NOTE(review): confirm against ReferenceCountingExecutableStageContextFactory.
+                    caller -> true));
+    return jobFactory.get(jobInfo);
+  }
+}
diff --git
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
index eb856714614..5042f542616 100644
---
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
+++
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java
@@ -22,6 +22,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -43,6 +45,7 @@ public class KafkaStreamsPipelineTranslator {
this(
ImmutableMap.<String, PTransformTranslator>builder()
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new
ImpulseTranslator())
+ .put(ExecutableStage.URN, new ExecutableStageTranslator())
.build());
}
@@ -55,9 +58,21 @@ public class KafkaStreamsPipelineTranslator {
return KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions);
}
- /** Returns the pipeline to translate (placeholder for future fusion /
expansion steps). */
+  /**
+   * Fuses the pipeline with {@link GreedyPipelineFuser} so that stateless user code is grouped
+   * into {@code ExecutableStage} nodes.
+   *
+   * <p>The fuser decides on its own which primitives stay unfused and runner-executed (e.g.
+   * Impulse); it is not given this translator's URN registry. A primitive that it leaves unfused
+   * but for which no translator is registered is therefore still rejected later by {@link
+   * #translate}.
+   *
+   * <p>If any transform already carries the {@code ExecutableStage} URN, the pipeline is treated
+   * as already fused and returned unchanged.
+   *
+   * @param pipeline the portable pipeline to prepare
+   * @return the fused pipeline, or {@code pipeline} itself when it is already fused
+   */
public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline)
{
- return pipeline;
+    // Skip fusion when an ExecutableStage transform is already present.
+ boolean alreadyFused =
+ pipeline.getComponents().getTransformsMap().values().stream()
+ .anyMatch(t -> ExecutableStage.URN.equals(t.getSpec().getUrn()));
+ if (alreadyFused) {
+ return pipeline;
+ }
+ return GreedyPipelineFuser.fuse(pipeline).toPipeline();
}
/**
diff --git
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
new file mode 100644
index 00000000000..98ec45ae288
--- /dev/null
+++
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.Test;
+
+/**
+ * End-to-end check of {@link ExecutableStageTranslator}. An {@code Impulse -> ParDo} pipeline is
+ * built with the high-level Java SDK, fused and translated into a Kafka Streams topology, and then
+ * driven by a {@link TopologyTestDriver}. The fused ParDo runs in an in-process (EMBEDDED) Java SDK
+ * harness, so the {@link DoFn}'s {@code @ProcessElement} method really executes — without Docker
+ * or a broker.
+ *
+ * <p>The ParDo's output PCollection has no consumer, so it is not a stage output and nothing is
+ * forwarded out of the harness for it; that is the documented behaviour. Instead, the DoFn reports
+ * what it saw to a {@link SharedTestCollector} as a side effect, and the test asserts on that from
+ * the test thread.
+ */
+public class ExecutableStageTranslatorTest {
+
+  private static final String JOB_ID = "kafka-streams-executable-stage-test";
+  private static final String APPLICATION_ID = "ks-executable-stage-test";
+
+  /**
+   * Reports the length of every input element to a {@link SharedTestCollector}, which is designed
+   * to survive any serialization the runner applies to the DoFn, so the test can confirm the DoFn
+   * ran.
+   */
+  private static class RecordingFn extends DoFn<byte[], byte[]> {
+    private final SharedTestCollector<Integer> sink;
+
+    RecordingFn(SharedTestCollector<Integer> sink) {
+      this.sink = sink;
+    }
+
+    @ProcessElement
+    public void processElement(@Element byte[] input, OutputReceiver<byte[]> out) {
+      sink.record(input.length);
+      // Emit a value anyway so the harness output path is exercised, even though nothing
+      // downstream lets the runner observe it.
+      out.output(new byte[] {1});
+    }
+  }
+
+  @Test
+  public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() throws Exception {
+    try (SharedTestCollector<Integer> collector = SharedTestCollector.create()) {
+      Pipeline pipeline = Pipeline.create(pipelineOptions());
+      pipeline
+          .apply("impulse", Impulse.create())
+          .apply("pardo", ParDo.of(new RecordingFn(collector)));
+
+      Topology topology = toTopology(pipeline);
+      try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) {
+        for (int tick = 0; tick < 2; tick++) {
+          driver.advanceWallClockTime(Duration.ofSeconds(1));
+        }
+      }
+
+      // Impulse emits a single empty byte[] in the GlobalWindow: the DoFn must have run exactly
+      // once, on a zero-length input.
+      List<Integer> seenLengths = collector.recorded();
+      assertThat(seenLengths.size(), is(1));
+      assertThat(seenLengths.get(0), is(0));
+    }
+  }
+
+  /** Converts {@code pipeline} to its portable proto, fuses it and translates it to a topology. */
+  private static Topology toTopology(Pipeline pipeline) throws Exception {
+    RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
+    KafkaStreamsPipelineOptions ksOptions =
+        pipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
+    JobInfo jobInfo =
+        JobInfo.create(
+            JOB_ID, ksOptions.getJobName(), "", PipelineOptionsTranslation.toProto(ksOptions));
+    KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+    KafkaStreamsTranslationContext context =
+        translator.createTranslationContext(jobInfo, ksOptions);
+    RunnerApi.Pipeline prepared = translator.prepareForTranslation(proto);
+    translator.translate(context, prepared);
+    return context.getTopology();
+  }
+
+  /** Options for a CrashingRunner pipeline whose SDK environment is the in-process harness. */
+  private static PipelineOptions pipelineOptions() {
+    PipelineOptions opts =
+        PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create();
+    opts.setRunner(CrashingRunner.class);
+    opts.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID);
+    PortablePipelineOptions portable = opts.as(PortablePipelineOptions.class);
+    portable.setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+    return opts;
+  }
+
+  /** Minimal Streams config for the test driver, with byte-array serdes for keys and values. */
+  private static Properties streamsConfig() {
+    String byteArraySerde = Serdes.ByteArray().getClass().getName();
+    Properties config = new Properties();
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, byteArraySerde);
+    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, byteArraySerde);
+    return config;
+  }
+}
diff --git
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
index 44cc00bcebb..13baa551ebb 100644
---
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
+++
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java
@@ -18,16 +18,19 @@
package org.apache.beam.runners.kafka.streams.translation;
import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.kafka.streams.TopologyDescription;
import org.junit.Test;
@@ -57,10 +60,11 @@ public class KafkaStreamsPipelineTranslatorTest {
.build()))
.build();
+ // translate() directly — this test pins the URN-rejection contract on the
dispatch loop
+ // itself, independent of the fuser/validator that prepareForTranslation
runs.
UnsupportedOperationException ex =
assertThrows(
- UnsupportedOperationException.class,
- () -> translator.translate(context,
translator.prepareForTranslation(pipeline)));
+ UnsupportedOperationException.class, () ->
translator.translate(context, pipeline));
assertThat(ex.getMessage(), containsString("No translator registered for
URN"));
assertThat(ex.getMessage(),
containsString(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN));
@@ -73,15 +77,23 @@ public class KafkaStreamsPipelineTranslatorTest {
KafkaStreamsPipelineTranslator translator = new
KafkaStreamsPipelineTranslator();
KafkaStreamsTranslationContext context = newContext();
- RunnerApi.Pipeline pipeline = singleImpulsePipeline();
+ // Build the pipeline through the SDK so the resulting RunnerApi.Pipeline carries the coders
+ // and windowing strategies that PipelineValidator (which runs inside the fuser) requires.
+ Pipeline sdkPipeline =
+ Pipeline.create(
+ PipelineOptionsFactory.fromArgs(
+ "--applicationId=ks-translator-test",
+ "--runner=" + CrashingRunner.class.getName())
+ .create());
+ sdkPipeline.apply("impulse", Impulse.create());
+ RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(sdkPipeline);
+
translator.translate(context, translator.prepareForTranslation(pipeline));
TopologyDescription description = context.getTopology().describe();
String describeText = description.toString();
-
- assertThat(describeText, containsString("impulse-source"));
- assertThat(describeText, containsString("impulse"));
- assertThat(context.getProcessorNameForPCollection(OUTPUT_PCOLLECTION_ID),
is("impulse"));
+ assertThat(describeText, containsString("Source:"));
+ assertThat(describeText, containsString("Processor:"));
}
@Test
diff --git
a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
new file mode 100644
index 00000000000..0986b0f2dc1
--- /dev/null
+++
b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test-only side-effect sink whose recorded elements survive Java serialization of the sink.
+ *
+ * <p>When an ExecutableStage contains a user {@link org.apache.beam.sdk.transforms.DoFn}, the SDK
+ * harness still runs that DoFn even if its output PCollection has no downstream consumer, because
+ * the work is performed for its side effects. The natural unit test therefore lets the DoFn write
+ * into a side-effect container and asserts on that container's contents from the test thread.
+ *
+ * <p>A plain {@code AtomicReference} or {@code List} held by the {@code DoFn} only works as long
+ * as the runner never serializes the {@code DoFn}: serialization copies the container together
+ * with the DoFn, and writes to that copy never reach the test thread. The EMBEDDED environment
+ * may already serialize user code, or may start doing so in the future.
+ *
+ * <p>This class sidesteps the problem by storing the elements in the static {@link #REGISTRY},
+ * keyed by a {@link UUID}; the instance itself holds only that {@code UUID} and is therefore cheap
+ * to serialize. A deserialized copy carries the same {@code UUID} and so reads and writes the same
+ * backing list (provided it is used in the same JVM, since the registry is a static field).
+ *
+ * <p>The class is {@link AutoCloseable} so a test can use try-with-resources to remove its entry
+ * from {@link #REGISTRY} after reading the recorded elements. Without {@code close}, a long-lived
+ * JVM running many tests would accumulate one orphaned entry per collector.
+ *
+ * @param <T> element type
+ */
+final class SharedTestCollector<T> implements Serializable, AutoCloseable {
+
+  /**
+   * Backing storage, one list per collector id. An entry is created by the first {@code record}
+   * call for that id.
+   */
+  private static final Map<UUID, List<Object>> REGISTRY = new ConcurrentHashMap<>();
+
+  private static final long serialVersionUID = 1L;
+
+  /** Identity shared by this instance and every serialized copy of it. */
+  private final UUID id = UUID.randomUUID();
+
+  /** Creates a new, empty collector with its own randomly generated id. */
+  static <T> SharedTestCollector<T> create() {
+    return new SharedTestCollector<T>();
+  }
+
+  /**
+   * Appends {@code element} to this collector's backing list, creating the list on first use.
+   * Safe to call from any thread: the registry is a {@link ConcurrentHashMap} and each backing
+   * list is synchronized.
+   */
+  void record(T element) {
+    List<Object> bucket = REGISTRY.computeIfAbsent(id, SharedTestCollector::newBucket);
+    bucket.add(element);
+  }
+
+  /**
+   * Returns an unmodifiable copy of everything recorded so far, in insertion order. Returns an
+   * empty list if nothing has been recorded since creation or since the last {@link #close()}.
+   */
+  List<T> recorded() {
+    List<Object> backing = REGISTRY.get(id);
+    if (backing != null) {
+      // Copy while holding the synchronized list's own lock, as its contract requires for
+      // traversal, so concurrent record() calls cannot interleave with the copy.
+      synchronized (backing) {
+        @SuppressWarnings("unchecked") // only record(T) ever adds to this list
+        List<T> typed = (List<T>) (List<?>) backing;
+        return Collections.unmodifiableList(new ArrayList<>(typed));
+      }
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Removes this collector's entry from {@link #REGISTRY}. Afterwards {@link #recorded()} returns
+   * an empty list, and a later {@link #record(Object)} starts a fresh entry. Typically invoked
+   * once at the end of a test via try-with-resources so the registry does not accumulate orphaned
+   * entries.
+   */
+  @Override
+  public void close() {
+    REGISTRY.remove(id);
+  }
+
+  /** Creates an empty synchronized backing list; the registry key is not used. */
+  private static List<Object> newBucket(UUID unusedKey) {
+    return Collections.synchronizedList(new ArrayList<>());
+  }
+}