This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a194b37 [FLINK-13688][hive] Limit the parallelism/memory of HiveCatalogUseBlinkITCase a194b37 is described below commit a194b37d9b99a47174de9108a937f821816d61f5 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Mon Aug 12 10:24:39 2019 +0200 [FLINK-13688][hive] Limit the parallelism/memory of HiveCatalogUseBlinkITCase This closes #9417 --- flink-connectors/flink-connector-hive/pom.xml | 9 ++++++++- .../table/catalog/hive/HiveCatalogUseBlinkITCase.java | 6 +++++- .../table/planner/runtime/utils/BatchTestBase.scala | 16 ++++++++++------ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 017abe5..d8d00a3 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -616,7 +616,14 @@ under the License. <scope>test</scope> </dependency> - </dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.11</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> <build> <plugins> diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java index 8308efb..d610125 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java @@ -34,6 +34,8 @@ import org.apache.flink.table.descriptors.OldCsv; import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF; import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF; import org.apache.flink.table.functions.hive.util.TestHiveUDTF; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.FileUtils; import com.klarna.hiverunner.HiveShell; @@ -65,7 +67,7 @@ import static java.lang.String.format; * TODO: move to flink-connector-hive-test end-to-end test module once it's setup */ @RunWith(FlinkStandaloneHiveRunner.class) -public class HiveCatalogUseBlinkITCase { +public class HiveCatalogUseBlinkITCase extends AbstractTestBase { @HiveSQL(files = {}) private static HiveShell hiveShell; @@ -97,6 +99,8 @@ public class HiveCatalogUseBlinkITCase { TableEnvironment tEnv = TableEnvironment.create( EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); + BatchTestBase.configForMiniCluster(tEnv.getConfig()); + tEnv.registerCatalog("myhive", hiveCatalog); tEnv.useCatalog("myhive"); diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala index 76c2171..c75aff8 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala @@ -69,12 +69,7 @@ class BatchTestBase extends BatchAbstractTestBase { @Before def before(): Unit = { - conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM) - conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY, "2mb") - conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY, "2mb") - conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_SORT_MEMORY, "1mb") - conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY, "1mb") - conf.getConfiguration.setString(TABLE_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString) + BatchTestBase.configForMiniCluster(conf) } /** @@ -477,4 +472,13 @@ object BatchTestBase { assertEquals(msg, e, r) } } + + def configForMiniCluster(conf: TableConfig): Unit = { + conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM) + conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY, "2mb") + conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY, "2mb") + conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_SORT_MEMORY, "1mb") + conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY, "1mb") + conf.getConfiguration.setString(TABLE_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString) + } }