Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2572
No, it's more or less that but there is more stuff that needs to be in that
adapter, for example in `testOnProcessingTimeFire()`, I'm highlighting the
places that need changing:
```
public void testOnProcessingTimeFire() throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner =
mockTimeWindowAssigner();
when(mockAssigner.isEventTime()).thenReturn(false); <-- here
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer,
WindowedValue<List<Integer>, TimeWindow>> testHarness =
createListWindowOperator(mockAssigner, mockTrigger, 0L);
testHarness.open();
testHarness.setProcessingTime(Long.MIN_VALUE); <-- here
when(mockAssigner.assignWindows(anyInt(), anyLong(),
anyAssignerContext()))
.thenReturn(Arrays.asList(new TimeWindow(2, 4), new
TimeWindow(0, 2)));
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
doAnswer(new Answer<TriggerResult>() {
@Override
public TriggerResult answer(InvocationOnMock invocation) throws
Exception {
Trigger.TriggerContext context = (Trigger.TriggerContext)
invocation.getArguments()[3];
context.registerProcessingTimeTimer(0L); <-- here
context.getPartitionedState(valueStateDescriptor).update("hello");
return TriggerResult.CONTINUE;
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(),
anyLong(), anyTimeWindow(), anyTriggerContext());
shouldFireOnProcessingTime(mockTrigger); <-- here
testHarness.processElement(new StreamRecord<>(0, 0L));
assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents
and trigger state for two windows
assertEquals(4, testHarness.numProcessingTimeTimers()); // timers/gc
timers for two windows <-- here
testHarness.setProcessingTime(0L); <-- here
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(),
anyTriggerContext());
// FIRE should not purge contents
assertEquals(4, testHarness.numKeyedStateEntries());
assertEquals(2, testHarness.numProcessingTimeTimers()); // only gc
timers left <-- here
// there should be two elements now
assertThat(testHarness.extractOutputStreamRecords(),
containsInAnyOrder(
isWindowedValue(contains(0), 1L, timeWindow(0, 2)),
isWindowedValue(contains(0), 3L, timeWindow(2, 4))));
}
```
(man I can't make text bold in a code block ... ð)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---