[ https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612219#comment-15612219 ]
ASF GitHub Bot commented on FLINK-4552: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2572#discussion_r85361208 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TestInternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility for testing {@link Trigger} behaviour. 
+ */ +public class TriggerTestHarness<T, W extends Window> { + + private static final Integer KEY = 1; + + private final Trigger<T, W> trigger; + private final TypeSerializer<W> windowSerializer; + + private final HeapKeyedStateBackend<Integer> stateBackend; + private final TestInternalTimerService<Integer, W> internalTimerService; + + public TriggerTestHarness( + Trigger<T, W> trigger, + TypeSerializer<W> windowSerializer) throws Exception { + this.trigger = trigger; + this.windowSerializer = windowSerializer; + + // we only ever use one key, other tests make sure that windows work across different + // keys + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + MemoryStateBackend backend = new MemoryStateBackend(); + + @SuppressWarnings("unchecked") + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + this.stateBackend = stateBackend; + + this.stateBackend.setCurrentKey(0); + + this.internalTimerService = new TestInternalTimerService<>(new KeyContext() { + @Override + public void setCurrentKey(Object key) { + // ignore + } + + @Override + public Object getCurrentKey() { + return KEY; + } + }); + } + + public int numProcessingTimeTimers() { + return internalTimerService.numProcessingTimeTimers(); + } + + public int numProcessingTimeTimers(W window) { + return internalTimerService.numProcessingTimeTimers(window); + } + + public int numEventTimeTimers() { + return internalTimerService.numEventTimeTimers(); + } + + public int numEventTimeTimers(W window) { + return internalTimerService.numEventTimeTimers(window); + } + + public int numStateEntries() { + return stateBackend.numStateEntries(); + } + + public int numStateEntries(W window) { + return stateBackend.numStateEntries(window); + } + + /** + * Injects one element into 
the trigger for the given window and returns the result of + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} + */ + public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception { + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>( + KEY, + window, + internalTimerService, + stateBackend, + windowSerializer); + return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext); + } + + /** + * Advanced processing time and checks whether we have exactly one firing for the given + * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)} + * is returned for that firing. + */ + public TriggerResult advanceProcessingTime(long time, W window) throws Exception { + Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time); + + if (firings.size() != 1) { + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings); + } + + Tuple2<W, TriggerResult> firing = firings.iterator().next(); --- End diff -- I don't think this checks EXACTLY one, but only AT LEAST one. > Refactor WindowOperator/Trigger Tests > ------------------------------------- > > Key: FLINK-4552 > URL: https://issues.apache.org/jira/browse/FLINK-4552 > Project: Flink > Issue Type: Improvement > Components: Windowing Operators > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > > Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and > {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these > test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and > {{WindowFunction}} produce the expected output. > We should modularize these tests and spread them out across multiple files, > possibly one per trigger, for the triggers. 
Also, we should extend/change the > tests in some key ways: > - {{WindowOperatorTest}} tests should just verify that the interaction > between {{WindowOperator}} and the various other parts works as expected, > that the correct methods on {{Trigger}} and {{WindowFunction}} are called at > the expected time and that snapshotting, timers, cleanup, etc. work correctly. > These tests should also verify that the different state types and > {{WindowFunctions}} work correctly. > - {{Trigger}} tests should present elements to triggers and verify that they > fire at the correct times. The actual output of the {{WindowFunction}} is not > important for these tests. We should also test that triggers correctly clean > up state and timers. > - {{WindowAssigner}} tests should test each window assigner and also verify > that, for example, the offset parameter of time-based windows works correctly. > There is already a {{WindowingTestHarness}}, but it is not used by tests; I > think we can expand on that and provide more thorough test coverage while > also making the tests more maintainable ({{WindowOperatorTest.java}} is > nearing 3000 lines of code). -- This message was sent by Atlassian JIRA (v6.3.4#6332)