[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15614891#comment-15614891
]
ASF GitHub Bot commented on FLINK-4552:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85498801
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+	/**
+	 * Verify that state of separate windows does not leak into other windows.
+	 */
+	@Test
+	public void testWindowSeparationAndFiring() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// right now, CountTrigger will clear its state in onElement when firing
+		// ideally, this should be moved to onFire()
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+	}
+
+	/**
+	 * Verify that clear() does not leak across windows.
+	 */
+	@Test
+	public void testClear() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+		assertEquals(1, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+		testHarness.clearTriggerState(new TimeWindow(0, 2));
+
+		assertEquals(0, testHarness.numStateEntries());
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+	}
+
+	@Test
+	public void testMergingWindows() throws Exception {
+		TriggerTestHarness<Object, TimeWindow> testHarness =
+				new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
+
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+		assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+		// shouldn't have any timers
+		assertEquals(0, testHarness.numProcessingTimeTimers());
+		assertEquals(0, testHarness.numEventTimeTimers());
+
+		assertEquals(2, testHarness.numStateEntries());
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+		assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
--- End diff --
Done. There are also more tests that specifically cover merging in
`MergingWindowSetTest` and `WindowOperatorTest`. The core of the merging
"algorithm" is exercised in `MergingWindowSetTest`.
> Refactor WindowOperator/Trigger Tests
> -------------------------------------
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
> Issue Type: Improvement
> Components: Windowing Operators
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and
> {{WindowFunction}} produces the expected output.
> We should modularize these tests and spread them out across multiple files,
> possibly one per trigger, for the triggers. Also, we should extend/change the
> tests in some key ways:
> - {{WindowOperatorTest}} tests should just verify that the interaction
> between {{WindowOperator}} and the various other parts works as expected,
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at
> the expected time and that snapshotting, timers, cleanup etc. work correctly.
> These tests should also verify that the different state types and
> {{WindowFunctions}} work correctly.
> - {{Trigger}} tests should present elements to triggers and verify that they
> fire at the correct times. The actual output of the {{WindowFunction}} is not
> important for these tests. We should also test that triggers correctly clean
> up state and timers.
> - {{WindowAssigner}} tests should test each window assigner and also verify
> that, for example, the offset parameter of time-based windows works correctly.
> There is already {{WindowingTestHarness}}, but it is not used by tests. I
> think we can expand on that and provide more thorough test coverage while
> also making the tests more maintainable ({{WindowOperatorTest.java}} is
> nearing 3000 lines of code).
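
The trigger-test style described above (present elements to a trigger, check when it fires, and check that state is cleaned up per window) can be illustrated without Flink. The following is only a minimal sketch under stated assumptions: `TriggerSketch`, `CountingTrigger`, `Result`, and the string window names are hypothetical and are not Flink classes or the `TriggerTestHarness` from the diff. It also assumes that merging windows sums their per-window counters, which is a simplification and not the logic in `MergingWindowSet`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch; none of these types are Flink classes.
class TriggerSketch {

    /** Stand-in for a trigger decision (an assumption, not Flink's TriggerResult). */
    enum Result { CONTINUE, FIRE }

    /** Count-based trigger that keeps one counter per window, keyed by a window name. */
    static final class CountingTrigger {
        private final long maxCount;
        private final Map<String, Long> counts = new HashMap<>();

        CountingTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        /** Counts one element for the window; fires and drops the counter at maxCount. */
        Result onElement(String window) {
            long count = counts.merge(window, 1L, Long::sum);
            if (count >= maxCount) {
                counts.remove(window);
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }

        /** Moves the counters of the source windows into the target window by summing. */
        void onMerge(List<String> sources, String target) {
            long total = 0;
            for (String source : sources) {
                Long count = counts.remove(source);
                if (count != null) {
                    total += count;
                }
            }
            if (total > 0) {
                counts.merge(target, total, Long::sum);
            }
        }

        /** Drops all state held for the window. */
        void clear(String window) {
            counts.remove(window);
        }

        /** Number of windows that currently hold a counter. */
        int numStateEntries() {
            return counts.size();
        }
    }

    public static void main(String[] args) {
        CountingTrigger trigger = new CountingTrigger(3);

        // Two elements in different windows: neither window reaches the count.
        System.out.println(trigger.onElement("[0,2)"));
        System.out.println(trigger.onElement("[2,4)"));
        System.out.println(trigger.numStateEntries());

        // Merging sums the two counters into one entry for the covering window.
        trigger.onMerge(Arrays.asList("[0,2)", "[2,4)"), "[0,4)");
        System.out.println(trigger.numStateEntries());

        // The next element is the third overall, so the merged window fires.
        System.out.println(trigger.onElement("[0,4)"));
        System.out.println(trigger.numStateEntries());
    }
}
```

Keying the counters by window is what lets a test assert that firing or clearing one window leaves the state of the others untouched, which is the property the tests in the diff check through `numStateEntries`.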