[ https://issues.apache.org/jira/browse/FLINK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072695#comment-16072695 ]
ASF GitHub Bot commented on FLINK-6649: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4157#discussion_r125316505 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala --- @@ -80,18 +80,79 @@ class NonWindowHarnessTest extends HarnessTestBase { val expectedOutput = new ConcurrentLinkedQueue[Object]() - expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1)) - expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1)) - - verify(expectedOutput, result, new RowResultSortComparator(6)) + expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, "aaa", 1: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, "bbb", 1: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, "aaa", 3: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, "aaa", 6: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, "aaa", 10: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, "bbb", 3: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, "aaa", 5: JInt), true), 1)) + expectedOutput.add(new 
StreamRecord(CRow(Row.of(8L: JLong, "aaa", 11: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, "aaa", 18: JInt), true), 1)) + expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, "bbb", 3: JInt), true), 1)) + + verifySorted(expectedOutput, result, new RowResultSortComparator) + + testHarness.close() + } + + @Test + def testProcTimeNonWindowWithUpdateInterval(): Unit = { + + val processFunction = new KeyedProcessOperator[String, CRow, CRow]( + new GroupAggProcessFunctionWithUpdateInterval( + genSumAggFunction, + sumAggregationStateType, + sumAggregationRowType, + false, + queryConfig + .withIdleStateRetentionTime(Time.seconds(4), Time.seconds(5)) + .withUnboundedAggregateUpdateInterval(Time.seconds(1)))) + + val testHarness = + createHarnessTester( + processFunction, + new TupleRowKeySelector[String](2), + BasicTypeInfo.STRING_TYPE_INFO) + + testHarness.open() + + testHarness.setProcessingTime(1) + + testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 1)) + testHarness.setProcessingTime(1000) + testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "aaa"), true), 1)) + + testHarness.setProcessingTime(1002) + testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 1)) + + testHarness.setProcessingTime(4003) + testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "aaa"), true), 1)) + + // clear all states + testHarness.setProcessingTime(10003) + testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 
1)) + testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 1)) + + testHarness.setProcessingTime(12003) --- End diff -- Add a case where a record is first added and later retracted (`CRow` flag is `false`) to verify that the operator sends out a delete message. This can be done on a new key (`"ccc"`). > Improve Non-window group aggregate with configurable `earlyFire`. > ----------------------------------------------------------------- > > Key: FLINK-6649 > URL: https://issues.apache.org/jira/browse/FLINK-6649 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Affects Versions: 1.4.0 > Reporter: sunjincheng > Assignee: sunjincheng > > Currently, Non-windowed group aggregate is earlyFiring at count(1), that is > every row will emit an aggregate result. But sometimes users want to configure the count > number (`early firing with count[N]`), to reduce the downstream pressure. > This JIRA will enable the configuration of `earlyFiring` for Non-windowed group > aggregate. -- This message was sent by Atlassian JIRA (v6.4.14#64029)