http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala index 8cad64f..ba044be 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala @@ -17,14 +17,15 @@ */ package org.apache.flink.table.runtime.harness -import java.lang.{Integer => JInt, Long => JLong} +import java.lang.{Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue +import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS} import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.streaming.api.operators.KeyedProcessOperator import org.apache.flink.streaming.runtime.streamrecord.StreamRecord -import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.api.{StreamQueryConfig, Types} import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.harness.HarnessTestBase._ import org.apache.flink.table.runtime.types.CRow @@ -48,8 +49,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ queryConfig)) val testHarness = - createHarnessTester(processFunction,new TupleRowKeySelector[Integer](0),BasicTypeInfo - .INT_TYPE_INFO) + createHarnessTester( + processFunction, + new TupleRowKeySelector[String](1), + Types.STRING) testHarness.open() @@ -57,91 +60,77 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 1L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 1)) + CRow(Row.of(toTS(1), "bbb", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 2L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 3L: JLong), true))) // register cleanup timer with 4100 testHarness.setProcessingTime(1100) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1)) + CRow(Row.of(toTS(1), "bbb", 20L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 4L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1)) + CRow(Row.of(toTS(1), "bbb", 30L: JLong), true))) // register cleanup timer with 6001 testHarness.setProcessingTime(3001) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 7L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 8L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 9L: JLong), true))) // trigger cleanup timer and register cleanup timer with 9002 testHarness.setProcessingTime(6002) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2)) + CRow(Row.of(toTS(2), "bbb", 40L: JLong), true))) val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 1)) + CRow(Row.of(toTS(1), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 1)) + CRow(Row.of(toTS(1), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 1)) + CRow(Row.of(toTS(1), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true), 1)) + CRow(Row.of(toTS(1), "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 2)) + CRow(Row.of(toTS(2), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -163,59 +152,59 @@ class OverWindowHarnessTest extends HarnessTestBase{ val testHarness = createHarnessTester( processFunction, - new TupleRowKeySelector[Integer](0), - BasicTypeInfo.INT_TYPE_INFO) + new TupleRowKeySelector[String](1), + Types.STRING) testHarness.open() // register cleanup timer with 3003 testHarness.setProcessingTime(3) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 1L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 10L: JLong), true))) testHarness.setProcessingTime(4) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 2L: JLong), true))) // trigger cleanup timer and register cleanup timer with 6003 testHarness.setProcessingTime(3003) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 3L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 20L: JLong), true))) testHarness.setProcessingTime(5) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 4L: JLong), true))) // register cleanup timer with 9002 testHarness.setProcessingTime(6002) testHarness.setProcessingTime(7002) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 30L: JLong), true))) // register cleanup timer with 14002 testHarness.setProcessingTime(11002) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 7L: JLong), true))) testHarness.setProcessingTime(11004) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 8L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 9L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 40L: JLong), true))) testHarness.setProcessingTime(11006) @@ -225,49 +214,35 @@ class OverWindowHarnessTest extends HarnessTestBase{ // all elements at the same proc timestamp have the same value per key expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4)) + CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4)) + CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 5)) + CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true), 3004)) + CRow(Row.of(toTS(0), "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow(Row.of( - 2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true), 3004)) + CRow(Row.of(toTS(0), "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true), 6)) + CRow(Row.of(toTS(0), "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true), 7003)) + CRow(Row.of(toTS(0), "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 7003)) + CRow(Row.of(toTS(0), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true), 7003)) + CRow(Row.of(toTS(0), "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true), 11003)) + CRow(Row.of(toTS(0), "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow(Row.of( - 1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true), 11005)) + CRow(Row.of(toTS(0), "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow(Row.of( - 1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true), 11005)) + CRow(Row.of(toTS(0), "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true), 11005)) + CRow(Row.of(toTS(0), "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 11005)) + CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -284,8 +259,8 @@ class OverWindowHarnessTest extends HarnessTestBase{ val testHarness = createHarnessTester( processFunction, - new TupleRowKeySelector[Integer](0), - BasicTypeInfo.INT_TYPE_INFO) + new TupleRowKeySelector[String](1), + Types.STRING) testHarness.open() @@ -293,85 +268,71 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1003) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 1L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 2L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 3L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 20L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 4L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 30L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 7L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 8L: JLong), true))) // trigger cleanup timer and register cleanup timer with 8003 testHarness.setProcessingTime(5003) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 5003)) + CRow(Row.of(toTS(0), "aaa", 9L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 5003)) + CRow(Row.of(toTS(0), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 5003)) + CRow(Row.of(toTS(0), "bbb", 40L: JLong), true))) val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0)) + CRow(Row.of(toTS(0), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 0)) + CRow(Row.of(toTS(0), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true), 5003)) + CRow(Row.of(toTS(0), "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 5003)) + CRow(Row.of(toTS(0), "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 5003)) + CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -387,63 +348,64 @@ class OverWindowHarnessTest extends HarnessTestBase{ minMaxAggregationStateType, minMaxCRowType, 4000, + 0, new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( processFunction, - new TupleRowKeySelector[String](3), + new TupleRowKeySelector[String](1), BasicTypeInfo.STRING_TYPE_INFO) testHarness.open() testHarness.processWatermark(1) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 1L: JLong), true))) testHarness.processWatermark(2) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 3)) + CRow(Row.of(toTS(3), "bbb", 10L: JLong), true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) testHarness.processWatermark(4001) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4002)) + CRow(Row.of(toTS(4002), "aaa", 3L: JLong), true))) testHarness.processWatermark(4002) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong), true), 4003)) + CRow(Row.of(toTS(4003), "aaa", 4L: JLong), true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "bbb", 25L: JLong), true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) testHarness.processWatermark(19000) @@ -453,21 +415,22 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is removed after max retention time testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 3000 + CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 4500 + CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500 testHarness.processWatermark(20010) // compute output assert(testHarness.numKeyedStateEntries() > 0) // check that we have state testHarness.setProcessingTime(4499) assert(testHarness.numKeyedStateEntries() > 0) // check that we have state testHarness.setProcessingTime(4500) + val x = testHarness.numKeyedStateEntries() assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone // check that state is only removed if all data was processed testHarness.processElement(new StreamRecord( - CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 20011)) // clean-up 6500 + CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500 assert(testHarness.numKeyedStateEntries() > 0) // check that we have state testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500 @@ -487,59 +450,42 @@ class OverWindowHarnessTest extends HarnessTestBase{ // all elements at the same row-time have the same value per key expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 2)) + CRow(Row.of(toTS(2), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 3)) + CRow(Row.of(toTS(3), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4002)) + CRow(Row.of(toTS(4002), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4003)) + CRow(Row.of(toTS(4003), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true), 20011)) + CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -552,59 +498,60 @@ class OverWindowHarnessTest extends HarnessTestBase{ minMaxAggregationStateType, minMaxCRowType, 3, + 0, new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( processFunction, - new TupleRowKeySelector[String](3), + new TupleRowKeySelector[String](1), BasicTypeInfo.STRING_TYPE_INFO) testHarness.open() testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801)) + CRow(Row.of(toTS(801), "aaa", 1L: JLong), true))) testHarness.processWatermark(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501)) + CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) testHarness.processWatermark(19000) @@ -614,10 +561,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is removed after max retention time testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 3000 + CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 4500 + CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500 testHarness.processWatermark(20010) // compute output assert(testHarness.numKeyedStateEntries() > 0) // check that we have state @@ -628,7 +575,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is only removed if all data was processed testHarness.processElement(new StreamRecord( - CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 20011)) // clean-up 6500 + CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500 assert(testHarness.numKeyedStateEntries() > 0) // check that we have state testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500 @@ -648,59 +595,42 @@ class OverWindowHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801)) + CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501)) + CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true), 20011)) + CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -715,12 +645,13 @@ class OverWindowHarnessTest extends HarnessTestBase{ genMinMaxAggFunction, minMaxAggregationStateType, minMaxCRowType, + 0, new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( processFunction, - new TupleRowKeySelector[String](3), + new TupleRowKeySelector[String](1), BasicTypeInfo.STRING_TYPE_INFO) testHarness.open() @@ -728,47 +659,47 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1000) testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801)) + CRow(Row.of(toTS(801), "aaa", 1L: JLong), true))) testHarness.processWatermark(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501)) + CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) testHarness.processWatermark(19000) @@ -781,10 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(20000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000 + CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000 + CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000 assert(testHarness.numKeyedStateEntries() > 0) testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000 @@ -802,56 +733,40 @@ class OverWindowHarnessTest extends HarnessTestBase{ // all elements at the same row-time have the same value per key expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801)) + CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501)) + CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -863,12 +778,13 @@ class OverWindowHarnessTest extends HarnessTestBase{ genMinMaxAggFunction, minMaxAggregationStateType, minMaxCRowType, + 0, new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)))) val testHarness = createHarnessTester( processFunction, - new TupleRowKeySelector[String](3), + new TupleRowKeySelector[String](1), BasicTypeInfo.STRING_TYPE_INFO) testHarness.open() @@ -876,47 +792,47 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1000) testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801)) + CRow(Row.of(toTS(801), "aaa", 1L: JLong), true))) testHarness.processWatermark(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501)) + CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) testHarness.processWatermark(19000) @@ -929,10 +845,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(20000) testHarness.processElement(new StreamRecord( - CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000 + CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000 + CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000 assert(testHarness.numKeyedStateEntries() > 0) testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000 @@ -949,56 +865,40 @@ class OverWindowHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801)) + CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501)) + CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001)) + CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801)) + CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501)) + CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001)) + CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001)) + CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001)) + CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true), 20001)) + CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) expectedOutput.add(new StreamRecord( - CRow( - Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true), 20002)) + CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } }
http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala index 0451534..18ba6bb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.harness import java.lang.{Integer => JInt, Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue +import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS} import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} @@ -35,6 +36,7 @@ import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, import org.apache.flink.table.runtime.aggregate.{CollectionRowComparator, ProcTimeSortProcessFunction, RowTimeSortProcessFunction} import org.apache.flink.table.runtime.harness.SortProcessFunctionHarnessTest.TupleRowSelector import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.types.Row import org.junit.Test @@ -75,7 +77,7 @@ class SortProcessFunctionHarnessTest { inputCRowType, collectionRowComparator)) - val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,CRow,CRow]( + val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow]( processFunction, new TupleRowSelector(0), BasicTypeInfo.INT_TYPE_INFO) @@ -86,77 +88,77 @@ class SortProcessFunctionHarnessTest { // timestamp is ignored in processing time testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002)) + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2003)) + Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2004)) + Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006)) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true))) //move the timestamp to ensure the execution testHarness.setProcessingTime(1005) - + testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007)) + Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007)) + Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007)) - + Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true))) + testHarness.setProcessingTime(1008) - + val result = testHarness.getOutput - + val expectedOutput = new ConcurrentLinkedQueue[Object]() - + // all elements at the same proc timestamp have the same value // elements should be sorted ascending on field 1 and descending on field 2 // (10,0) (11,1) (12,2) (12,1) (12,0) // (1,0) (2,0) - + expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4)) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong),true), 4)) + Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong),true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4)) + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4)) - + Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong),true))) + expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006)) + Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006)) + Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006)) + Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong),true))) TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result) - + testHarness.close() } - + @Test def testSortRowTimeHarnessPartitioned(): Unit = { - + val rT = new RowTypeInfo(Array[TypeInformation[_]]( INT_TYPE_INFO, LONG_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO, - LONG_TYPE_INFO), + TimeIndicatorTypeInfo.ROWTIME_INDICATOR), Array("a", "b", "c", "d", "e")) val indexes = Array(1, 2) - + val fieldComps = Array[TypeComparator[AnyRef]]( LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]], INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]] ) - val booleanOrders = Array(true, false) + val booleanOrders = Array(true, false) val rowComp = new RowComparator( rT.getTotalFields, @@ -164,21 +166,22 @@ class SortProcessFunctionHarnessTest { fieldComps, new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons booleanOrders) - + val collectionRowComparator = new CollectionRowComparator(rowComp) - + val inputCRowType = CRowTypeInfo(rT) - + val processFunction = new KeyedProcessOperator[Integer,CRow,CRow]( new RowTimeSortProcessFunction( inputCRowType, + 4, Some(collectionRowComparator))) - + val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow]( - processFunction, - new TupleRowSelector(0), + processFunction, + new TupleRowSelector(0), BasicTypeInfo.INT_TYPE_INFO) - + testHarness.open() testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime) @@ -186,71 +189,71 @@ class SortProcessFunctionHarnessTest { // timestamp is ignored in processing time testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002)) + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2002)) + Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2002)) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002)) + Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2004)) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006)) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true))) // move watermark forward testHarness.processWatermark(2007) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008)) + Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002)) // too late + Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) // too late testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2019)) // too early + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2019)), true))) // too early testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008)) + Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010)) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008)) + Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true))) // move watermark forward testHarness.processWatermark(2012) val result = testHarness.getOutput - + val expectedOutput = new ConcurrentLinkedQueue[Object]() - + // all elements at the same proc timestamp have the same value // elements should be sorted ascending on field 1 and descending on field 2 // (10,0) (11,1) (12,2) (12,1) (12,0) expectedOutput.add(new Watermark(3)) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 1001)) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2002)) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 2002)) + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong),true), 2002)) + Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2002)) + Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2004)) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2006)) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true))) expectedOutput.add(new Watermark(2007)) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008)) + Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008)) + Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008)) + Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010)) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true))) expectedOutput.add(new Watermark(2012)) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 4121754..82ed81c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.stream.table import java.io.File import java.lang.{Boolean => JBool} +import java.sql.Timestamp import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation @@ -28,19 +29,22 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.TableEnvironment -import org.apache.flink.table.runtime.utils.StreamTestData +import org.apache.flink.table.api.{TableEnvironment, TableException, Types} +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} import org.apache.flink.table.sinks._ import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row +import org.apache.flink.util.Collector import org.junit.Assert._ import org.junit.Test import scala.collection.mutable +import scala.collection.JavaConverters._ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @@ -199,8 +203,6 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { } - - @Test def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -349,6 +351,136 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected, retracted) } + @Test + def testToAppendStreamRowtime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + val r = t + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('num, 'w) + .select('num, 'w.rowtime, 'w.rowtime.cast(Types.LONG)) + + r.toAppendStream[Row] + .process(new ProcessFunction[Row, Row] { + override def processElement( + row: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + + val rowTS: Long = row.getField(2).asInstanceOf[Long] + if (ctx.timestamp() == rowTS) { + out.collect(row) + } + } + }).addSink(new StreamITCase.StringSink[Row]) + + env.execute() + + val expected = List( + "1,1970-01-01 00:00:00.004,4", + "2,1970-01-01 00:00:00.004,4", + "3,1970-01-01 00:00:00.004,4", + "3,1970-01-01 00:00:00.009,9", + "4,1970-01-01 00:00:00.009,9", + "4,1970-01-01 00:00:00.014,14", + "5,1970-01-01 00:00:00.014,14", + "5,1970-01-01 00:00:00.019,19", + "6,1970-01-01 00:00:00.019,19", + "6,1970-01-01 00:00:00.024,24") + + assertEquals(expected, StreamITCase.testResults.sorted) + } + + @Test + def testToRetractStreamRowtime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + val r = t + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('num, 'w) + .select('num, 'w.rowtime, 'w.rowtime.cast(Types.LONG)) + + r.toRetractStream[Row] + .process(new ProcessFunction[(Boolean, Row), Row] { + override def processElement( + row: (Boolean, Row), + ctx: ProcessFunction[(Boolean, Row), Row]#Context, + out: Collector[Row]): Unit = { + + val rowTS: Long = row._2.getField(2).asInstanceOf[Long] + if (ctx.timestamp() == rowTS) { + out.collect(row._2) + } + } + }).addSink(new StreamITCase.StringSink[Row]) + + env.execute() + + val expected = List( + "1,1970-01-01 00:00:00.004,4", + "2,1970-01-01 00:00:00.004,4", + "3,1970-01-01 00:00:00.004,4", + "3,1970-01-01 00:00:00.009,9", + "4,1970-01-01 00:00:00.009,9", + "4,1970-01-01 00:00:00.014,14", + "5,1970-01-01 00:00:00.014,14", + "5,1970-01-01 00:00:00.019,19", + "6,1970-01-01 00:00:00.019,19", + "6,1970-01-01 00:00:00.024,24") + + assertEquals(expected, StreamITCase.testResults.sorted) + } + + @Test(expected = classOf[TableException]) + def testToAppendStreamMultiRowtime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + val r = t + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('num, 'w) + .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2) + + r.toAppendStream[Row] + } + + @Test(expected = classOf[TableException]) + def testToRetractStreamMultiRowtime(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + val r = t + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('num, 'w) + .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2) + + r.toRetractStream[Row] + } + /** Converts a list of retraction messages into a list of final results. */ private def restractResults(results: List[JTuple2[JBool, Row]]): List[String] = {