This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9b1d78198cd25275b17e1d43ebd75a8a2f6cff7f Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Jul 24 11:21:00 2019 +0200 [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client This closes #9229. --- .../client/gateway/local/ExecutionContext.java | 28 ++--- .../table/client/gateway/local/LocalExecutor.java | 7 +- .../client/gateway/local/LocalExecutorITCase.java | 23 +++- .../client/gateway/utils/SimpleCatalogFactory.java | 118 +++++++++++++++++++++ .../org.apache.flink.table.factories.TableFactory | 1 + .../test/resources/test-sql-client-defaults.yaml | 3 + .../flink/table/api/EnvironmentSettings.java | 7 +- 7 files changed, 168 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 0df7fba..ae7fb2c 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -302,20 +302,6 @@ public class ExecutionContext<T> { // register catalogs catalogs.forEach(tableEnv::registerCatalog); - // set current catalog - if (sessionContext.getCurrentCatalog().isPresent()) { - tableEnv.useCatalog(sessionContext.getCurrentCatalog().get()); - } else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) { - tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get()); - } - - // set current database - if (sessionContext.getCurrentDatabase().isPresent()) { - tableEnv.useDatabase(sessionContext.getCurrentDatabase().get()); - } else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) { - tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get()); - } - // create query config queryConfig = createQueryConfig(); @@ -340,6 +326,20 @@ public class ExecutionContext<T> { registerTemporalTable(temporalTableEntry); } }); + + // set current catalog + if (sessionContext.getCurrentCatalog().isPresent()) { + tableEnv.useCatalog(sessionContext.getCurrentCatalog().get()); + } else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) { + tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get()); + } + + // set current database + if (sessionContext.getCurrentDatabase().isPresent()) { + tableEnv.useDatabase(sessionContext.getCurrentDatabase().get()); + } else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) { + tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get()); + } } public QueryConfig getQueryConfig() { diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 7e41f1f..101f72c 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.QueryConfig; import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.Table; @@ -473,7 +474,11 @@ public class LocalExecutor implements Executor { // writing to a sink requires an optimization step that might reference UDFs during code compilation context.wrapClassLoader(() -> { envInst.getTableEnvironment().registerTableSink(jobName, result.getTableSink()); - table.insertInto(jobName, envInst.getQueryConfig()); + table.insertInto( + envInst.getQueryConfig(), + EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, + EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, + jobName); return null; }); jobGraph = envInst.createJobGraph(jobName); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index ac1a7ae..5dbec41 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -42,6 +42,7 @@ import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; +import org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; @@ -63,6 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; @@ -160,7 +162,8 @@ public class LocalExecutorITCase extends TestLogger { final List<String> expectedCatalogs = Arrays.asList( "default_catalog", - "catalog1"); + "catalog1", + "simple-catalog"); assertEquals(expectedCatalogs, actualCatalogs); } @@ -402,7 +405,7 @@ public class LocalExecutorITCase extends TestLogger { final SessionContext session = new SessionContext("test-session", new Environment()); try { - // start job + // Case 1: Registered sink final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate( session, "INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1"); @@ -424,6 +427,22 @@ public class LocalExecutorITCase extends TestLogger { fail("Unexpected job status."); } } + + // Case 2: Temporary sink + session.setCurrentCatalog("simple-catalog"); + session.setCurrentDatabase("default_database"); + // all queries are pipelined to an in-memory sink, check it is properly registered + final ResultDescriptor otherCatalogDesc = executor.executeQuery(session, "SELECT * FROM `test-table`"); + + final List<String> otherCatalogResults = retrieveTableResult( + executor, + session, + otherCatalogDesc.getResultId()); + + TestBaseUtils.compareResultCollections( + SimpleCatalogFactory.TABLE_CONTENTS.stream().map(Row::toString).collect(Collectors.toList()), + otherCatalogResults, + Comparator.naturalOrder()); } finally { executor.stop(session); } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java new file mode 100644 index 0000000..5f533fb --- /dev/null +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.ConnectorCatalogTable; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.descriptors.CatalogDescriptorValidator; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; +import org.apache.flink.util.WrappingRuntimeException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Catalog factory for an in-memory catalog that contains a single non-empty table. + * The contents of the table are equal to {@link SimpleCatalogFactory#TABLE_CONTENTS}. + */ +public class SimpleCatalogFactory implements CatalogFactory { + + public static final String CATALOG_TYPE_VALUE = "simple-catalog"; + + public static final String TEST_TABLE_NAME = "test-table"; + + public static final List<Row> TABLE_CONTENTS = Arrays.asList( + Row.of(1, "Hello"), + Row.of(2, "Hello world"), + Row.of(3, "Hello world! Hello!") + ); + + @Override + public Catalog createCatalog(String name, Map<String, String> properties) { + String database = properties.getOrDefault( + CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, + "default_database"); + GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database); + + String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME); + StreamTableSource<Row> tableSource = new StreamTableSource<Row>() { + @Override + public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { + return execEnv.fromCollection(TABLE_CONTENTS) + .returns(new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.STRING()}, + new String[]{"id", "string"})); + } + + @Override + public TableSchema getTableSchema() { + return TableSchema.builder() + .field("id", DataTypes.INT()) + .field("string", DataTypes.STRING()) + .build(); + } + + @Override + public DataType getProducedDataType() { + return DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("string", DataTypes.STRING()) + ); + } + }; + + try { + genericInMemoryCatalog.createTable( + new ObjectPath(database, tableName), + ConnectorCatalogTable.source(tableSource, false), + false + ); + } catch (Exception e) { + throw new WrappingRuntimeException(e); + } + + return genericInMemoryCatalog; + } + + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CatalogDescriptorValidator.CATALOG_TYPE, CATALOG_TYPE_VALUE); + return context; + } + + @Override + public List<String> supportedProperties() { + return Arrays.asList(CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, TEST_TABLE_NAME); + } +} diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index b4e3095..5ba6b0b 100644 --- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -15,5 +15,6 @@ org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory +org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index 9e0582b..9844d54 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -137,3 +137,6 @@ deployment: catalogs: - name: catalog1 type: DependencyTest + - name: simple-catalog + type: simple-catalog + test-table: test-table diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index bfd9203..7eec4a3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -46,8 +46,11 @@ import java.util.Map; */ @PublicEvolving public class EnvironmentSettings { + public static final String STREAMING_MODE = "streaming-mode"; public static final String CLASS_NAME = "class-name"; + public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog"; + public static final String DEFAULT_BUILTIN_DATABASE = "default_database"; /** * Canonical name of the {@link Planner} class to use. @@ -158,8 +161,8 @@ public class EnvironmentSettings { private String plannerClass = OLD_PLANNER_FACTORY; private String executorClass = OLD_EXECUTOR_FACTORY; - private String builtInCatalogName = "default_catalog"; - private String builtInDatabaseName = "default_database"; + private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG; + private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE; private boolean isStreamingMode = true; /**