reuvenlax commented on a change in pull request #11350:
URL: https://github.com/apache/beam/pull/11350#discussion_r420423316
##########
File path:
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -857,6 +863,223 @@ public BundleFinalizer bundleFinalizer() {
}
}
+ /**
+ * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used
for running a {@link
+ * DoFn} on window expiration.
+ */
+ private class OnWindowExpirationArgumentProvider<KeyT>
+ extends DoFn<InputT, OutputT>.OnWindowExpirationContext
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+ private final BoundedWindow window;
+ private final Instant timestamp;
+ private final KeyT key;
+ /** Lazily initialized; should only be accessed via {@link
#getNamespace()}. */
+ private @Nullable StateNamespace namespace;
+
+ /**
+ * The state namespace for this context.
+ *
+ * <p>Any call to this method when more than one window is present will
crash; this represents a
+ * bug in the runner or the {@link DoFnSignature}, since values must be in
exactly one window
+ * when state or timers are relevant.
+ */
+ private StateNamespace getNamespace() {
+ if (namespace == null) {
+ namespace = StateNamespaces.window(windowCoder, window);
+ }
+ return namespace;
+ }
+
+ private OnWindowExpirationArgumentProvider(BoundedWindow window, Instant
timestamp, KeyT key) {
+ fn.super();
+ this.window = window;
+ this.timestamp = timestamp;
+ this.key = key;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access paneInfo outside of @ProcessElement methods.");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return getPipelineOptions();
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.StartBundleContext
startBundleContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("StartBundleContext parameters
are not supported.");
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("FinishBundleContext parameters
are not supported.");
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT,
OutputT> doFn) {
+ throw new UnsupportedOperationException("ProcessContext parameters are
not supported.");
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("Element parameters are not
supported.");
+ }
+
+ @Override
+ public Object sideInput(String tagId) {
+ throw new UnsupportedOperationException("SideInput parameters are not
supported.");
+ }
+
+ @Override
+ public Object schemaElement(int index) {
+ throw new UnsupportedOperationException("Element parameters are not
supported.");
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return timestamp;
+ }
+
+ @Override
+ public String timerId(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("Timer parameters are not
supported.");
+ }
+
+ @Override
+ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Cannot access time domain outside of @ProcessTimer method.");
+ }
+
+ @Override
+ public KeyT key() {
+ return key;
+ }
+
+ @Override
+ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
+ return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
+ }
+
+ @Override
+ public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
+ return DoFnOutputReceivers.rowReceiver(this, mainOutputTag,
mainOutputSchemaCoder);
+ }
+
+ @Override
+ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT>
doFn) {
+ return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
+ }
+
+ @Override
+ public Object restriction() {
+ throw new UnsupportedOperationException("@Restriction parameters are not
supported.");
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT,
OutputT> doFn) {
+ throw new UnsupportedOperationException("OnTimerContext parameters are
not supported.");
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ throw new UnsupportedOperationException("RestrictionTracker parameters
are not supported.");
+ }
+
+ @Override
+ public Object watermarkEstimatorState() {
+ throw new UnsupportedOperationException(
+ "@WatermarkEstimatorState parameters are not supported.");
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ throw new UnsupportedOperationException("WatermarkEstimator parameters
are not supported.");
+ }
+
+ @Override
+ public State state(String stateId, boolean alwaysFetched) {
+ try {
+ StateSpec<?> spec =
+ (StateSpec<?>)
signature.stateDeclarations().get(stateId).field().get(fn);
+ State state =
+ stepContext
+ .stateInternals()
+ .state(getNamespace(), StateTags.tagForSpec(stateId,
(StateSpec) spec));
+ if (alwaysFetched) {
+ return (State) ((ReadableState) state).readLater();
+ } else {
+ return state;
+ }
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ try {
+ TimerSpec spec = (TimerSpec)
signature.timerDeclarations().get(timerId).field().get(fn);
Review comment:
I'm wondering whether we need to expose timers here; I don't see how it
makes sense to create a new timer in a window that has already expired.
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
##########
@@ -387,7 +387,8 @@ private void processSystemTimer(TimerData timer) throws
Exception {
timer);
BoundedWindow window = ((WindowNamespace)
timer.getNamespace()).getWindow();
- Instant targetTime = earliestAllowableCleanupTime(window,
fnInfo.getWindowingStrategy());
+ Instant targetTime =
+ earliestAllowableCleanupTime(window,
fnInfo.getWindowingStrategy()).minus(1L);
Review comment:
Why did you subtract 1 here?
##########
File path:
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
##########
@@ -246,8 +246,8 @@ public static boolean isUserTimer(Windmill.Timer timer) {
builder.setStateFamily(stateFamily);
}
-
builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
-
+ builder.setTimestamp(
+
WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
Review comment:
I think this change is incorrect. Why was it necessary?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org