[ 
https://issues.apache.org/jira/browse/FLINK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012393#comment-16012393
 ] 

ASF GitHub Bot commented on FLINK-6583:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3919#discussion_r116741416
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
 ---
    @@ -0,0 +1,305 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.harness
    +
    +import com.google.common.collect.Lists
    +import org.apache.flink.api.common.time.Time
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import 
org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class CountTriggerWithCleanupStateHarnessTest {
    +  protected var queryConfig =
    +    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), 
Time.seconds(3))
    +
    +  @Test
    +  def testFiringAndFireingWithPurging(): Unit = {
    +    val testHarness = new TriggerTestHarness[Any, TimeWindow](
    +      CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new 
TimeWindow.Serializer)
    +
    +    // try to trigger onProcessingTime method via 1, but there is non 
timer is triggered
    +    assertEquals(0, testHarness.advanceProcessingTime(1).size())
    +
    +    // register cleanup timer with 3001
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // try to trigger onProcessingTime method via 1000, but there is non 
timer is triggered
    +    assertEquals(0, testHarness.advanceProcessingTime(1000).size())
    +
    +    // 1000 + 2000 <= 3001 reuse timer 3001
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // there are two state entries, one is timer(3001) another is 
counter(2)
    +    assertEquals(2, testHarness.numStateEntries)
    +
    +    // try to trigger onProcessingTime method via 3001, and timer(3001) is 
triggered
    +    assertEquals(
    +      TriggerResult.FIRE_AND_PURGE,
    +      testHarness.advanceProcessingTime(3001).iterator().next().f1)
    +
    +    assertEquals(0, testHarness.numStateEntries)
    +
    +    // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove 
timer 3001
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // try to trigger onProcessingTime method via 4002, but there is non 
timer is triggered
    +    assertEquals(0, testHarness.advanceProcessingTime(4002).size())
    +
    +    // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove 
timer 6001
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // 4002 + 2000 <= 7002 reuse timer 7002
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // have one timer 7002
    +    assertEquals(1, testHarness.numProcessingTimeTimers)
    +    assertEquals(0, testHarness.numEventTimeTimers)
    +    assertEquals(2, testHarness.numStateEntries)
    +    assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
    +    assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18)))
    +
    +    // 4002 + 2000 <= 7002 reuse timer 7002
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // register cleanup timer via 7002 for window (9, 18)
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +
    +    // there are four state entries
    +    assertEquals(4, testHarness.numStateEntries)
    +    assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
    +    assertEquals(2, testHarness.numStateEntries(new TimeWindow(9, 18)))
    +
    +    // the window counter triggered, count >= 10
    +    assertEquals(
    +      TriggerResult.FIRE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(9, 18)))
    +
    +    // counter of window(9, 18) is cleared
    +    assertEquals(3, testHarness.numStateEntries)
    +
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +    assertEquals(
    +      TriggerResult.FIRE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 9)))
    +
    +    // counter of window(0, 9) is cleared
    +    assertEquals(2, testHarness.numStateEntries)
    +    assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 9)))
    +    assertEquals(1, testHarness.numStateEntries(new TimeWindow(9, 18)))
    +
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(18, 27)))
    +
    +    assertEquals(4, testHarness.numStateEntries)
    +
    +    // try to trigger onProcessingTime method via 7002, and all states are 
cleared
    +    assertEquals(
    +      TriggerResult.FIRE_AND_PURGE,
    +      testHarness.advanceProcessingTime(7002).iterator().next().f1)
    +
    +    assertEquals(0, testHarness.numStateEntries)
    +  }
    +
    +  /**
    +    * Verify that clear() does not leak across windows.
    +    */
    +  @Test
    +  def testClear() {
    +    val testHarness = new TriggerTestHarness[Any, TimeWindow](
    +      CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3),
    +      new TimeWindow.Serializer)
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(0, 2)))
    +    assertEquals(
    +      TriggerResult.CONTINUE,
    +      testHarness.processElement(new StreamRecord[Any](1), new 
TimeWindow(2, 4)))
    +    // have 2 timers
    +    assertEquals(2, testHarness.numProcessingTimeTimers)
    +    assertEquals(0, testHarness.numEventTimeTimers)
    +    assertEquals(4, testHarness.numStateEntries)
    +    assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2)))
    +    assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4)))
    +    testHarness.clearTriggerState(new TimeWindow(2, 4))
    +    assertEquals(2, testHarness.numStateEntries)
    +    assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2)))
    +    assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)))
    +    testHarness.clearTriggerState(new TimeWindow(0, 2))
    +    assertEquals(0, testHarness.numStateEntries)
    +    assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)))
    +    assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)))
    +  }
    +
    +  @Test
    +  def testMergingWindows() {
    --- End diff --
    
    If the trigger does not support merging we can drop the next two tests.


> Enable QueryConfig in count base GroupWindow
> --------------------------------------------
>
>                 Key: FLINK-6583
>                 URL: https://issues.apache.org/jira/browse/FLINK-6583
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0, 1.4.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Enable QueryConfig in count base GroupWindow by Add a custom Trigger 
> `CountTriggerWithCleanupState`. See more in FLINK-6491.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to