This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bfff80b6fa4 [Dataflow Streaming] Add WindmillTagEncodingV2. (#37151)
bfff80b6fa4 is described below

commit bfff80b6fa49260366d11576c4f4269759fda32e
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Jan 8 01:51:07 2026 -0800

    [Dataflow Streaming] Add WindmillTagEncodingV2. (#37151)
    
    This adds a new way of encoding windmill state tags and timer tags.
    The new code is behind an unstable experiment. More work is needed
    before it can be used on real workloads.
---
 .../apache/beam/runners/core/StateNamespaces.java  |   8 +
 .../org/apache/beam/runners/core/StateTags.java    |  17 +
 .../worker/StreamingModeExecutionContext.java      |  12 +-
 .../dataflow/worker/WindmillTimerInternals.java    |   6 +-
 .../worker/windmill/state/WindmillTagEncoding.java |   5 +-
 .../windmill/state/WindmillTagEncodingV1.java      |   3 +-
 .../windmill/state/WindmillTagEncodingV2.java      | 406 +++++++++++++++
 .../processing/ComputationWorkExecutorFactory.java |  14 +-
 .../worker/StreamingModeExecutionContextTest.java  |   3 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |   6 +-
 .../windmill/state/WindmillTagEncodingV2Test.java  | 576 +++++++++++++++++++++
 11 files changed, 1044 insertions(+), 12 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
index a68ab6c913c..e919d12eaac 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
@@ -102,6 +102,10 @@ public class StateNamespaces {
       return window;
     }
 
+    public Coder<W> getWindowCoder() {
+      return windowCoder;
+    }
+
     @Override
     public String stringKey() {
       try {
@@ -170,6 +174,10 @@ public class StateNamespaces {
       return window;
     }
 
+    public Coder<W> getWindowCoder() {
+      return windowCoder;
+    }
+
     public int getTriggerIndex() {
       return triggerIndex;
     }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index ba5478be6c7..5d69abe8ffc 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -144,6 +144,8 @@ public class StateTags {
 
   private interface SystemStateTag<StateT extends State> {
     StateTag<StateT> asKind(StateKind kind);
+
+    StateKind getKind();
   }
 
   /** Create a state tag for the given id and spec. */
@@ -243,6 +245,16 @@ public class StateTags {
     return typedTag.asKind(StateKind.SYSTEM);
   }
 
+  /**
+   * Returns true if the tag is a system-internal tag (its kind is {@code StateKind.SYSTEM}).
+   */
+  public static <StateT extends State> boolean 
isSystemTagInternal(StateTag<StateT> tag) {
+    if (!(tag instanceof SystemStateTag)) {
+      return false;
+    }
+    return StateKind.SYSTEM.equals(((SystemStateTag<?>) tag).getKind());
+  }
+
   public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> 
convertToBagTagInternal(
       StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
     return new SimpleStateTag<>(
@@ -358,6 +370,11 @@ public class StateTags {
       return new SimpleStateTag<>(id.asKind(kind), spec);
     }
 
+    @Override
+    public StateKind getKind() {
+      return id.kind;
+    }
+
     @Override
     public boolean equals(@Nullable Object other) {
       if (!(other instanceof SimpleStateTag)) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index ec996c9ab02..09afcadc300 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -59,10 +59,12 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -154,13 +156,14 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       String computationId,
       ReaderCache readerCache,
       Map<String, String> stateNameMap,
-      WindmillStateCache.ForComputation stateCache,
+      ForComputation stateCache,
       MetricsContainerRegistry<StreamingStepMetricsContainer> 
metricsContainerRegistry,
       DataflowExecutionStateTracker executionStateTracker,
       StreamingModeExecutionStateRegistry executionStateRegistry,
       StreamingGlobalConfigHandle globalConfigHandle,
       long sinkByteLimit,
-      boolean throwExceptionOnLargeOutput) {
+      boolean throwExceptionOnLargeOutput,
+      boolean enableWindmillTagEncodingV2) {
     super(
         counterFactory,
         metricsContainerRegistry,
@@ -171,7 +174,10 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     this.readerCache = readerCache;
     this.globalConfigHandle = globalConfigHandle;
     this.sideInputCache = new HashMap<>();
-    this.windmillTagEncoding = WindmillTagEncodingV1.instance();
+    this.windmillTagEncoding =
+        enableWindmillTagEncodingV2
+            ? WindmillTagEncodingV2.instance()
+            : WindmillTagEncodingV1.instance();
     this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
     this.stateCache = stateCache;
     this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index cb41aa1ccab..4287188c35b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -201,7 +201,7 @@ class WindmillTimerInternals implements TimerInternals {
             // Setting a timer, clear any prior hold and set to the new value
             outputBuilder
                 .addWatermarkHoldsBuilder()
-                .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
+                .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, 
timer.getTag()))
                 .setStateFamily(stateFamily)
                 .setReset(true)
                 .addTimestamps(
@@ -210,7 +210,7 @@ class WindmillTimerInternals implements TimerInternals {
             // Clear the hold in case a previous iteration of this timer set 
one.
             outputBuilder
                 .addWatermarkHoldsBuilder()
-                .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
+                .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, 
timer.getTag()))
                 .setStateFamily(stateFamily)
                 .setReset(true);
           }
@@ -225,7 +225,7 @@ class WindmillTimerInternals implements TimerInternals {
           // We are deleting timer; clear the hold
           outputBuilder
               .addWatermarkHoldsBuilder()
-              .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
+              .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, 
timer.getTag()))
               .setStateFamily(stateFamily)
               .setReset(true);
         }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
index 59841f67347..a979a1d982c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
@@ -54,8 +54,11 @@ public abstract class WindmillTagEncoding {
   /**
    * Produce a state tag that is guaranteed to be unique for the given timer, 
to add a watermark
    * hold that is only freed after the timer fires.
+   *
+   * @param timerTag tag of the timer that maps to the hold.
    */
-  public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, 
TimerData timerData);
+  public abstract ByteString timerHoldTag(
+      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
timerTag);
 
   /**
    * Produce a tag that is guaranteed to be unique for the given prefix, 
namespace, domain and
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
index 19e31351a52..14c3f8c0179 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
@@ -70,7 +70,8 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
 
   /** {@inheritDoc} */
   @Override
-  public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+  public ByteString timerHoldTag(
+      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
unusedTimerTag) {
     String tagString;
     if ("".equals(timerData.getTimerFamilyId())) {
       tagString =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
new file mode 100644
index 00000000000..0702c375282
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowAndTriggerNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
+import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import org.joda.time.Instant;
+
+/**
+ * Encodes and decodes StateTags and TimerTags from and to windmill bytes. 
This encoding scheme
+ * enforces a specific lexicographical order on state tags. The ordering 
enables building range
+ * filters using the tags.
+ *
+ * <h2>1. High-Level Tag Formats</h2>
+ *
+ * <p>State tags and Timer tags differ in structure but share common component 
encodings.
+ *
+ * <h3>1.1 State Tag Encoding</h3>
+ *
+ * <p>Used for generic state variables (e.g., ValueState, BagState).
+ *
+ * <pre>
+ * Format:
+ * | Encoded Namespace | Encoded Address |
+ * </pre>
+ *
+ * <ul>
+ *   <li><b>Encoded Namespace:</b> Encodes the state namespace (see Section 
2.1).
+ *   <li><b>Encoded Address:</b> Encodes the state variable address (see 
Section 2.3).
+ * </ul>
+ *
+ * <h3>1.2 Timer/Timer Hold Tag Encoding</h3>
+ *
+ * <p>Specialized tags, used for timers and automatic watermark holds 
associated with the timers.
+ *
+ * <pre>
+ * Format:
+ * | Encoded Namespace | Tag Type | Timer Family Id | Timer Id |
+ *
+ * 
+-------------------+-----------------------------------------------------------+
+ * | Field             | Format                                                
    |
+ * 
+-------------------+-----------------------------------------------------------+
+ * | Encoded Namespace | Encoded namespace (see Section 2.1).                  
    |
+ * 
+-------------------+-----------------------------------------------------------+
+ * | Tag Type          | {@code 0x03} (Single byte): System Timer/Watermark 
Hold   |
+ * |                   | {@code 0x04} (Single byte): User Timer/Watermark Hold 
    |
+ * 
+-------------------+-----------------------------------------------------------+
+ * | Timer Family ID   | TimerFamilyId encoded via length prefixed             
    |
+ * |                   | {@code StringUtf8Coder}.                              
    |
+ * 
+-------------------+-----------------------------------------------------------+
+ * | Timer ID          | TimerId encoded via length prefixed                   
    |
+ * |                   | {@code StringUtf8Coder}.                              
    |
+ * 
+-------------------+-----------------------------------------------------------+
+ * </pre>
+ *
+ * <h2>2. Component Encodings</h2>
+ *
+ * <h3>2.1 Namespace Encoding</h3>
+ *
+ * <p>Namespaces are prefixed with a byte ID to control sorting order.
+ *
+ * <pre>
+ * 
+---------------------------+-------------------------------------------------------------+
+ * | Namespace Type            | Format                                        
              |
+ * 
+---------------------------+-------------------------------------------------------------+
+ * | GlobalNamespace           | | {@code 0x01} |                              
              |
+ * |                           | (Single byte)                                 
              |
+ * 
+---------------------------+-------------------------------------------------------------+
+ * | WindowNamespace           | | {@code 0x10} | Encoded Window | {@code 
0x01} |            |
+ * |                           | (See Section 2.2)                             
              |
+ * 
+---------------------------+-------------------------------------------------------------+
+ * | WindowAndTriggerNamespace | | {@code 0x10} | Encoded Window | {@code 
0x02} | TriggerIndex |
+ * |                           | (See Section 2.2 for Encoded Window)          
              |
+ * |                           | TriggerIndex is encoded by {@code 
BigEndianIntegerCoder}    |
+ * 
+---------------------------+-------------------------------------------------------------+
+ * </pre>
+ *
+ * <h3>2.2 Window Encoding</h3>
+ *
+ * <h4>2.2.1 IntervalWindow</h4>
+ *
+ * <p>IntervalWindows use a custom encoding that is different from the 
IntervalWindowCoder.
+ *
+ * <pre>
+ * Format:
+ * | 0x64 | End Time | Start Time |
+ * </pre>
+ *
+ * <ul>
+ *   <li><b>Prefix:</b> {@code 0x64}. Single byte identifying Interval windows.
+ *   <li><b>End Time:</b> {@code intervalWindow.end()} encoded via {@code 
InstantCoder}.
+ *   <li><b>Start Time:</b> {@code intervalWindow.start()} encoded via {@code 
InstantCoder}.
+ * </ul>
+ *
+ * <p><b>Note:</b> {@code InstantCoder} preserves the sort order, so encoded IntervalWindows sort
+ * lexicographically by {@code [End Time, Start Time]} without needing to be decoded.
+ *
+ * <h4>2.2.2 Other Windows</h4>
+ *
+ * <p>All non-IntervalWindows use the standard window coders.
+ *
+ * <pre>
+ * Format:
+ * | 0x02 | Window |
+ * </pre>
+ *
+ * <ul>
+ *   <li><b>Prefix:</b> {@code 0x02}. Single byte identifying non-Interval 
windows.
+ *   <li><b>Window:</b> The window serialized using its {@code windowCoder}.
+ * </ul>
+ *
+ * <h3>2.3 Address Encoding</h3>
+ *
+ * <p>Combines the state type and the state identifier.
+ *
+ * <pre>
+ * Format:
+ * | State Type | Address |
+ *
+ * 
+------------+-----------------------------------------------------------------+
+ * | Field      | Format                                                       
   |
+ * 
+------------+-----------------------------------------------------------------+
+ * | State Type | {@code 0x01} (Single byte): System State                     
   |
+ * |            | {@code 0x02} (Single byte): User State                       
   |
+ * 
+------------+-----------------------------------------------------------------+
+ * | Address    | The state address (string) is encoded via length prefixed    
   |
+ * |            | {@code StringUtf8Coder}.                                     
   |
+ * 
+------------+-----------------------------------------------------------------+
+ * </pre>
+ *
+ * <h2>3. Tag Ordering</h2>
+ *
+ * <p>The encoding prefixes are chosen to enforce the following 
lexicographical sort order (lowest
+ * to highest):
+ *
+ * <ol>
+ *   <li><b>Tags in Global Namespace</b> (Prefix {@code 0x01})
+ *   <li><b>Tags in Non-Interval Windows</b> (Prefix {@code 0x1002})
+ *   <li><b>Tags in Interval Windows</b> (Prefix {@code 0x1064})
+ *       <ul>
+ *         <li>Sorted internally by {@code [EndTime, StartTime]}.
+ *       </ul>
+ * </ol>
+ */
+@Internal
+@ThreadSafe
+public class WindmillTagEncodingV2 extends WindmillTagEncoding {
+
+  private static final WindmillTagEncodingV2 INSTANCE = new 
WindmillTagEncodingV2();
+  private static final int WINDOW_NAMESPACE_BYTE = 0x01;
+  private static final int WINDOW_AND_TRIGGER_NAMESPACE_BYTE = 0x02;
+  private static final int NON_GLOBAL_NAMESPACE_BYTE = 0x10;
+  private static final int GLOBAL_NAMESPACE_BYTE = 0x01;
+  private static final int SYSTEM_STATE_TAG_BYTE = 0x01;
+  private static final int USER_STATE_TAG_BYTE = 0x02;
+  private static final int SYSTEM_TIMER_BYTE = 0x03;
+  private static final int USER_TIMER_BYTE = 0x04;
+  private static final int INTERVAL_WINDOW_BYTE = 0x64;
+  private static final int OTHER_WINDOW_BYTE = 0x02;
+
+  // Private constructor to prevent instantiations from outside.
+  private WindmillTagEncodingV2() {}
+
+  /** {@inheritDoc} */
+  @Override
+  public InternedByteString stateTag(StateNamespace namespace, StateTag<?> 
address) {
+    try (StreamHandle streamHandle = 
ThreadLocalByteStringOutputStream.acquire()) {
+      ByteStringOutputStream stream = streamHandle.stream();
+      encodeNamespace(namespace, stream);
+      encodeAddress(address, stream);
+      return InternedByteString.of(stream.toByteStringAndReset());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ByteString timerHoldTag(
+      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
timerTag) {
+    // Same encoding for timer tag and timer hold tag.
+    // They are put in different places and won't collide.
+    return timerTag;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+    try (StreamHandle streamHandle = 
ThreadLocalByteStringOutputStream.acquire()) {
+      ByteStringOutputStream stream = streamHandle.stream();
+      encodeNamespace(timerData.getNamespace(), stream);
+      if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) {
+        stream.write(SYSTEM_TIMER_BYTE);
+      } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) 
{
+        stream.write(USER_TIMER_BYTE);
+      } else {
+        throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + 
prefix);
+      }
+      StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream);
+      StringUtf8Coder.of().encode(timerData.getTimerId(), stream);
+      return stream.toByteStringAndReset();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public TimerData windmillTimerToTimerData(
+      WindmillNamespacePrefix prefix,
+      Timer timer,
+      Coder<? extends BoundedWindow> windowCoder,
+      boolean draining) {
+
+    InputStream stream = timer.getTag().newInput();
+
+    try {
+      StateNamespace stateNamespace = decodeNamespace(stream, windowCoder);
+      int nextByte = stream.read();
+      if (nextByte == SYSTEM_TIMER_BYTE) {
+        
checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix));
+      } else if (nextByte == USER_TIMER_BYTE) {
+        
checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix));
+      } else {
+        throw new IllegalStateException("Unexpected timer tag byte: " + 
nextByte);
+      }
+
+      String timerFamilyId = StringUtf8Coder.of().decode(stream);
+      String timerId = StringUtf8Coder.of().decode(stream);
+
+      Instant timestamp = 
WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
+      Instant outputTimestamp = timestamp;
+      if (timer.hasMetadataTimestamp()) {
+        // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output 
timestamp" so make sure
+        // to change the upper bound.
+        outputTimestamp =
+            
WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
+        if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
+          outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
+        }
+      }
+
+      return TimerData.of(
+          timerId,
+          timerFamilyId,
+          stateNamespace,
+          timestamp,
+          outputTimestamp,
+          timerTypeToTimeDomain(timer.getType()));
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    // TODO: the draining flag is currently ignored (https://github.com/apache/beam/issues/36884)
+  }
+
+  /** @return the singleton WindmillTagEncodingV2 */
+  public static WindmillTagEncodingV2 instance() {
+    return INSTANCE;
+  }
+
+  private void encodeAddress(StateTag<?> tag, ByteStringOutputStream stream) 
throws IOException {
+    if (StateTags.isSystemTagInternal(tag)) {
+      stream.write(SYSTEM_STATE_TAG_BYTE);
+    } else {
+      stream.write(USER_STATE_TAG_BYTE);
+    }
+    StringUtf8Coder.of().encode(tag.getId(), stream);
+  }
+
+  private void encodeNamespace(StateNamespace namespace, 
ByteStringOutputStream stream)
+      throws IOException {
+    if (namespace instanceof GlobalNamespace) {
+      stream.write(GLOBAL_NAMESPACE_BYTE);
+    } else if (namespace instanceof WindowNamespace) {
+      stream.write(NON_GLOBAL_NAMESPACE_BYTE);
+      encodeWindowNamespace((WindowNamespace<? extends BoundedWindow>) 
namespace, stream);
+    } else if (namespace instanceof WindowAndTriggerNamespace) {
+      stream.write(NON_GLOBAL_NAMESPACE_BYTE);
+      encodeWindowAndTriggerNamespace(
+          (WindowAndTriggerNamespace<? extends BoundedWindow>) namespace, 
stream);
+    } else {
+      throw new IllegalStateException("Unsupported namespace type: " + 
namespace.getClass());
+    }
+  }
+
+  private StateNamespace decodeNamespace(
+      InputStream stream, Coder<? extends BoundedWindow> windowCoder) throws 
IOException {
+    int firstByte = stream.read();
+    switch (firstByte) {
+      case GLOBAL_NAMESPACE_BYTE:
+        return StateNamespaces.global();
+      case NON_GLOBAL_NAMESPACE_BYTE:
+        return decodeNonGlobalNamespace(stream, windowCoder);
+      default:
+        throw new IllegalStateException("Invalid first namespace byte: " + 
firstByte);
+    }
+  }
+
+  private <W extends BoundedWindow> StateNamespace decodeNonGlobalNamespace(
+      InputStream stream, Coder<W> windowCoder) throws IOException {
+    W window = decodeWindow(stream, windowCoder);
+    int namespaceByte = stream.read();
+    switch (namespaceByte) {
+      case WINDOW_NAMESPACE_BYTE:
+        return StateNamespaces.window(windowCoder, window);
+      case WINDOW_AND_TRIGGER_NAMESPACE_BYTE:
+        Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream);
+        return StateNamespaces.windowAndTrigger(windowCoder, window, 
triggerIndex);
+      default:
+        throw new IllegalStateException("Invalid trigger namespace byte: " + 
namespaceByte);
+    }
+  }
+
+  private <W extends BoundedWindow> W decodeWindow(InputStream stream, 
Coder<W> windowCoder)
+      throws IOException {
+    int firstByte = stream.read();
+    W window;
+    switch (firstByte) {
+      case INTERVAL_WINDOW_BYTE:
+        window = (W) decodeIntervalWindow(stream);
+        break;
+      case OTHER_WINDOW_BYTE:
+        window = windowCoder.decode(stream);
+        break;
+      default:
+        throw new IllegalStateException("Unexpected window first byte: " + 
firstByte);
+    }
+    return window;
+  }
+
+  private IntervalWindow decodeIntervalWindow(InputStream stream) throws 
IOException {
+    Instant end = InstantCoder.of().decode(stream);
+    Instant start = InstantCoder.of().decode(stream);
+    return new IntervalWindow(start, end);
+  }
+
+  private <W extends BoundedWindow> void encodeWindowNamespace(
+      WindowNamespace<W> windowNamespace, ByteStringOutputStream stream) 
throws IOException {
+    encodeWindow(windowNamespace.getWindow(), 
windowNamespace.getWindowCoder(), stream);
+    stream.write(WINDOW_NAMESPACE_BYTE);
+  }
+
+  private <W extends BoundedWindow> void encodeWindowAndTriggerNamespace(
+      WindowAndTriggerNamespace<W> windowAndTriggerNamespace, 
ByteStringOutputStream stream)
+      throws IOException {
+    encodeWindow(
+        windowAndTriggerNamespace.getWindow(), 
windowAndTriggerNamespace.getWindowCoder(), stream);
+    stream.write(WINDOW_AND_TRIGGER_NAMESPACE_BYTE);
+    
BigEndianIntegerCoder.of().encode(windowAndTriggerNamespace.getTriggerIndex(), 
stream);
+  }
+
+  private <W extends BoundedWindow> void encodeWindow(
+      W window, Coder<W> windowCoder, ByteStringOutputStream stream) throws 
IOException {
+    if (windowCoder instanceof IntervalWindowCoder) {
+      stream.write(INTERVAL_WINDOW_BYTE);
+      InstantCoder.of().encode(((IntervalWindow) window).end(), stream);
+      InstantCoder.of().encode(((IntervalWindow) window).start(), stream);
+    } else {
+      stream.write(OTHER_WINDOW_BYTE);
+      windowCoder.encode(window, stream);
+    }
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
index 26979990330..097da87fb01 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
@@ -74,6 +74,14 @@ final class ComputationWorkExecutorFactory {
   private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
       "throw_exceptions_on_large_output";
 
+  // Experiment to enable tag encoding v2.
+  // This experiment is intended only for testing by Dataflow runner developers.
+  // The related logic could change at any time without notice.
+  // **DO NOT USE** on real workloads.
+  // Enabling the experiment could lead to state incompatibilities and broken 
jobs.
+  private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT =
+      "unstable_windmill_tag_encoding_v2";
+
   private final DataflowWorkerHarnessOptions options;
   private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
   private final ReaderCache readerCache;
@@ -97,6 +105,7 @@ final class ComputationWorkExecutorFactory {
   private final IdGenerator idGenerator;
   private final StreamingGlobalConfigHandle globalConfigHandle;
   private final boolean throwExceptionOnLargeOutput;
+  private final boolean enableWindmillTagEncodingV2;
 
   ComputationWorkExecutorFactory(
       DataflowWorkerHarnessOptions options,
@@ -124,6 +133,8 @@ final class ComputationWorkExecutorFactory {
             : StreamingDataflowWorker.MAX_SINK_BYTES;
     this.throwExceptionOnLargeOutput =
         hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
+    this.enableWindmillTagEncodingV2 =
+        hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT);
   }
 
   private static Nodes.ParallelInstructionNode extractReadNode(
@@ -268,7 +279,8 @@ final class ComputationWorkExecutorFactory {
         stageInfo.executionStateRegistry(),
         globalConfigHandle,
         maxSinkBytes,
-        throwExceptionOnLargeOutput);
+        throwExceptionOnLargeOutput,
+        enableWindmillTagEncodingV2);
   }
 
   private DataflowMapTaskExecutor createMapTaskExecutor(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 93b279f0aec..8372b33d81c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -133,7 +133,8 @@ public class StreamingModeExecutionContextTest {
             executionStateRegistry,
             globalConfigHandle,
             Long.MAX_VALUE,
-            /*throwExceptionOnLargeOutput=*/ false);
+            /*throwExceptionOnLargeOutput=*/ false,
+            /*enableWindmillTagEncodingV2=*/ false);
   }
 
   private static Work createMockWork(Windmill.WorkItem workItem, Watermarks 
watermarks) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 334b9414b26..f7364104f5d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -618,7 +618,8 @@ public class WorkerCustomSourcesTest {
             executionStateRegistry,
             globalConfigHandle,
             Long.MAX_VALUE,
-            /*throwExceptionOnLargeOutput=*/ false);
+            /*throwExceptionOnLargeOutput=*/ false,
+            /*enableWindmillTagEncodingV2=*/ false);
 
     options.setNumWorkers(5);
     int maxElements = 10;
@@ -989,7 +990,8 @@ public class WorkerCustomSourcesTest {
             executionStateRegistry,
             globalConfigHandle,
             Long.MAX_VALUE,
-            /*throwExceptionOnLargeOutput=*/ false);
+            /*throwExceptionOnLargeOutput=*/ false,
+            /*enableWindmillTagEncodingV2=*/ false);
 
     options.setNumWorkers(5);
     int maxElements = 100;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
new file mode 100644
index 00000000000..af9ef95410d
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Enclosed.class)
+public class WindmillTagEncodingV2Test {
+
+  private static final IntervalWindow INTERVAL_WINDOW =
+      new IntervalWindow(new Instant(10), new Instant(20));
+
+  private static final CustomWindow CUSTOM_WINDOW = new 
CustomWindow(INTERVAL_WINDOW);
+
+  private static final int TRIGGER_INDEX = 5;
+
+  private static final StateNamespace GLOBAL_NAMESPACE = new GlobalNamespace();
+
+  private static final StateNamespace INTERVAL_WINDOW_NAMESPACE =
+      StateNamespaces.window(IntervalWindow.getCoder(), INTERVAL_WINDOW);
+  private static final StateNamespace INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE =
+      StateNamespaces.windowAndTrigger(IntervalWindow.getCoder(), 
INTERVAL_WINDOW, TRIGGER_INDEX);
+
+  private static final StateNamespace OTHER_WINDOW_NAMESPACE =
+      StateNamespaces.window(new CustomWindow.CustomWindowCoder(), 
CUSTOM_WINDOW);
+  private static final StateNamespace OTHER_WINDOW_AND_TRIGGER_NAMESPACE =
+      StateNamespaces.windowAndTrigger(
+          new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW, TRIGGER_INDEX);
+
+  // Generate a 300-character tag (length > 256), so length is encoded in two bytes.
+  private static final String TAG =
+      IntStream.range(0, 300).mapToObj(i -> "a").collect(Collectors.joining());
+
+  private static final StateTag<ValueState<Integer>> USER_STATE_TAG =
+      StateTags.value(TAG, VarIntCoder.of());
+  private static final StateTag<ValueState<Integer>> SYSTEM_STATE_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value(TAG, VarIntCoder.of()));
+
+  private static final ByteString TAG_BYTES = encode(StringUtf8Coder.of(), 
TAG);
+
+  private static final ByteString SYSTEM_STATE_TAG_BYTES =
+      ByteString.copyFrom(new byte[] {1}) // system tag
+          .concat(TAG_BYTES);
+  private static final ByteString USER_STATE_TAG_BYTES =
+      ByteString.copyFrom(new byte[] {2}) // user tag
+          .concat(TAG_BYTES);
+
+  private static final ByteString GLOBAL_NAMESPACE_BYTES =
+      ByteString.copyFrom(new byte[] {0x1}); // global namespace
+
+  private static final ByteString INTERVAL_WINDOW_BYTES =
+      ByteString.EMPTY
+          .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.end()))
+          .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.start()));
+
+  private static final ByteString INTERVAL_WINDOW_NAMESPACE_BYTES =
+      ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+          .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window
+          .concat(INTERVAL_WINDOW_BYTES)
+          .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace
+
+  private static final ByteString INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES =
+      ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+          .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window
+          .concat(INTERVAL_WINDOW_BYTES)
+          .concat(ByteString.copyFrom(new byte[] {0x02})) // window and 
trigger namespace
+          .concat(
+              ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05})); // 
big endian trigger index
+
+  private static final ByteString OTHER_WINDOW_NAMESPACE_BYTES =
+      ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+          .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval 
window
+          .concat(encode(new CustomWindow.CustomWindowCoder(), new 
CustomWindow(INTERVAL_WINDOW)))
+          .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace
+
+  private static final ByteString OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES =
+      ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+          .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval 
window
+          .concat(encode(new CustomWindow.CustomWindowCoder(), new 
CustomWindow(INTERVAL_WINDOW)))
+          .concat(ByteString.copyFrom(new byte[] {0x02})) // window and 
trigger namespace
+          .concat(
+              ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05})); // 
big endian trigger index
+
+  private static final String TIMER_FAMILY_ID = "timerFamily";
+  private static final ByteString TIMER_FAMILY_ID_BYTES =
+      encode(StringUtf8Coder.of(), TIMER_FAMILY_ID);
+
+  private static final String TIMER_ID = "timerId";
+  private static final ByteString TIMER_ID_BYTES = 
encode(StringUtf8Coder.of(), TIMER_ID);
+
+  private static final ByteString SYSTEM_TIMER_BYTES =
+      ByteString.copyFrom(new byte[] {0x3}) // system timer
+          .concat(TIMER_FAMILY_ID_BYTES)
+          .concat(TIMER_ID_BYTES);
+
+  private static final ByteString USER_TIMER_BYTES =
+      ByteString.copyFrom(new byte[] {0x4}) // user timer
+          .concat(TIMER_FAMILY_ID_BYTES)
+          .concat(TIMER_ID_BYTES);
+
+  private static final ByteString SYSTEM_TIMER_BYTES_NO_FAMILY_ID =
+      ByteString.copyFrom(new byte[] {0x3}) // system timer
+          .concat(encode(StringUtf8Coder.of(), ""))
+          .concat(TIMER_ID_BYTES);
+
+  private static final ByteString USER_TIMER_BYTES_NO_FAMILY_ID =
+      ByteString.copyFrom(new byte[] {0x4}) // user timer
+          .concat(encode(StringUtf8Coder.of(), ""))
+          .concat(TIMER_ID_BYTES);
+
+  @RunWith(Parameterized.class)
+  public static class EncodeStateTagTest {
+
+    /**
+     * Supplies one row per case: a state namespace, a state tag, and the expected encoded bytes,
+     * built as the namespace bytes followed by the state tag bytes.
+     */
+    @Parameters(name = "{index}: namespace={0} stateTag={1} expectedBytes={2}")
+    public static Collection<Object[]> data() {
+      return ImmutableList.of(
+          new Object[] {
+            GLOBAL_NAMESPACE, USER_STATE_TAG, 
GLOBAL_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+          },
+          new Object[] {
+            GLOBAL_NAMESPACE,
+            SYSTEM_STATE_TAG,
+            GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_STATE_TAG_BYTES)
+          },
+          new Object[] {
+            INTERVAL_WINDOW_NAMESPACE,
+            USER_STATE_TAG,
+            INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+          },
+          new Object[] {
+            INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+            USER_STATE_TAG,
+            
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+          },
+          new Object[] {
+            OTHER_WINDOW_NAMESPACE,
+            USER_STATE_TAG,
+            OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+          },
+          new Object[] {
+            OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+            USER_STATE_TAG,
+            
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+          });
+    }
+
+    @Parameter(0)
+    public StateNamespace namespace;
+
+    @Parameter(1)
+    public StateTag<?> stateTag;
+
+    @Parameter(2)
+    public ByteString expectedBytes;
+
+    /** Checks that encoding the parameterized namespace and state tag yields the expected bytes. */
+    @Test
+    public void testStateTag() {
+      WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance();
+      ByteString actualBytes = encoding.stateTag(namespace, stateTag).byteString();
+      assertEquals(expectedBytes, actualBytes);
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class TimerTagTest {
+
+    /**
+     * Builds the parameter rows: every namespace/prefix combination, with and without a timer
+     * family id, crossed with every {@link TimeDomain}.
+     *
+     * <p>Row layout: {0} namespace, {1} prefix, {2} expectedBytes, {3} includeTimerFamilyId, {4}
+     * timeDomain. The display name must reference exactly these indices.
+     */
+    @Parameters(
+        name =
+            "{index}: namespace={0} prefix={1} expectedBytes={2}"
+                + " includeTimerFamilyId={3} timeDomain={4}")
+    public static Collection<Object[]> data() {
+      List<Object[]> data = new ArrayList<>();
+      for (boolean includeTimerFamilyId : ImmutableList.of(true, false)) {
+        // Without a family id the expected tag carries an encoded empty string in its place.
+        ByteString expectedSystemTimerBytes =
+            includeTimerFamilyId ? SYSTEM_TIMER_BYTES : SYSTEM_TIMER_BYTES_NO_FAMILY_ID;
+        ByteString expectedUserTimerBytes =
+            includeTimerFamilyId ? USER_TIMER_BYTES : USER_TIMER_BYTES_NO_FAMILY_ID;
+        List<Object[]> tests =
+            ImmutableList.of(
+                new Object[] {
+                  GLOBAL_NAMESPACE,
+                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+                },
+                new Object[] {
+                  GLOBAL_NAMESPACE,
+                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+                },
+                new Object[] {
+                  INTERVAL_WINDOW_NAMESPACE,
+                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+                },
+                new Object[] {
+                  INTERVAL_WINDOW_NAMESPACE,
+                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+                },
+                new Object[] {
+                  OTHER_WINDOW_NAMESPACE,
+                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+                },
+                new Object[] {
+                  OTHER_WINDOW_NAMESPACE,
+                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+                },
+                new Object[] {
+                  INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+                },
+                new Object[] {
+                  INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+                },
+                new Object[] {
+                  OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+                },
+                new Object[] {
+                  OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+                });
+
+        for (Object[] params : tests) {
+          for (TimeDomain timeDomain : TimeDomain.values()) {
+            data.add(
+                new Object[] {params[0], params[1], params[2], includeTimerFamilyId, timeDomain});
+          }
+        }
+      }
+      return data;
+    }
+
+    @Parameter(0)
+    public StateNamespace namespace;
+
+    @Parameter(1)
+    public WindmillNamespacePrefix prefix;
+
+    @Parameter(2)
+    public ByteString expectedBytes;
+
+    @Parameter(3)
+    public boolean includeTimerFamilyId;
+
+    @Parameter(4)
+    public TimeDomain timeDomain;
+
+    /**
+     * Builds a timer with or without a timer family id, depending on the parameter, and checks
+     * the encoded timer tag against the expected bytes.
+     */
+    @Test
+    public void testTimerTag() {
+      Instant timestamp = new Instant(123);
+      Instant outputTimestamp = new Instant(456);
+      TimerData timerData;
+      if (includeTimerFamilyId) {
+        timerData =
+            TimerData.of(
+                TIMER_ID, TIMER_FAMILY_ID, namespace, timestamp, outputTimestamp, timeDomain);
+      } else {
+        timerData = TimerData.of(TIMER_ID, namespace, timestamp, outputTimestamp, timeDomain);
+      }
+      assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class TimerDataFromTimerTest {
+
+    /**
+     * Builds rows of namespace, prefix, encoded timer tag and window coder, then crosses each row
+     * with both draining values and every {@link TimeDomain}.
+     */
+    @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} 
timeDomain={5}")
+    public static Collection<Object[]> data() {
+      List<Object[]> tests =
+          ImmutableList.of(
+              new Object[] {
+                GLOBAL_NAMESPACE,
+                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+                GlobalWindow.Coder.INSTANCE
+              },
+              new Object[] {
+                GLOBAL_NAMESPACE,
+                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+                GlobalWindow.Coder.INSTANCE
+              },
+              new Object[] {
+                INTERVAL_WINDOW_NAMESPACE,
+                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+                IntervalWindow.getCoder()
+              },
+              new Object[] {
+                INTERVAL_WINDOW_NAMESPACE,
+                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+                IntervalWindow.getCoder()
+              },
+              new Object[] {
+                OTHER_WINDOW_NAMESPACE,
+                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+                new CustomWindow.CustomWindowCoder()
+              },
+              new Object[] {
+                OTHER_WINDOW_NAMESPACE,
+                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+                new CustomWindow.CustomWindowCoder()
+              },
+              new Object[] {
+                INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+                IntervalWindow.getCoder()
+              },
+              new Object[] {
+                INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+                IntervalWindow.getCoder()
+              },
+              new Object[] {
+                OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+                new CustomWindow.CustomWindowCoder()
+              },
+              new Object[] {
+                OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+                new CustomWindow.CustomWindowCoder()
+              });
+
+      // Expand each base row into one case per (draining, timeDomain) pair.
+      List<Object[]> data = new ArrayList<>();
+      for (Object[] params : tests) {
+        for (boolean draining : ImmutableList.of(true, false)) {
+          for (TimeDomain timeDomain : TimeDomain.values()) {
+            data.add(
+                new Object[] {params[0], params[1], params[2], params[3], 
draining, timeDomain});
+          }
+        }
+      }
+      return data;
+    }
+
+    @Parameter(0)
+    public StateNamespace namespace;
+
+    @Parameter(1)
+    public WindmillNamespacePrefix prefix;
+
+    @Parameter(2)
+    public ByteString timerTag;
+
+    @Parameter(3)
+    public Coder<? extends BoundedWindow> windowCoder;
+
+    @Parameter(4)
+    public boolean draining;
+
+    @Parameter(5)
+    public TimeDomain timeDomain;
+
+    /**
+     * Wraps the parameterized tag in a Windmill {@code Timer} and checks that decoding it yields
+     * the {@code TimerData} built from the same ids, namespace, timestamps and time domain.
+     */
+    @Test
+    public void testTimerDataFromTimer() {
+      WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance();
+      Instant firesAt = Instant.now();
+      Instant outputAt = firesAt.plus(Duration.standardSeconds(1));
+
+      TimerData expected =
+          TimerData.of(TIMER_ID, TIMER_FAMILY_ID, namespace, firesAt, outputAt, timeDomain);
+
+      Timer.Builder timerBuilder = Timer.newBuilder();
+      timerBuilder.setTag(timerTag);
+      timerBuilder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(firesAt));
+      timerBuilder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputAt));
+      timerBuilder.setType(timerType(timeDomain));
+      Timer windmillTimer = timerBuilder.build();
+
+      assertEquals(
+          expected,
+          encoding.windmillTimerToTimerData(prefix, windmillTimer, windowCoder, draining));
+    }
+  }
+
+  @RunWith(JUnit4.class)
+  public static class TimerHoldTagTest {
+
+    /**
+     * Verifies that {@code timerHoldTag} returns the timer tag it is handed. The tag is 16 random
+     * bytes, so it is unrelated to the contents of {@code timerData}.
+     */
+    @Test
+    public void testTimerHoldTagUsesTimerTag() {
+      TimerData timerData =
+          TimerData.of(
+              TIMER_ID,
+              TIMER_FAMILY_ID,
+              GLOBAL_NAMESPACE,
+              new Instant(123),
+              new Instant(456),
+              TimeDomain.EVENT_TIME);
+      byte[] bytes = new byte[16];
+      ThreadLocalRandom.current().nextBytes(bytes);
+      ByteString timerTag = ByteString.copyFrom(bytes);
+      // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+      assertEquals(
+          timerTag,
+          WindmillTagEncodingV2.instance()
+              .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timerData, timerTag));
+    }
+  }
+
+  @RunWith(JUnit4.class)
+  public static class SortOrderTest {
+
+    /**
+     * Checks the unsigned lexicographic order of encoded state tags across namespace kinds, and
+     * the leading marker bytes each kind starts with.
+     */
+    @Test
+    public void testSortOrder() {
+      WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance();
+      Instant base = Instant.now();
+
+      // Windows [5, 20), [10, 20) and [20, 30), expressed as millisecond offsets from base.
+      StateNamespace interval5_20 = intervalNamespace(base, 5, 20);
+      StateNamespace interval10_20 = intervalNamespace(base, 10, 20);
+      StateNamespace interval20_30 = intervalNamespace(base, 20, 30);
+
+      ByteString globalBytes = encoding.stateTag(GLOBAL_NAMESPACE, USER_STATE_TAG).byteString();
+      ByteString otherWindowBytes =
+          encoding.stateTag(OTHER_WINDOW_NAMESPACE, USER_STATE_TAG).byteString();
+      ByteString interval5_20Bytes = encoding.stateTag(interval5_20, USER_STATE_TAG).byteString();
+      ByteString interval10_20Bytes = encoding.stateTag(interval10_20, USER_STATE_TAG).byteString();
+      ByteString interval20_30Bytes = encoding.stateTag(interval20_30, USER_STATE_TAG).byteString();
+
+      // Global < Non-Interval < Interval
+      assertOrdered(globalBytes, otherWindowBytes);
+      assertOrdered(otherWindowBytes, interval5_20Bytes);
+
+      // Interval windows sort by end time first, then by start time:
+      // [5, 20) < [10, 20) because the ends match and 5 < 10;
+      assertOrdered(interval5_20Bytes, interval10_20Bytes);
+      // [10, 20) < [20, 30) because end 20 < end 30.
+      assertOrdered(interval10_20Bytes, interval20_30Bytes);
+
+      ByteString globalPrefix = ByteString.copyFrom(new byte[] {0x01});
+      ByteString otherWindowPrefix = ByteString.copyFrom(new byte[] {0x10, 0x02});
+      ByteString intervalWindowPrefix = ByteString.copyFrom(new byte[] {0x10, 0x64});
+      assertTrue(globalBytes.startsWith(globalPrefix));
+      assertTrue(otherWindowBytes.startsWith(otherWindowPrefix));
+      assertTrue(interval5_20Bytes.startsWith(intervalWindowPrefix));
+      assertTrue(interval10_20Bytes.startsWith(intervalWindowPrefix));
+      assertTrue(interval20_30Bytes.startsWith(intervalWindowPrefix));
+    }
+
+    /** Returns a window namespace for the interval [base + startMillis, base + endMillis). */
+    private static StateNamespace intervalNamespace(Instant base, long startMillis, long endMillis) {
+      IntervalWindow window =
+          new IntervalWindow(
+              base.plus(Duration.millis(startMillis)), base.plus(Duration.millis(endMillis)));
+      return StateNamespaces.window(IntervalWindow.getCoder(), window);
+    }
+
+    /** Asserts that {@code smaller} sorts strictly before {@code larger} as unsigned bytes. */
+    private void assertOrdered(ByteString smaller, ByteString larger) {
+      int comparison = ByteString.unsignedLexicographicalComparator().compare(smaller, larger);
+      assertTrue(comparison < 0);
+    }
+  }
+
+  /**
+   * A window type that copies the bounds of an {@link IntervalWindow} but is paired with its own
+   * coder. The test constants use it for the expected bytes commented as "non interval window".
+   */
+  private static class CustomWindow extends IntervalWindow {
+
+    // Copies the start and end of the given interval window.
+    private CustomWindow(IntervalWindow intervalWindow) {
+      super(intervalWindow.start(), intervalWindow.end());
+    }
+
+    /** Coder for {@link CustomWindow} that delegates every operation to IntervalWindowCoder. */
+    private static class CustomWindowCoder extends Coder<CustomWindow> {
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        IntervalWindowCoder.of().verifyDeterministic();
+      }
+
+      @Override
+      public List<? extends Coder<?>> getCoderArguments() {
+        return IntervalWindowCoder.of().getCoderArguments();
+      }
+
+      @Override
+      public void encode(CustomWindow value, OutputStream outStream) throws 
IOException {
+        IntervalWindowCoder.of().encode(value, outStream);
+      }
+
+      // Decodes an interval window and re-wraps it as a CustomWindow.
+      @Override
+      public CustomWindow decode(InputStream inStream) throws IOException {
+        return new CustomWindow(IntervalWindowCoder.of().decode(inStream));
+      }
+    }
+  }
+
+  /**
+   * Encodes {@code value} with {@code coder} and returns the resulting bytes.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the coder
+   */
+  private static <T> ByteString encode(Coder<T> coder, T value) {
+    ByteString.Output buffer = ByteString.newOutput();
+    try {
+      coder.encode(value, buffer);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return buffer.toByteString();
+  }
+
+  /**
+   * Maps a {@link TimeDomain} to the Windmill timer type used when building test timers.
+   *
+   * @throws IllegalArgumentException if the domain has no mapping here
+   */
+  private static Timer.Type timerType(TimeDomain domain) {
+    Timer.Type type;
+    switch (domain) {
+      case EVENT_TIME:
+        type = Timer.Type.WATERMARK;
+        break;
+      case PROCESSING_TIME:
+        type = Timer.Type.REALTIME;
+        break;
+      case SYNCHRONIZED_PROCESSING_TIME:
+        type = Timer.Type.DEPENDENT_REALTIME;
+        break;
+      default:
+        throw new IllegalArgumentException("Unrecognized TimeDomain: " + domain);
+    }
+    return type;
+  }
+}

Reply via email to