This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5b01e71 [FLINK-13447][table] Change default planner to legacy planner instead of any one 5b01e71 is described below commit 5b01e7139c1d0307e93f0e448e75ee6b5e6541ca Author: Jark Wu <imj...@gmail.com> AuthorDate: Sat Jul 27 23:08:18 2019 +0800 [FLINK-13447][table] Change default planner to legacy planner instead of any one This closes #9249 --- .../flink/table/api/EnvironmentSettings.java | 26 +++++++++------- .../flink/table/api/TableEnvironmentTest.scala | 2 +- .../planner/match/PatternTranslatorTestBase.scala | 3 +- .../validation/SetOperatorsValidationTest.scala | 11 ++++--- .../table/validation/TableSinkValidationTest.scala | 9 +++--- .../validation/UnsupportedOpsValidationTest.scala | 20 ++++++------- .../planner/plan/utils/FlinkRelOptUtilTest.scala | 3 +- .../runtime/stream/sql/MatchRecognizeITCase.scala | 35 +++++++++++----------- .../runtime/stream/sql/TemporalJoinITCase.scala | 6 ++-- .../runtime/stream/table/TableSinkITCase.scala | 29 +++++++++--------- .../runtime/utils/StreamingWithStateTestBase.scala | 4 +-- .../flink/table/planner/utils/TableTestBase.scala | 5 ++++ 12 files changed, 79 insertions(+), 74 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index 29c9227..8289bbc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -149,29 +149,35 @@ public class EnvironmentSettings { * A builder for {@link EnvironmentSettings}. */ public static class Builder { - private String plannerClass = null; - private String executorClass = null; + private static final String OLD_PLANNER_FACTORY = "org.apache.flink.table.planner.StreamPlannerFactory"; + private static final String OLD_EXECUTOR_FACTORY = "org.apache.flink.table.executor.StreamExecutorFactory"; + private static final String BLINK_PLANNER_FACTORY = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory"; + private static final String BLINK_EXECUTOR_FACTORY = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory"; + + private String plannerClass = OLD_PLANNER_FACTORY; + private String executorClass = OLD_EXECUTOR_FACTORY; private String builtInCatalogName = "default_catalog"; private String builtInDatabaseName = "default_database"; private boolean isStreamingMode = true; /** - * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is - * enabled. + * Sets the old Flink planner as the required module. + * + * <p>This is the default behavior. */ public Builder useOldPlanner() { - this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory"; - this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory"; + this.plannerClass = OLD_PLANNER_FACTORY; + this.executorClass = OLD_EXECUTOR_FACTORY; return this; } /** - * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is + * Sets the Blink planner as the required module. By default, {@link #useOldPlanner()} is * enabled. */ public Builder useBlinkPlanner() { - this.plannerClass = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory"; - this.executorClass = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory"; + this.plannerClass = BLINK_PLANNER_FACTORY; + this.executorClass = BLINK_EXECUTOR_FACTORY; return this; } @@ -180,7 +186,7 @@ public class EnvironmentSettings { * * <p>A planner will be discovered automatically, if there is only one planner available. * - * <p>This is the default behavior. + * <p>By default, {@link #useOldPlanner()} is enabled. */ public Builder useAnyPlanner() { this.plannerClass = null; diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 733246d..1a0996b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -39,7 +39,7 @@ class TableEnvironmentTest { def thrown: ExpectedException = expectedException val env = new StreamExecutionEnvironment(new LocalStreamEnvironment()) - val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) + val tableEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) @Test def testScanNonExistTable(): Unit = { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala index 9301470..254e60f 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala @@ -35,7 +35,6 @@ import org.apache.flink.table.planner.utils.TableTestUtil import org.apache.flink.table.types.logical.{IntType, RowType} import org.apache.flink.types.Row import org.apache.flink.util.TestLogger - import org.apache.calcite.rel.RelNode import org.apache.calcite.tools.RelBuilder import org.junit.Assert._ @@ -67,7 +66,7 @@ abstract class PatternTranslatorTestBase extends TestLogger { when(jDataStreamMock.getId).thenReturn(0) val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) TableTestUtil.registerDataStream( tEnv, tableName, dataStreamMock.javaStream, Some(Array[Expression]('f0, 'proctime.proctime))) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala index da04e34..cb16186 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/SetOperatorsValidationTest.scala @@ -23,9 +23,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.runtime.utils.{TestData, TestingAppendSink} -import org.apache.flink.table.planner.utils.TableTestBase +import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil} import org.apache.flink.types.Row - import org.junit.Assert.assertEquals import org.junit.Test @@ -34,7 +33,7 @@ class SetOperatorsValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testUnionFieldsNameNotOverlap1(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c) val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) @@ -51,7 +50,7 @@ class SetOperatorsValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testUnionFieldsNameNotOverlap2(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c) val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) @@ -68,8 +67,8 @@ class SetOperatorsValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testUnionTablesFromDifferentEnv(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv1 = StreamTableEnvironment.create(env) - val tEnv2 = StreamTableEnvironment.create(env) + val tEnv1 = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) + val tEnv2 = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val ds1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv1, 'a, 'b, 'c) val ds2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv2, 'a, 'b, 'c) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala index 418c034..16d7ff1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TableSinkValidationTest.scala @@ -24,9 +24,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.runtime.utils.{TestData, TestingAppendSink, TestingUpsertTableSink} -import org.apache.flink.table.planner.utils.TableTestBase +import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil} import org.apache.flink.types.Row - import org.junit.Test class TableSinkValidationTest extends TableTestBase { @@ -34,7 +33,7 @@ class TableSinkValidationTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testAppendSinkOnUpdatingTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(TestData.smallTupleData3).toTable(tEnv, 'a, 'b, 'c) @@ -50,7 +49,7 @@ class TableSinkValidationTest extends TableTestBase { def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(TestData.tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -72,7 +71,7 @@ class TableSinkValidationTest extends TableTestBase { @Test(expected = classOf[TableException]) def testAppendSinkOnLeftJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val ds1 = env.fromCollection(TestData.tupleData3).toTable(tEnv, 'a, 'b, 'c) val ds2 = env.fromCollection(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala index 38eabf8..91a7eb6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/UnsupportedOpsValidationTest.scala @@ -23,8 +23,8 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.runtime.utils.TestData +import org.apache.flink.table.planner.utils.TableTestUtil import org.apache.flink.test.util.AbstractTestBase - import org.junit.Test class UnsupportedOpsValidationTest extends AbstractTestBase { @@ -32,14 +32,14 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testSort(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.fromCollection(TestData.smallTupleData3).toTable(tEnv).orderBy('_1.desc) } @Test(expected = classOf[ValidationException]) def testJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.join(t2) @@ -48,7 +48,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testUnion(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.union(t2) @@ -57,7 +57,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testIntersect(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.intersect(t2) @@ -66,7 +66,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testIntersectAll(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.intersectAll(t2) @@ -75,7 +75,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testMinus(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.minus(t2) @@ -84,7 +84,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testMinusAll(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) val t2 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.minusAll(t2) @@ -93,7 +93,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testOffset(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.offset(5) } @@ -101,7 +101,7 @@ class UnsupportedOpsValidationTest extends AbstractTestBase { @Test(expected = classOf[ValidationException]) def testFetch(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t1 = env.fromCollection(TestData.smallTupleData3).toTable(tEnv) t1.fetch(5) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala index 1726aa0..3b885ec 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtilTest.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.table.api.TableConfig import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchMode} import org.apache.flink.table.planner.utils.TableTestUtil @@ -33,7 +32,7 @@ class FlinkRelOptUtilTest { @Test def testToString(): Unit = { val env = StreamExecutionEnvironment.createLocalEnvironment() - val tableEnv = StreamTableEnvironment.create(env, new TableConfig()) + val tableEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val table = env.fromElements[(Int, Long, String)]().toTable(tableEnv, 'a, 'b, 'c) tableEnv.registerTable("MyTable", table) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala index 68f0897..228b644 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala @@ -24,14 +24,14 @@ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableConfig, Types} +import org.apache.flink.table.api.Types import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction} import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, UserDefinedFunctionTestUtils} +import org.apache.flink.table.planner.utils.TableTestUtil import org.apache.flink.types.Row - import org.junit.Assert.assertEquals import org.junit.Test import org.junit.runner.RunWith @@ -48,7 +48,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testSimplePattern(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(Int, String)] data.+=((1, "a")) @@ -94,7 +94,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testSimplePatternWithNulls(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(Int, String, String)] data.+=((1, "a", null)) @@ -141,11 +141,10 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testCodeSplitsAreProperlyGenerated(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tableConfig = new TableConfig + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) // TODO: this code is ported from flink-planner, - // However code split is not supported in blink-planner. - tableConfig.setMaxGeneratedCodeLength(1) - val tEnv = StreamTableEnvironment.create(env, tableConfig) + // However code split is not supported in blink-planner yet. + tEnv.getConfig.setMaxGeneratedCodeLength(1) val data = new mutable.MutableList[(Int, String, String, String)] data.+=((1, "a", "key1", "second_key3")) @@ -198,7 +197,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState def testEventsAreProperlyOrdered(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = Seq( Left(2L, (12, 1, "a", 1)), @@ -256,7 +255,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState def testMatchRecognizeAppliedToWindowedGrouping(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] //first window @@ -317,7 +316,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState def testWindowedGroupingAppliedToMatchRecognize(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] //first window @@ -371,7 +370,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testLogicalOffsets(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] data.+=(("ACME", 1L, 19, 1)) @@ -420,7 +419,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testLogicalOffsetsWithStarVariable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(Int, String, Long, Int)] data.+=((1, "ACME", 1L, 20)) @@ -480,7 +479,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testLogicalOffsetOutsideOfRangeInMeasures(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(String, Long, Int, Int)] data.+=(("ACME", 1L, 19, 1)) @@ -531,7 +530,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testAggregates(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) tEnv.getConfig.setMaxGeneratedCodeLength(1) val data = new mutable.MutableList[(Int, String, Long, Double, Int)] @@ -592,7 +591,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testAggregatesWithNullInputs(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) tEnv.getConfig.setMaxGeneratedCodeLength(1) val data = new mutable.MutableList[Row] @@ -647,7 +646,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testAccessingCurrentTime(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val data = new mutable.MutableList[(Int, String)] data.+=((1, "a")) @@ -686,7 +685,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState @Test def testUserDefinedFunctions(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) tEnv.getConfig.setMaxGeneratedCodeLength(1) val data = new mutable.MutableList[(Int, String, Long)] diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala index 7edd4f8..3e7fcec 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala @@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink} +import org.apache.flink.table.planner.utils.TableTestUtil import org.apache.flink.types.Row - import org.junit.Assert.assertEquals import org.junit._ import org.junit.runner.RunWith @@ -49,7 +49,7 @@ class TemporalJoinITCase(state: StateBackendMode) @Test def testProcessTimeInnerJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) @@ -98,7 +98,7 @@ class TemporalJoinITCase(state: StateBackendMode) @Test def testEventTimeInnerJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala index aa6b17d..0ab3480 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala @@ -26,11 +26,10 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableException, Tumble, Types} import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5} import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink} -import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil +import org.apache.flink.table.planner.utils.{MemoryTableSourceSinkUtil, TableTestUtil} import org.apache.flink.table.sinks._ import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils} import org.apache.flink.types.Row - import org.junit.Assert._ import org.junit.Test @@ -47,7 +46,7 @@ class TableSinkITCase extends AbstractTestBase { env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) MemoryTableSourceSinkUtil.clear() val input = env.fromCollection(tupleData3) @@ -83,7 +82,7 @@ class TableSinkITCase extends AbstractTestBase { env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(4) tEnv.registerTableSink( @@ -121,7 +120,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -157,7 +156,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c) val ds2 = env.fromCollection(tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -185,7 +184,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -223,7 +222,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -264,7 +263,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -306,7 +305,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -352,7 +351,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -399,7 +398,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -444,7 +443,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -489,7 +488,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) @@ -508,7 +507,7 @@ class TableSinkITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.enableObjectReuse() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = StreamTableEnvironment.create(env) + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) val t = env.fromCollection(tupleData3) .assignAscendingTimestamps(_._1.toLong) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala index 4bcd58b..7f7b65d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala @@ -30,10 +30,10 @@ import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter, BinaryString} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} +import org.apache.flink.table.planner.utils.TableTestUtil import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo import org.apache.flink.table.types.logical.RowType - import org.junit.runners.Parameterized import org.junit.{After, Assert, Before} @@ -72,7 +72,7 @@ class StreamingWithStateTestBase(state: StateBackendMode) extends StreamingTestB env.setStateBackend(new RocksDBStateBackend( "file://" + baseCheckpointPath).configure(conf, classLoader)) } - this.tEnv = StreamTableEnvironment.create(env) + this.tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) FailingCollectionSource.failedBefore = true } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 672a71e..7fb59e1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1005,6 +1005,11 @@ object TestingTableEnvironment { object TableTestUtil { + val STREAM_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance() + .useBlinkPlanner().inStreamingMode().build() + val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance() + .useBlinkPlanner().inBatchMode().build() + /** * Converts operation tree in the given table to a RelNode tree. */