Repository: flink Updated Branches: refs/heads/master 660a45ca1 -> 57333c622
[FLINK-7763] [table] Fix testing RowSink for enabled object reuse. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57333c62 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57333c62 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57333c62 Branch: refs/heads/master Commit: 57333c62271253248bf3699be31ae7224e97de75 Parents: 660a45c Author: Fabian Hueske <[email protected]> Authored: Fri Oct 13 11:30:39 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Fri Oct 13 11:30:39 2017 +0200 ---------------------------------------------------------------------- .../runtime/stream/table/TableSinkITCase.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/57333c62/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index c5b82fe..07934b8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -51,6 +51,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testInsertIntoRegisteredTableSink(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -86,6 +87,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { val path = tmpFile.toURI.toString val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() val tEnv = TableEnvironment.getTableEnvironment(env) env.setParallelism(4) @@ -109,6 +111,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testAppendSinkOnAppendTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -137,6 +140,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testRetractSinkOnUpdatingTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -168,6 +172,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testRetractSinkOnAppendTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -202,6 +207,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -236,6 +242,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -273,6 +280,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -310,6 +318,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -347,6 +356,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -384,6 +394,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test def testToAppendStreamRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.clear @@ -478,6 +489,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test(expected = classOf[TableException]) def testToAppendStreamMultiRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -496,6 +508,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { @Test(expected = classOf[TableException]) def testToRetractStreamMultiRowtime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -658,8 +671,11 @@ object RowCollector { new mutable.ArrayBuffer[JTuple2[JBool, Row]]() def addValue(value: JTuple2[JBool, Row]): Unit = { + + // make a copy + val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1)) sink.synchronized { - sink += value + sink += copy } }
