Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2572
  
    No, it's more or less that but there is more stuff that needs to be in that 
adapter, for example in `testOnProcessingTimeFire()`, I'm highlighting the 
places that need changing:
    
    ```
    public void testOnProcessingTimeFire() throws Exception {
    
        WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
        when(mockAssigner.isEventTime()).thenReturn(false); <-- here
        Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
    
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
WindowedValue<List<Integer>, TimeWindow>> testHarness =
                createListWindowOperator(mockAssigner, mockTrigger, 0L);
    
        testHarness.open();
    
        testHarness.setProcessingTime(Long.MIN_VALUE); <-- here
    
        when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
                .thenReturn(Arrays.asList(new TimeWindow(2, 4), new 
TimeWindow(0, 2)));
    
        assertEquals(0, testHarness.getOutput().size());
        assertEquals(0, testHarness.numKeyedStateEntries());
    
        doAnswer(new Answer<TriggerResult>() {
            @Override
            public TriggerResult answer(InvocationOnMock invocation) throws 
Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext) 
invocation.getArguments()[3];
                context.registerProcessingTimeTimer(0L); <-- here
                
context.getPartitionedState(valueStateDescriptor).update("hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
    
        shouldFireOnProcessingTime(mockTrigger); <-- here
    
        testHarness.processElement(new StreamRecord<>(0, 0L));
    
        assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents 
and trigger state for two windows
        assertEquals(4, testHarness.numProcessingTimeTimers()); // timers/gc 
timers for two windows <-- here
    
        testHarness.setProcessingTime(0L); <-- here
    
        // clear is only called at cleanup time/GC time
        verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
    
        // FIRE should not purge contents
        assertEquals(4, testHarness.numKeyedStateEntries());
        assertEquals(2, testHarness.numProcessingTimeTimers()); // only gc 
timers left <-- here
    
        // there should be two elements now
        assertThat(testHarness.extractOutputStreamRecords(),
                containsInAnyOrder(
                        isWindowedValue(contains(0), 1L, timeWindow(0, 2)),
                        isWindowedValue(contains(0), 3L, timeWindow(2, 4))));
    }
    ```
    
    (man I can't make text bold in a code block ... 😭) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to