[
https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986432#comment-15986432
]
ASF GitHub Bot commented on FLINK-4953:
---------------------------------------
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/3661
Thanks, @manuzhang I had another look at your changes. I would merge them
now but simplify the tests and `WindowOperator` a little, if that's alright
with you.
In `WindowOperator` I would change `WindowContext` to query the
`InternalTimerService` directly, as in:
```
@Override
public long currentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return internalTimerService.currentWatermark();
}
```
I would introduce a specific test, like this:
```
@Test
public void testEventTimeQuerying() throws Exception {
testCurrentTimeQuerying(new EventTimeAdaptor());
}
@Test
public void testProcessingTimeQuerying() throws Exception {
testCurrentTimeQuerying(new ProcessingTimeAdaptor());
}
public void testCurrentTimeQuerying(final TimeDomainAdaptor
timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner =
mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Iterable<Integer>, Void, Integer,
TimeWindow> mockWindowFunction = mockWindowFunction();
final KeyedOneInputStreamOperatorTestHarness<Integer, Integer,
Void> testHarness =
createWindowOperator(mockAssigner, mockTrigger,
20L, mockWindowFunction);
testHarness.open();
shouldFireOnElement(mockTrigger);
when(mockAssigner.assignWindows(anyInt(), anyLong(),
anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(0,
20)));
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
InternalWindowFunction.InternalWindowContext
context =
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
timeAdaptor.verifyCorrectTime(testHarness,
context);
return null;
}
}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(),
anyInternalWindowContext(), anyIntIterable(),
WindowOperatorContractTest.<Void>anyCollector());
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
InternalWindowFunction.InternalWindowContext
context =
(InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
timeAdaptor.verifyCorrectTime(testHarness,
context);
return null;
}
}).when(mockWindowFunction).clear(anyTimeWindow(),
anyInternalWindowContext());
timeAdaptor.advanceTime(testHarness, 10);
testHarness.processElement(new StreamRecord<>(0, 0L));
verify(mockWindowFunction, times(1)).process(anyInt(),
anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(),
WindowOperatorContractTest.<Void>anyCollector());
timeAdaptor.advanceTime(testHarness, 100);
verify(mockWindowFunction, times(1)).clear(anyTimeWindow(),
anyInternalWindowContext());
}
```
What do you think? I would change your commit and commit as one thing.
> Allow access to "time" in ProcessWindowFunction.Context
> -------------------------------------------------------
>
> Key: FLINK-4953
> URL: https://issues.apache.org/jira/browse/FLINK-4953
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Manu Zhang
> Assignee: Manu Zhang
>
> The recently added {{ProcessWindowFunction}} has a {{Context}} object that
> allows querying some additional information about the window firing that we
> are processing. Right now, this is only the window for which the firing is
> happening. We should extends this with methods that allow querying the
> current processing time and the current watermark.
> Original text by issue creator: This is similar to FLINK-3674 but exposing
> time information in window functions. Currently when a timer is fired, all
> states in a window will be returned to users, including those after the
> timer. This change will allow users to filter out states after the timer
> based on time info.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)