This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a3cb642db76 Merge pull request #38878: Fix OnWindowExpirationContext.
a3cb642db76 is described below
commit a3cb642db76ffd2daf7dc5f2e2d15c0d8fa953d5
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Jun 9 15:19:23 2026 -0700
Merge pull request #38878: Fix OnWindowExpirationContext.
* foo
* fix compilation
* fix compilation
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 20 ++++++++++++++++++++
.../reflect/ByteBuddyDoFnInvokerFactory.java | 12 ++++++++++++
.../beam/sdk/transforms/reflect/DoFnInvoker.java | 17 +++++++++++++++++
.../beam/sdk/transforms/reflect/DoFnSignature.java | 9 +++++++++
.../beam/sdk/transforms/reflect/DoFnSignatures.java | 3 ++-
.../construction/SplittableParDoNaiveBounded.java | 6 ++++++
.../org/apache/beam/sdk/transforms/ParDoTest.java | 4 ++++
.../sdk/transforms/reflect/DoFnSignaturesTest.java | 8 ++++++--
.../org/apache/beam/fn/harness/FnApiDoFnRunner.java | 13 +++++++++++++
9 files changed, 89 insertions(+), 3 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 470e22a6699..1825b77b65f 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -649,6 +649,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
"Cannot access OnTimerContext outside of @OnTimer methods.");
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access OnWindowExpirationContext outside of
@OnWindowExpiration methods.");
+ }
+
@Override
public RestrictionTracker<?, ?> restrictionTracker() {
throw new UnsupportedOperationException("RestrictionTracker parameters
are not supported.");
@@ -958,6 +965,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
return this;
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access OnWindowExpirationContext outside of
@OnWindowExpiration methods.");
+ }
+
@Override
public RestrictionTracker<?, ?> restrictionTracker() {
throw new UnsupportedOperationException("RestrictionTracker parameters
are not supported.");
@@ -1299,6 +1313,12 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
throw new UnsupportedOperationException("OnTimerContext parameters are
not supported.");
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ return this;
+ }
+
@Override
public RestrictionTracker<?, ?> restrictionTracker() {
throw new UnsupportedOperationException("RestrictionTracker parameters
are not supported.");
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 3ebabb6e3c3..c08243fda5c 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -142,6 +142,8 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
public static final String OUTPUT_PARAMETER_METHOD = "outputReceiver";
public static final String TAGGED_OUTPUT_PARAMETER_METHOD =
"taggedOutputReceiver";
public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD =
"onTimerContext";
+ public static final String ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER_METHOD =
+ "onWindowExpirationContext";
public static final String WINDOW_PARAMETER_METHOD = "window";
public static final String PANE_INFO_PARAMETER_METHOD = "paneInfo";
public static final String PIPELINE_OPTIONS_PARAMETER_METHOD =
"pipelineOptions";
@@ -1170,6 +1172,16 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
ON_TIMER_CONTEXT_PARAMETER_METHOD, DoFn.class)));
}
+ @Override
+ public StackManipulation dispatch(
+ DoFnSignature.Parameter.OnWindowExpirationContextParameter p) {
+ return new StackManipulation.Compound(
+ pushDelegate,
+ MethodInvocation.invoke(
+ getExtraContextFactoryMethodDescription(
+ ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER_METHOD,
DoFn.class)));
+ }
+
@Override
public StackManipulation dispatch(WindowParameter p) {
return new StackManipulation.Compound(
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index eaabdff907c..c8c7ddf24b6 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -185,6 +185,10 @@ public interface DoFnInvoker<InputT, OutputT> {
/** Provide a {@link DoFn.OnTimerContext} to use with the given {@link
DoFn}. */
DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
doFn);
+ /** Provide a {@link DoFn.OnWindowExpirationContext} to use with the given
{@link DoFn}. */
+ DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn);
+
/** Provide a reference to the input element. */
InputT element(DoFn<InputT, OutputT> doFn);
@@ -447,6 +451,13 @@ public interface DoFnInvoker<InputT, OutputT> {
String.format("OnTimerContext unsupported in %s",
getErrorContext()));
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ String.format("OnWindowExpirationContext unsupported in %s",
getErrorContext()));
+ }
+
@Override
public State state(String stateId, boolean alwaysFetched) {
throw new UnsupportedOperationException(
@@ -538,6 +549,12 @@ public interface DoFnInvoker<InputT, OutputT> {
return delegate.onTimerContext(doFn);
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ return delegate.onWindowExpirationContext(doFn);
+ }
+
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return delegate.element(doFn);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 51dadd178a6..99b002c1106 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -305,6 +305,8 @@ public abstract class DoFnSignature {
return cases.dispatch((ProcessContextParameter) this);
} else if (this instanceof OnTimerContextParameter) {
return cases.dispatch((OnTimerContextParameter) this);
+ } else if (this instanceof OnWindowExpirationContextParameter) {
+ return cases.dispatch((OnWindowExpirationContextParameter) this);
} else if (this instanceof WindowParameter) {
return cases.dispatch((WindowParameter) this);
} else if (this instanceof PaneInfoParameter) {
@@ -391,6 +393,8 @@ public abstract class DoFnSignature {
ResultT dispatch(OnTimerContextParameter p);
+ ResultT dispatch(OnWindowExpirationContextParameter p);
+
ResultT dispatch(WindowParameter p);
ResultT dispatch(PaneInfoParameter p);
@@ -498,6 +502,11 @@ public abstract class DoFnSignature {
return dispatchDefault(p);
}
+ @Override
+ public ResultT dispatch(OnWindowExpirationContextParameter p) {
+ return dispatchDefault(p);
+ }
+
@Override
public ResultT dispatch(WindowParameter p) {
return dispatchDefault(p);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 2983fc94021..9f3491bca7b 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -229,7 +229,8 @@ public class DoFnSignatures {
Parameter.StateParameter.class,
Parameter.TimestampParameter.class,
Parameter.KeyParameter.class,
- Parameter.SideInputParameter.class);
+ Parameter.SideInputParameter.class,
+ Parameter.OnWindowExpirationContextParameter.class);
private static final Collection<Class<? extends Parameter>>
ALLOWED_GET_INITIAL_RESTRICTION_PARAMETERS =
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
index d1fb23e77c4..52b1174e3a0 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
@@ -506,6 +506,12 @@ public class SplittableParDoNaiveBounded {
throw new IllegalStateException();
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException();
+ }
+
@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return element;
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 0c984d01c8f..6beea338689 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -7335,11 +7335,15 @@ public class ParDoTest implements Serializable {
@OnWindowExpiration
public void onWindowExpiration(
@AlwaysFetched @StateId(stateId) ValueState<Integer> state,
+ BoundedWindow window,
@Key String key,
+ OnWindowExpirationContext context,
OutputReceiver<Integer> r) {
Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
// verify state
assertEquals(1, (int) currentValue);
+ Preconditions.checkNotNull(context);
+ assertEquals(window, context.window());
// To check output is received from OnWindowExpiration
r.output(currentValue);
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 5a5353482c9..330bb5b9441 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -1422,16 +1422,20 @@ public class DoFnSignaturesTest {
@StateId("foo") ValueState<Integer> s,
PipelineOptions p,
OutputReceiver<String> o,
- MultiOutputReceiver m) {}
+ MultiOutputReceiver m,
+ OnWindowExpirationContext c) {}
}.getClass());
List<Parameter> params = sig.onWindowExpiration().extraParameters();
- assertThat(params.size(), equalTo(5));
+ assertThat(params.size(), equalTo(6));
assertThat(params.get(0), instanceOf(WindowParameter.class));
assertThat(params.get(1), instanceOf(StateParameter.class));
assertThat(params.get(2), instanceOf(PipelineOptionsParameter.class));
assertThat(params.get(3), instanceOf(OutputReceiverParameter.class));
assertThat(params.get(4), instanceOf(TaggedOutputReceiverParameter.class));
+ assertThat(
+ params.get(5),
+
instanceOf(DoFnSignature.Parameter.OnWindowExpirationContextParameter.class));
}
private interface FeatureTest {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 3e4675ab074..d5b5cebadb3 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2250,6 +2250,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
"Cannot access OnTimerContext outside of @OnTimer methods.");
}
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access OnWindowExpirationContext outside of
@OnWindowExpiration methods.");
+ }
+
@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return currentTracker;
@@ -2469,6 +2476,12 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
private final OnWindowExpirationContext.Context context =
new OnWindowExpirationContext.Context();
+ @Override
+ public DoFn<InputT, OutputT>.OnWindowExpirationContext
onWindowExpirationContext(
+ DoFn<InputT, OutputT> doFn) {
+ return context;
+ }
+
@Override
public BoundedWindow window() {
return currentWindow;