Repository: flink
Updated Branches:
  refs/heads/release-1.3 664a04c89 -> 80c23de70
[FLINK-7763] [table] Fix testing RowSink for enabled object reuse. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80c23de7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80c23de7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80c23de7 Branch: refs/heads/release-1.3 Commit: 80c23de709a56f9ea3b5ecd04876d95a1df7db96 Parents: 664a04c Author: Fabian Hueske <[email protected]> Authored: Fri Oct 13 11:30:39 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Mon Oct 16 09:59:25 2017 +0200 ---------------------------------------------------------------------- .../flink/table/sinks/StreamTableSinksITCase.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/80c23de7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala index 47c55f1..723b603 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala @@ -44,6 +44,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test(expected = classOf[TableException]) def testAppendSinkOnUpdatingTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() val tEnv = TableEnvironment.getTableEnvironment(env) val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text) @@ -59,6 +60,7 @@ class 
StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testAppendSinkOnAppendTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -87,6 +89,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testRetractSinkOnUpdatingTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -118,6 +121,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testRetractSinkOnAppendTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -152,6 +156,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -205,6 +210,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -242,6 +248,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + 
env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -279,6 +286,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -316,6 +324,7 @@ class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { @Test def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -497,8 +506,11 @@ object RowCollector { new mutable.ArrayBuffer[JTuple2[JBool, Row]]() def addValue(value: JTuple2[JBool, Row]): Unit = { + + // make a copy + val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1)) sink.synchronized { - sink += value + sink += copy } }
