This is an automated email from the ASF dual-hosted git repository.

je-ik pushed a commit to branch feat/18479-kafka-streams-runner-skeleton
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/feat/18479-kafka-streams-runner-skeleton by this push:
     new 4636b1f333e #38957: Add in-memory WatermarkManager core (per-source-partition tracking)
4636b1f333e is described below

commit 4636b1f333eae8b0969b91d42bc436af59a66e47
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Mon Jun 15 22:37:55 2026 +0500

    #38957: Add in-memory WatermarkManager core (per-source-partition tracking)
---
 .../streams/translation/WatermarkManager.java      | 157 +++++++++++++++++++++
 .../streams/translation/WatermarkManagerTest.java  | 155 ++++++++++++++++++++
 2 files changed, 312 insertions(+)

diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/WatermarkManager.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/WatermarkManager.java
new file mode 100644
index 00000000000..cd0fca3654a
--- /dev/null
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/WatermarkManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Instant;
+
+/**
+ * In-memory tracker of a single fused stage's input watermark, computed from 
the committed
+ * watermarks reported by the <i>upstream source partitions</i> that feed it 
(the output /
+ * repartition-topic partitions of the parent stage).
+ *
+ * <p>This is the core of the Kafka Streams runner's watermark propagation, 
decoupled from the Kafka
+ * wiring so it can be unit-tested in isolation. The wiring that produces the 
reports (flushing
+ * {@code (sourcePartition, committedWatermark, totalSourcePartitions)} 
atomically with each offset
+ * commit and fanning it out to every downstream partition) and consumes them 
lands in a follow-up.
+ *
+ * <h3>Why source partitions, not producer instances</h3>
+ *
+ * <p>The question a stage has to answer is "have I received the watermark 
from every upstream
+ * producer, so that {@code min()} across them is meaningful?". Counting 
<i>producer instances</i>
+ * is hard: an instance can be killed without notice, leaving stale state, and 
the number changes on
+ * every rebalance. Counting <i>source partitions</i> is robust instead, 
because the partition count
+ * is fixed and known: it travels in-band with every report ({@code 
totalSourcePartitions}), a
+ * partition is always owned by exactly one live instance, and when an 
instance dies its partitions
+ * are reassigned and the new owner keeps reporting. So the manager only ever 
reasons about
+ * partitions, never about instances. (Design agreed with the mentor; see the 
watermark
+ * coordination-channel PoC findings.)
+ *
+ * <h3>Holding until ready</h3>
+ *
+ * <p>Until a committed watermark has been seen for <i>every</i> source 
partition, the stage's input
+ * watermark is undefined and {@link #advance()} returns {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE} —
+ * i.e. the stage emits no meaningful watermark downstream. A change in {@code
+ * totalSourcePartitions} (e.g. a repartition) clears the accumulated reports 
and re-opens this hold
+ * until the new full set has reported, which subsumes the "new epoch / 
revert" rule without an
+ * explicit epoch.
+ *
+ * <h3>Monotonicity</h3>
+ *
+ * <p>Beam watermarks must be non-decreasing. Each source partition's 
watermark is held monotonic (a
+ * lower report is ignored), and the emitted stage watermark is additionally 
clamped so it never
+ * regresses below the previously emitted value — relevant if a newly appeared 
partition reports an
+ * older watermark after the stage had already advanced.
+ *
+ * <p>Not thread-safe; the caller (a single Kafka Streams processor thread) 
serializes access.
+ */
+public final class WatermarkManager {
+
+  /** Total source partitions feeding this stage, learned in-band; -1 until 
the first report. */
+  private int expectedSourcePartitionCount = -1;
+
+  /** Latest committed watermark per source partition (kept monotonic 
non-decreasing). */
+  private final Map<Integer, Instant> committedWatermarkByPartition = new 
HashMap<>();
+
+  /** Last watermark {@link #advance()} emitted, to enforce a non-decreasing 
output. */
+  private Instant lastEmitted = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /**
+   * Record a committed watermark reported for one source partition, together 
with the total source
+   * partition count carried in-band with the report.
+   *
+   * @param sourcePartition the source partition the report is for, in {@code 
[0,
+   *     totalSourcePartitions)}
+   * @param committedWatermark the committed watermark for that partition
+   * @param totalSourcePartitions the total number of upstream source 
partitions feeding this stage
+   */
+  public void observe(int sourcePartition, Instant committedWatermark, int 
totalSourcePartitions) {
+    if (committedWatermark == null) {
+      throw new IllegalArgumentException("committedWatermark must not be 
null");
+    }
+    if (totalSourcePartitions <= 0) {
+      throw new IllegalArgumentException(
+          "totalSourcePartitions must be positive: " + totalSourcePartitions);
+    }
+    if (sourcePartition < 0 || sourcePartition >= totalSourcePartitions) {
+      throw new IllegalArgumentException(
+          "sourcePartition "
+              + sourcePartition
+              + " out of range for totalSourcePartitions "
+              + totalSourcePartitions);
+    }
+    if (totalSourcePartitions != expectedSourcePartitionCount) {
+      // The source partition set changed (e.g. a repartition). The previous 
per-partition
+      // watermarks describe a different partitioning, so drop them entirely 
and re-open the hold
+      // until the new full set reports. The output watermark still cannot 
regress (lastEmitted is
+      // retained).
+      expectedSourcePartitionCount = totalSourcePartitions;
+      committedWatermarkByPartition.clear();
+    }
+    // A source partition's watermark is monotonic non-decreasing; ignore an 
out-of-order lower
+    // report.
+    committedWatermarkByPartition.merge(
+        sourcePartition, committedWatermark, (oldW, newW) -> 
newW.isAfter(oldW) ? newW : oldW);
+  }
+
+  /** True once a committed watermark has been seen for every current source 
partition. */
+  public boolean isReady() {
+    return expectedSourcePartitionCount > 0
+        && committedWatermarkByPartition.size() == 
expectedSourcePartitionCount;
+  }
+
+  /**
+   * Advance and return the stage input watermark.
+   *
+   * <p>Returns {@link BoundedWindow#TIMESTAMP_MIN_VALUE} while the stage is 
still holding (not
+   * every source partition has reported) — the caller emits nothing 
meaningful downstream in that
+   * case. Once ready, returns {@code min()} over all source partitions, 
clamped to never regress
+   * below the previously emitted value. The sequence of values returned 
across calls is
+   * non-decreasing.
+   */
+  public Instant advance() {
+    if (!isReady()) {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    // isReady() guarantees the map is non-empty, so the seed is always 
replaced by a real value.
+    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (Instant w : committedWatermarkByPartition.values()) {
+      if (w.isBefore(min)) {
+        min = w;
+      }
+    }
+    Instant emit = min.isAfter(lastEmitted) ? min : lastEmitted;
+    lastEmitted = emit;
+    return emit;
+  }
+
+  /** The total source partition count learned in-band, or -1 if nothing 
reported yet. */
+  @VisibleForTesting
+  int expectedSourcePartitionCount() {
+    return expectedSourcePartitionCount;
+  }
+
+  /** How many distinct source partitions have reported so far. */
+  @VisibleForTesting
+  int reportedPartitionCount() {
+    return committedWatermarkByPartition.size();
+  }
+}
diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkManagerTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkManagerTest.java
new file mode 100644
index 00000000000..1d642e5ea1a
--- /dev/null
+++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/WatermarkManagerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for {@link WatermarkManager}. */
+public class WatermarkManagerTest {
+
+  /** Value {@link WatermarkManager#advance()} returns while the manager is still holding. */
+  private static final Instant HOLD = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Shorthand for an {@link Instant} at the given epoch millis. */
+  private static Instant at(long millis) {
+    return new Instant(millis);
+  }
+
+  /**
+   * Reports {@code millis[i]} as the committed watermark of partition {@code i}, in ascending
+   * partition order, each report carrying {@code total} as the source partition count.
+   */
+  private static void reportInOrder(WatermarkManager tracker, int total, long... millis) {
+    for (int partition = 0; partition < millis.length; partition++) {
+      tracker.observe(partition, at(millis[partition]), total);
+    }
+  }
+
+  @Test
+  public void holdsBeforeAnyReport() {
+    WatermarkManager tracker = new WatermarkManager();
+    assertThat(tracker.isReady(), is(false));
+    assertThat(tracker.advance(), is(HOLD));
+  }
+
+  @Test
+  public void holdsUntilEverySourcePartitionReports() {
+    WatermarkManager tracker = new WatermarkManager();
+    // Only partitions 0..2 of 4 report, so the manager must keep holding.
+    reportInOrder(tracker, 4, 100L, 100L, 100L);
+    assertThat(tracker.isReady(), is(false));
+    assertThat(tracker.advance(), is(HOLD));
+
+    // The last missing partition arrives and releases the hold.
+    tracker.observe(3, at(100L), 4);
+    assertThat(tracker.isReady(), is(true));
+    assertThat(tracker.advance(), is(at(100L)));
+  }
+
+  @Test
+  public void emitsMinAcrossPartitionsOnceReady() {
+    WatermarkManager tracker = new WatermarkManager();
+    reportInOrder(tracker, 4, 300L, 100L, 500L, 200L);
+    // The smallest of 300, 100, 500 and 200 wins.
+    assertThat(tracker.advance(), is(at(100L)));
+  }
+
+  @Test
+  public void perPartitionWatermarkIsMonotonic() {
+    WatermarkManager tracker = new WatermarkManager();
+    tracker.observe(0, at(100L), 1);
+    assertThat(tracker.advance(), is(at(100L)));
+    // An out-of-order, lower report for the same partition must not pull the watermark back.
+    tracker.observe(0, at(50L), 1);
+    assertThat(tracker.advance(), is(at(100L)));
+  }
+
+  @Test
+  public void partitionCountChangeClearsReportsAndReopensHold() {
+    WatermarkManager tracker = new WatermarkManager();
+    reportInOrder(tracker, 2, 100L, 100L);
+    assertThat(tracker.advance(), is(at(100L)));
+
+    // The source set grows to 4: earlier reports are discarded and the manager holds again
+    // until all four partitions of the new set have reported.
+    tracker.observe(0, at(200L), 4);
+    assertThat(tracker.reportedPartitionCount(), is(1));
+    assertThat(tracker.isReady(), is(false));
+    assertThat(tracker.advance(), is(HOLD));
+
+    for (int partition = 1; partition <= 3; partition++) {
+      tracker.observe(partition, at(200L), 4);
+    }
+    assertThat(tracker.isReady(), is(true));
+    assertThat(tracker.advance(), is(at(200L)));
+  }
+
+  @Test
+  public void emittedWatermarkDoesNotRegressAfterRepartition() {
+    WatermarkManager tracker = new WatermarkManager();
+    reportInOrder(tracker, 2, 100L, 100L);
+    assertThat(tracker.advance(), is(at(100L)));
+
+    // Repartition to 3 with older watermarks: the new minimum is 50, yet the value returned once
+    // ready again must stay at the previously emitted 100.
+    reportInOrder(tracker, 3, 50L, 50L, 50L);
+    assertThat(tracker.isReady(), is(true));
+    assertThat(tracker.advance(), is(at(100L)));
+  }
+
+  @Test
+  public void partitionCountDecreaseClearsAndRecomputes() {
+    WatermarkManager tracker = new WatermarkManager();
+    reportInOrder(tracker, 4, 100L, 100L, 100L, 50L);
+    assertThat(tracker.advance(), is(at(50L)));
+
+    // The source set shrinks to 2: reports are cleared and the manager holds until both
+    // partitions 0 and 1 have reported again.
+    tracker.observe(0, at(100L), 2);
+    assertThat(tracker.expectedSourcePartitionCount(), is(2));
+    assertThat(tracker.reportedPartitionCount(), is(1));
+    assertThat(tracker.isReady(), is(false));
+
+    tracker.observe(1, at(100L), 2);
+    assertThat(tracker.isReady(), is(true));
+    // The new minimum 100 is above the previously emitted 50, so the clamp has no effect here.
+    assertThat(tracker.advance(), is(at(100L)));
+  }
+
+  @Test
+  public void rejectsNullWatermark() {
+    WatermarkManager tracker = new WatermarkManager();
+    assertThrows(IllegalArgumentException.class, () -> tracker.observe(0, null, 4));
+  }
+
+  @Test
+  public void rejectsNonPositiveTotalSourcePartitions() {
+    WatermarkManager tracker = new WatermarkManager();
+    assertThrows(IllegalArgumentException.class, () -> tracker.observe(0, at(100L), 0));
+    assertThrows(IllegalArgumentException.class, () -> tracker.observe(0, at(100L), -1));
+  }
+
+  @Test
+  public void rejectsOutOfRangeSourcePartition() {
+    WatermarkManager tracker = new WatermarkManager();
+    // Below the range and exactly at the exclusive upper bound.
+    assertThrows(IllegalArgumentException.class, () -> tracker.observe(-1, at(100L), 4));
+    assertThrows(IllegalArgumentException.class, () -> tracker.observe(4, at(100L), 4));
+  }
+}

Reply via email to