This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 88458eb [FLINK-13600][table] Rework TableEnvironment.connect() class hierarchy 88458eb is described below commit 88458ebe41d170a43e5621f4af007dc8e25b7115 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Aug 6 15:04:51 2019 +0200 [FLINK-13600][table] Rework TableEnvironment.connect() class hierarchy This closes #9382. --- .../table/catalog/hive/HiveCatalogITCase.java | 4 +- .../catalog/hive/HiveCatalogUseBlinkITCase.java | 4 +- .../apache/flink/table/api/TableEnvironment.java | 4 +- .../table/api/internal/TableEnvironmentImpl.java | 4 +- .../flink/table/catalog/CatalogTableBuilder.java | 92 +++----------- .../table/descriptors/BatchTableDescriptor.java | 16 ++- .../table/descriptors/ConnectTableDescriptor.java | 78 ++++-------- .../table/descriptors/StreamTableDescriptor.java | 85 ++----------- .../flink/table/api/TableEnvironmentTest.java | 74 +++++++++++ .../table/utils/ConnectorDescriptorMock.java} | 31 +++-- .../apache/flink/table/utils/ExecutorMock.java} | 26 ++-- .../flink/table/utils/FormatDescriptorMock.java} | 31 +++-- .../org/apache/flink/table/utils/PlannerMock.java} | 37 ++++-- .../flink/table/utils/TableEnvironmentMock.java | 99 +++++++++++++++ .../flink/table/utils/TableSourceFactoryMock.java | 75 +++++++++++ .../apache/flink/table/utils/TableSourceMock.java} | 34 +++-- .../org.apache.flink.table.factories.TableFactory | 1 + .../flink/table/catalog/ExternalCatalogTable.java | 4 +- .../table/descriptors/RegistrableDescriptor.java | 52 -------- .../table/descriptors/SchematicDescriptor.java | 9 +- .../table/descriptors/StreamableDescriptor.java | 73 ----------- .../flink/table/descriptors/TableDescriptor.java | 139 ++++++++++++++++++++- .../table/descriptors/ConnectTableDescriptor.java | 131 ------------------- .../table/descriptors/StreamTableDescriptor.java | 112 ----------------- .../catalog/ExternalCatalogTableBuilder.scala | 77 +----------- 
.../table/descriptors/TableDescriptorTest.scala | 2 +- .../table/descriptors/TestTableDescriptor.scala | 44 ++----- .../flink/table/utils/MockTableEnvironment.scala | 6 +- 28 files changed, 580 insertions(+), 764 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java index df702ad..0898ac3 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java @@ -109,7 +109,7 @@ public class HiveCatalogITCase { schema) .withFormat(format) .inAppendMode() - .withComment(null) + .withComment("Comment.") .build(); Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv"); @@ -120,7 +120,7 @@ public class HiveCatalogITCase { schema) .withFormat(format) .inAppendMode() - .withComment(null) + .withComment("Comment.") .build(); hiveCatalog.createTable( diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java index 6f669d0..8308efb 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java @@ -115,7 +115,7 @@ public class HiveCatalogUseBlinkITCase { schema) .withFormat(format) .inAppendMode() - .withComment(null) + .withComment("Comment.") .build(); Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv"); @@ -138,7 +138,7 @@ public class HiveCatalogUseBlinkITCase { sinkSchema) 
.withFormat(sinkFormat) .inAppendMode() - .withComment(null) + .withComment("Comment.") .build(); hiveCatalog.createTable( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index 8f8364f..eff0c2b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.ExternalCatalog; +import org.apache.flink.table.descriptors.ConnectTableDescriptor; import org.apache.flink.table.descriptors.ConnectorDescriptor; -import org.apache.flink.table.descriptors.TableDescriptor; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; @@ -254,7 +254,7 @@ public interface TableEnvironment { * * @param connectorDescriptor connector descriptor describing the external system */ - TableDescriptor connect(ConnectorDescriptor connectorDescriptor); + ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor); /** * Gets the names of all catalogs registered in this environment. 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 0b5d5fe..2979296 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -45,9 +45,9 @@ import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.descriptors.ConnectTableDescriptor; import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.descriptors.StreamTableDescriptor; -import org.apache.flink.table.descriptors.TableDescriptor; import org.apache.flink.table.expressions.TableReferenceExpression; import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.ScalarFunction; @@ -234,7 +234,7 @@ public class TableEnvironmentImpl implements TableEnvironment { } @Override - public TableDescriptor connect(ConnectorDescriptor connectorDescriptor) { + public ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor) { return new StreamTableDescriptor(this, connectorDescriptor); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java index 01f46bd..1760e19 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java @@ -22,24 +22,13 @@ import 
org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.descriptors.ConnectorDescriptor; -import org.apache.flink.table.descriptors.ConnectorFormatDescriptor; import org.apache.flink.table.descriptors.Descriptor; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.FormatDescriptor; -import org.apache.flink.table.descriptors.Metadata; -import org.apache.flink.table.descriptors.Statistics; -import org.apache.flink.table.descriptors.StreamableDescriptor; import org.apache.flink.table.descriptors.TableDescriptor; +import org.apache.flink.util.Preconditions; import java.util.Collections; import java.util.Map; -import java.util.Optional; - -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT; -import static org.apache.flink.util.Preconditions.checkNotNull; /** * A builder for creating a {@link CatalogTable}. 
@@ -58,8 +47,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * new ExternalSystemXYZ() * .version("0.11"), * new TableSchema.Builder() - * .fields(names, dataTypes) - * .build()) + * .fields(names, dataTypes) + * .build()) * .withFormat( * new Json() * .jsonSchema("{...}") @@ -69,73 +58,36 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * </code> */ @PublicEvolving -public class CatalogTableBuilder - extends TableDescriptor - implements ConnectorFormatDescriptor<CatalogTableBuilder>, StreamableDescriptor<CatalogTableBuilder> { +public final class CatalogTableBuilder extends TableDescriptor<CatalogTableBuilder> { - private final ConnectorDescriptor connectorDescriptor; private final TableSchema tableSchema; - private final boolean isGeneric; private String comment; - private Optional<FormatDescriptor> formatDescriptor = Optional.empty(); - private Optional<Statistics> statisticsDescriptor = Optional.empty(); - private Optional<Metadata> metadataDescriptor = Optional.empty(); - private Optional<String> updateMode = Optional.empty(); + private final boolean isGeneric; + private Map<String, String> properties = Collections.emptyMap(); - /** - * - * @param connectorDescriptor descriptor of the connector - * @param tableSchema schema of the table - */ public CatalogTableBuilder(ConnectorDescriptor connectorDescriptor, TableSchema tableSchema) { - this.connectorDescriptor = checkNotNull(connectorDescriptor); - this.tableSchema = checkNotNull(tableSchema); + super(connectorDescriptor); + this.tableSchema = Preconditions.checkNotNull(tableSchema); - // We don't support non generic table currently + // We don't support non-generic table currently this.isGeneric = true; } - @Override - public CatalogTableBuilder withFormat(FormatDescriptor format) { - this.formatDescriptor = Optional.of(checkNotNull(format)); - return this; - } - - @Override - public CatalogTableBuilder inAppendMode() { - updateMode = 
Optional.of(UPDATE_MODE_VALUE_APPEND); - return this; - } - - @Override - public CatalogTableBuilder inRetractMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT); - return this; - } - - @Override - public CatalogTableBuilder inUpsertMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT); - return this; - } - public CatalogTableBuilder withComment(String comment) { - this.comment = comment; + this.comment = Preconditions.checkNotNull(comment, "Comment must not be null."); return this; } public CatalogTableBuilder withProperties(Map<String, String> properties) { - this.properties = checkNotNull(properties); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); return this; } /** - * Build a {@link CatalogTable}. - * - * @return catalog table + * Builds a {@link CatalogTable}. */ public CatalogTable build() { return new CatalogTableImpl( @@ -145,28 +97,12 @@ public class CatalogTableBuilder } @Override - public Map<String, String> toProperties() { + protected Map<String, String> additionalProperties() { DescriptorProperties descriptorProperties = new DescriptorProperties(); - descriptorProperties.putProperties(connectorDescriptor.toProperties()); - - if (formatDescriptor.isPresent()) { - descriptorProperties.putProperties(formatDescriptor.get().toProperties()); - } - - if (statisticsDescriptor.isPresent()) { - descriptorProperties.putProperties(statisticsDescriptor.get().toProperties()); - } - - if (metadataDescriptor.isPresent()) { - descriptorProperties.putProperties(metadataDescriptor.get().toProperties()); - } - if (updateMode.isPresent()) { - descriptorProperties.putString(UPDATE_MODE, updateMode.get()); - } + descriptorProperties.putBoolean(CatalogConfig.IS_GENERIC, isGeneric); descriptorProperties.putProperties(this.properties); - descriptorProperties.putString(CatalogConfig.IS_GENERIC, String.valueOf(isGeneric)); return descriptorProperties.asMap(); } diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java index cd0b0e2..cd01801 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java @@ -22,14 +22,20 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.api.TableEnvironment; /** - * Descriptor for specifying a table source and/or sink in a batch environment. + * Describes a table connected from a batch environment. + * + * <p>This class just exists for backwards compatibility; use {@link ConnectTableDescriptor} for + * declarations. */ @PublicEvolving -public class BatchTableDescriptor extends ConnectTableDescriptor<BatchTableDescriptor> { +public final class BatchTableDescriptor extends ConnectTableDescriptor { - public BatchTableDescriptor( - TableEnvironment tableEnv, - ConnectorDescriptor connectorDescriptor) { + public BatchTableDescriptor(TableEnvironment tableEnv, ConnectorDescriptor connectorDescriptor) { super(tableEnv, connectorDescriptor); } + + @Override + public BatchTableDescriptor withSchema(Schema schema) { + return (BatchTableDescriptor) super.withSchema(schema); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java index a046a06..ffc56ab 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java @@ -20,32 +20,42 @@ package org.apache.flink.table.descriptors; import 
org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + +import java.util.Collections; import java.util.Map; -import java.util.Optional; /** - * Common class for table's created with {@link TableEnvironment#connect(ConnectorDescriptor connectorDescriptor)}. + * Describes a table connected from {@link TableEnvironment#connect(ConnectorDescriptor)}. + * + * <p>It can access {@link TableEnvironment} for fluently registering the table. */ @PublicEvolving -public abstract class ConnectTableDescriptor<D extends ConnectTableDescriptor<D>> - extends TableDescriptor - implements SchematicDescriptor<D>, RegistrableDescriptor { - - private Optional<FormatDescriptor> formatDescriptor = Optional.empty(); - private Optional<Schema> schemaDescriptor = Optional.empty(); +public abstract class ConnectTableDescriptor + extends TableDescriptor<ConnectTableDescriptor> + implements SchematicDescriptor<ConnectTableDescriptor> { private final TableEnvironment tableEnv; - private final ConnectorDescriptor connectorDescriptor; + + private @Nullable Schema schemaDescriptor; public ConnectTableDescriptor(TableEnvironment tableEnv, ConnectorDescriptor connectorDescriptor) { + super(connectorDescriptor); this.tableEnv = tableEnv; - this.connectorDescriptor = connectorDescriptor; + } + + /** + * Specifies the resulting table schema. 
+ */ + @Override + public ConnectTableDescriptor withSchema(Schema schema) { + schemaDescriptor = Preconditions.checkNotNull(schema, "Schema must not be null."); + return this; } /** @@ -54,7 +64,6 @@ public abstract class ConnectTableDescriptor<D extends ConnectTableDescriptor<D> * * @param name table name to be registered in the table environment */ - @Override public void registerTableSource(String name) { Preconditions.checkNotNull(name); TableSource<?> tableSource = TableFactoryUtil.findAndCreateTableSource(this); @@ -67,7 +76,6 @@ public abstract class ConnectTableDescriptor<D extends ConnectTableDescriptor<D> * * @param name table name to be registered in the table environment */ - @Override public void registerTableSink(String name) { Preconditions.checkNotNull(name); TableSink<?> tableSink = TableFactoryUtil.findAndCreateTableSink(this); @@ -80,52 +88,16 @@ public abstract class ConnectTableDescriptor<D extends ConnectTableDescriptor<D> * * @param name table name to be registered in the table environment */ - @Override public void registerTableSourceAndSink(String name) { registerTableSource(name); registerTableSink(name); } - /** - * Specifies the format that defines how to read data from a connector. - */ - @Override - public D withFormat(FormatDescriptor format) { - formatDescriptor = Optional.of(Preconditions.checkNotNull(format)); - return (D) this; - } - - /** - * Specifies the resulting table schema. - */ @Override - public D withSchema(Schema schema) { - schemaDescriptor = Optional.of(Preconditions.checkNotNull(schema)); - return (D) this; - } - - /** - * Converts this descriptor into a set of properties. 
- */ - @Override - public Map<String, String> toProperties() { - DescriptorProperties properties = new DescriptorProperties(); - - // this performs only basic validation - // more validation can only happen within a factory - if (connectorDescriptor.isFormatNeeded() && !formatDescriptor.isPresent()) { - throw new ValidationException(String.format("The connector %s requires a format description.", connectorDescriptor.toString())); - } else if (!connectorDescriptor.isFormatNeeded() && formatDescriptor.isPresent()) { - throw new ValidationException( - String.format("The connector %s does not require a format description " + - "but %s found.", connectorDescriptor.toString(), formatDescriptor.get().toString())); + protected Map<String, String> additionalProperties() { + if (schemaDescriptor != null) { + return schemaDescriptor.toProperties(); } - - properties.putProperties(connectorDescriptor.toProperties()); - - formatDescriptor.ifPresent(s -> properties.putProperties(s.toProperties())); - schemaDescriptor.ifPresent(s -> properties.putProperties(s.toProperties())); - - return properties.asMap(); + return Collections.emptyMap(); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java index 456bbff..8e67959 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java @@ -21,92 +21,21 @@ package org.apache.flink.table.descriptors; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.api.TableEnvironment; -import java.util.Map; -import java.util.Optional; - -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; -import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT; - /** - * Descriptor for specifying a table source and/or sink in a streaming environment. + * Describes a table connected from a streaming environment. + * + * <p>This class just exists for backwards compatibility; use {@link ConnectTableDescriptor} for + * declarations. */ @PublicEvolving -public class StreamTableDescriptor - extends ConnectTableDescriptor<StreamTableDescriptor> - implements StreamableDescriptor<StreamTableDescriptor> { - - private Optional<String> updateMode = Optional.empty(); +public final class StreamTableDescriptor extends ConnectTableDescriptor { public StreamTableDescriptor(TableEnvironment tableEnv, ConnectorDescriptor connectorDescriptor) { super(tableEnv, connectorDescriptor); } - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In append mode, a dynamic table and an external connector only exchange INSERT messages. - * - * @see #inRetractMode() - * @see #inUpsertMode() - */ @Override - public StreamTableDescriptor inAppendMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND); - return this; - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. - * - * <p>An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an - * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for - * the updating (new) row. - * - * <p>In this mode, a key must not be defined as opposed to upsert mode. However, every update - * consists of two messages which is less efficient. 
- * - * @see #inAppendMode() - * @see #inUpsertMode() - */ - @Override - public StreamTableDescriptor inRetractMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT); - return this; - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. - * - * <p>This mode requires a (possibly composite) unique key by which updates can be propagated. The - * external connector needs to be aware of the unique key attribute in order to apply messages - * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as - * DELETE messages. - * - * <p>The main difference to a retract stream is that UPDATE changes are encoded with a single - * message and are therefore more efficient. - * - * @see #inAppendMode() - * @see #inRetractMode() - */ - @Override - public StreamTableDescriptor inUpsertMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT); - return this; - } - - @Override - public Map<String, String> toProperties() { - DescriptorProperties properties = new DescriptorProperties(); - - properties.putProperties(super.toProperties()); - updateMode.ifPresent(mode -> properties.putString(UPDATE_MODE, mode)); - - return properties.asMap(); + public StreamTableDescriptor withSchema(Schema schema) { + return (StreamTableDescriptor) super.withSchema(schema); } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java new file mode 100644 index 0000000..2c98c21 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ConnectorCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.utils.ConnectorDescriptorMock; +import org.apache.flink.table.utils.FormatDescriptorMock; +import org.apache.flink.table.utils.TableEnvironmentMock; +import org.apache.flink.table.utils.TableSourceFactoryMock; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link TableEnvironment}. 
+ */ +public class TableEnvironmentTest { + + @Test + public void testConnect() throws Exception { + final TableEnvironmentMock tableEnv = TableEnvironmentMock.getStreamingInstance(); + + tableEnv + .connect(new ConnectorDescriptorMock(TableSourceFactoryMock.CONNECTOR_TYPE_VALUE, 1, true)) + .withFormat(new FormatDescriptorMock("my_format", 1)) + .withSchema(new Schema() + .field("my_field_0", "INT") + .field("my_field_1", "BOOLEAN")) + .inAppendMode() + .registerTableSource("my_table"); + + final Catalog catalog = tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG) + .orElseThrow(AssertionError::new); + + final CatalogBaseTable table = catalog + .getTable(new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "my_table")); + + assertThat( + table.getSchema(), + equalTo( + TableSchema.builder() + .field("my_field_0", DataTypes.INT()) + .field("my_field_1", DataTypes.BOOLEAN()) + .build())); + + final ConnectorCatalogTable<?, ?> connectorCatalogTable = (ConnectorCatalogTable<?, ?>) table; + + assertThat( + connectorCatalogTable.getTableSource().isPresent(), + equalTo(true)); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ConnectorDescriptorMock.java similarity index 54% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java copy to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ConnectorDescriptorMock.java index cd0b0e2..1eecccc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ConnectorDescriptorMock.java @@ -16,20 +16,31 @@ * limitations under the License. 
*/ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.utils; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.descriptors.ConnectorDescriptor; + +import java.util.HashMap; +import java.util.Map; /** - * Descriptor for specifying a table source and/or sink in a batch environment. + * Mocking {@link ConnectorDescriptor} for tests. */ -@PublicEvolving -public class BatchTableDescriptor extends ConnectTableDescriptor<BatchTableDescriptor> { +public class ConnectorDescriptorMock extends ConnectorDescriptor { + + private Map<String, String> connectorProperties = new HashMap<>(); + + public ConnectorDescriptorMock(String type, int version, boolean formatNeeded) { + super(type, version, formatNeeded); + } + + public ConnectorDescriptorMock property(String key, String value) { + connectorProperties.put(key, value); + return this; + } - public BatchTableDescriptor( - TableEnvironment tableEnv, - ConnectorDescriptor connectorDescriptor) { - super(tableEnv, connectorDescriptor); + @Override + protected Map<String, String> toConnectorProperties() { + return connectorProperties; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExecutorMock.java similarity index 62% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java rename to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExecutorMock.java index cd0b0e2..bcd7b7a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExecutorMock.java @@ -16,20 +16,26 @@ * limitations under the License. 
*/ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.utils; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.table.delegation.Executor; + +import java.util.List; /** - * Descriptor for specifying a table source and/or sink in a batch environment. + * Mocking {@link Executor} for tests. */ -@PublicEvolving -public class BatchTableDescriptor extends ConnectTableDescriptor<BatchTableDescriptor> { +public class ExecutorMock implements Executor { + + @Override + public void apply(List<Transformation<?>> transformations) { + // nothing to do + } - public BatchTableDescriptor( - TableEnvironment tableEnv, - ConnectorDescriptor connectorDescriptor) { - super(tableEnv, connectorDescriptor); + @Override + public JobExecutionResult execute(String jobName) throws Exception { + return null; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FormatDescriptorMock.java similarity index 56% rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java rename to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FormatDescriptorMock.java index 7397d39..44d455b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FormatDescriptorMock.java @@ -16,18 +16,31 @@ * limitations under the License. 
*/ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.utils; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.descriptors.FormatDescriptor; + +import java.util.HashMap; +import java.util.Map; /** - * An interface for descriptors that allow to define a format. + * Mocking {@link FormatDescriptor} for tests. */ -@PublicEvolving -public interface ConnectorFormatDescriptor<D extends ConnectorFormatDescriptor<D>> extends Descriptor { +public class FormatDescriptorMock extends FormatDescriptor { + + private Map<String, String> formatProperties = new HashMap<>(); + + public FormatDescriptorMock(String type, int version) { + super(type, version); + } + + public FormatDescriptorMock property(String key, String value) { + formatProperties.put(key, value); + return this; + } - /** - * Specifies the format that defines how to read data from a connector. - */ - D withFormat(FormatDescriptor format); + @Override + protected Map<String, String> toFormatProperties() { + return formatProperties; + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java similarity index 52% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java copy to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java index cd0b0e2..8dee10c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java @@ -16,20 +16,37 @@ * limitations under the License. 
*/ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.utils; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; + +import java.util.List; /** - * Descriptor for specifying a table source and/or sink in a batch environment. + * Mocking {@link Planner} for tests. */ -@PublicEvolving -public class BatchTableDescriptor extends ConnectTableDescriptor<BatchTableDescriptor> { +public class PlannerMock implements Planner { + + @Override + public List<Operation> parse(String statement) { + return null; + } + + @Override + public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) { + return null; + } + + @Override + public String explain(List<Operation> operations, boolean extended) { + return null; + } - public BatchTableDescriptor( - TableEnvironment tableEnv, - ConnectorDescriptor connectorDescriptor) { - super(tableEnv, connectorDescriptor); + @Override + public String[] getCompletionHints(String statement, int position) { + return new String[0]; } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java new file mode 100644 index 0000000..dc8b992 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; + +/** + * Mocking {@link TableEnvironment} for tests. 
+ */ +public class TableEnvironmentMock extends TableEnvironmentImpl { + + public final CatalogManager catalogManager; + + public final ExecutorMock executor; + + public final FunctionCatalog functionCatalog; + + public final PlannerMock planner; + + protected TableEnvironmentMock( + CatalogManager catalogManager, + TableConfig tableConfig, + ExecutorMock executor, + FunctionCatalog functionCatalog, + PlannerMock planner, + boolean isStreamingMode) { + super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode); + + this.catalogManager = catalogManager; + this.executor = executor; + this.functionCatalog = functionCatalog; + this.planner = planner; + } + + public static TableEnvironmentMock getStreamingInstance() { + return getInstance(true); + } + + public static TableEnvironmentMock getBatchInstance() { + return getInstance(false); + } + + private static TableEnvironmentMock getInstance(boolean isStreamingMode) { + final CatalogManager catalogManager = createCatalogManager(); + return new TableEnvironmentMock( + catalogManager, + createTableConfig(), + createExecutor(), + createFunctionCatalog(catalogManager), + createPlanner(), + isStreamingMode); + } + + private static CatalogManager createCatalogManager() { + return new CatalogManager( + EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, + new GenericInMemoryCatalog( + EnvironmentSettings.DEFAULT_BUILTIN_CATALOG, + EnvironmentSettings.DEFAULT_BUILTIN_DATABASE)); + } + + private static TableConfig createTableConfig() { + return TableConfig.getDefault(); + } + + private static ExecutorMock createExecutor() { + return new ExecutorMock(); + } + + private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager) { + return new FunctionCatalog(catalogManager); + } + + private static PlannerMock createPlanner() { + return new PlannerMock(); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java new file mode 100644 index 0000000..bf93c33 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FormatDescriptorValidator; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Mocking {@link TableSourceFactory} for tests. 
+ */ +public class TableSourceFactoryMock implements TableSourceFactory<Row> { + + public static final String CONNECTOR_TYPE_VALUE = "table-source-factory-mock"; + + @Override + public TableSource<Row> createTableSource(Map<String, String> properties) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(properties); + final TableSchema schema = descriptorProperties.getTableSchema(Schema.SCHEMA); + return new TableSourceMock(schema.toRowDataType(), schema); + } + + @Override + public TableSource<Row> createTableSource(ObjectPath tablePath, CatalogTable table) { + return new TableSourceMock(table.getSchema().toRowDataType(), table.getSchema()); + } + + @Override + public Map<String, String> requiredContext() { + final Map<String, String> context = new HashMap<>(); + context.put(ConnectorDescriptorValidator.CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE); + return context; + } + + @Override + public List<String> supportedProperties() { + final List<String> supportedProperties = new ArrayList<>(); + supportedProperties.add(StreamTableDescriptorValidator.UPDATE_MODE); + supportedProperties.add(ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION); + supportedProperties.add(FormatDescriptorValidator.FORMAT + ".*"); + supportedProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME); + supportedProperties.add(Schema.SCHEMA + ".#." 
+ Schema.SCHEMA_TYPE); + return supportedProperties; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceMock.java similarity index 53% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java copy to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceMock.java index cd0b0e2..25b2d36 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceMock.java @@ -16,20 +16,34 @@ * limitations under the License. */ -package org.apache.flink.table.descriptors; +package org.apache.flink.table.utils; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.Row; /** - * Descriptor for specifying a table source and/or sink in a batch environment. + * Mocking {@link TableSource} for tests. 
*/ -@PublicEvolving -public class BatchTableDescriptor extends ConnectTableDescriptor<BatchTableDescriptor> { +public class TableSourceMock implements TableSource<Row> { - public BatchTableDescriptor( - TableEnvironment tableEnv, - ConnectorDescriptor connectorDescriptor) { - super(tableEnv, connectorDescriptor); + private final DataType producedDataType; + + private final TableSchema tableSchema; + + public TableSourceMock(DataType producedDataType, TableSchema tableSchema) { + this.producedDataType = producedDataType; + this.tableSchema = tableSchema; + } + + @Override + public DataType getProducedDataType() { + return producedDataType; + } + + @Override + public TableSchema getTableSchema() { + return tableSchema; } } diff --git a/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index e2a6a48..1e92d60 100644 --- a/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -15,3 +15,4 @@ org.apache.flink.table.factories.utils.TestPlannerFactory org.apache.flink.table.factories.utils.OtherTestPlannerFactory +org.apache.flink.table.utils.TableSourceFactoryMock diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java index be24af4..41f8e3f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java @@ -18,8 +18,8 @@ package org.apache.flink.table.catalog; +import 
org.apache.flink.table.descriptors.Descriptor; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.TableDescriptor; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.plan.stats.ColumnStats; import org.apache.flink.table.plan.stats.TableStats; @@ -42,7 +42,7 @@ import static org.apache.flink.table.descriptors.StatisticsValidator.readColumnS * @deprecated use {@link CatalogTable} instead. */ @Deprecated -public class ExternalCatalogTable extends TableDescriptor { +public class ExternalCatalogTable implements Descriptor { /** * Flag whether this external table is intended for batch environments. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/RegistrableDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/RegistrableDescriptor.java deleted file mode 100644 index f0b8242..0000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/RegistrableDescriptor.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * An interface for descriptors that allow to register table source and/or sinks. - */ -@PublicEvolving -public interface RegistrableDescriptor extends Descriptor { - - /** - * Searches for the specified table source, configures it accordingly, and registers it as - * a table under the given name. - * - * @param name table name to be registered in the table environment - */ - void registerTableSource(String name); - - /** - * Searches for the specified table sink, configures it accordingly, and registers it as - * a table under the given name. - * - * @param name table name to be registered in the table environment - */ - void registerTableSink(String name); - - /** - * Searches for the specified table source and sink, configures them accordingly, and registers - * them as a table under the given name. - * - * @param name table name to be registered in the table environment - */ - void registerTableSourceAndSink(String name); -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java index 121e49c..6ae395c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java @@ -19,19 +19,14 @@ package org.apache.flink.table.descriptors; /** - * An interface for descriptors that allow to define a format and schema. + * An interface for descriptors that allow to define a schema. * - * @deprecated use {@link ConnectorFormatDescriptor}. + * @deprecated This class can be dropped once we remove external catalog support. 
*/ @Deprecated public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends Descriptor { /** - * Specifies the format that defines how to read data from a connector. - */ - D withFormat(FormatDescriptor format); - - /** * Specifies the resulting table schema. */ D withSchema(Schema schema); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java deleted file mode 100644 index 0430e8c..0000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; - -/** - * A trait for descriptors that allow to convert between a dynamic table and an external connector. - */ -@PublicEvolving -public interface StreamableDescriptor<D extends StreamableDescriptor<D>> extends Descriptor { - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. 
- * - * <p>In append mode, a dynamic table and an external connector only exchange INSERT messages. - * - * @see #inRetractMode() - * @see #inUpsertMode() - */ - D inAppendMode(); - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. - * - * <p>An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an - * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for - * the updating (new) row. - * - * <p>In this mode, a key must not be defined as opposed to upsert mode. However, every update - * consists of two messages which is less efficient. - * - * @see #inAppendMode() - * @see #inUpsertMode() - */ - D inRetractMode(); - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. - * - * <p>This mode requires a (possibly composite) unique key by which updates can be propagated. The - * external connector needs to be aware of the unique key attribute in order to apply messages - * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as - * DELETE messages. - * - * <p>The main difference to a retract stream is that UPDATE changes are encoded with a single - * message and are therefore more efficient. 
- * - * @see #inAppendMode() - * @see #inRetractMode() - */ - D inUpsertMode(); -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java index 1c12218..0656d33 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java @@ -19,11 +19,144 @@ package org.apache.flink.table.descriptors; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; + +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT; /** - * Common class for all descriptors describing table sources and sinks. - */ + * Describes a table consisting of a connector (in a given update mode) and a format. 
+ * + * @param <D> return type for builder pattern + */ @PublicEvolving -public abstract class TableDescriptor extends DescriptorBase { +public abstract class TableDescriptor<D extends TableDescriptor<D>> extends DescriptorBase { + + private final ConnectorDescriptor connectorDescriptor; + + private @Nullable FormatDescriptor formatDescriptor; + + private @Nullable String updateMode; + + protected TableDescriptor(ConnectorDescriptor connectorDescriptor) { + this.connectorDescriptor = Preconditions.checkNotNull(connectorDescriptor, "Connector must not be null."); + } + + /** + * Specifies the format that defines how to read data from a connector. + */ + @SuppressWarnings("unchecked") + public D withFormat(FormatDescriptor format) { + formatDescriptor = Preconditions.checkNotNull(format, "Format must not be null."); + return (D) this; + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * <p>In append mode, a dynamic table and an external connector only exchange INSERT messages. + * + * @see #inRetractMode() + * @see #inUpsertMode() + */ + @SuppressWarnings("unchecked") + public D inAppendMode() { + updateMode = UPDATE_MODE_VALUE_APPEND; + return (D) this; + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * <p>In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. + * + * <p>An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an + * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for + * the updating (new) row. + * + * <p>In this mode, a key must not be defined as opposed to upsert mode. However, every update + * consists of two messages which is less efficient. 
+ * + * @see #inAppendMode() + * @see #inUpsertMode() + */ + @SuppressWarnings("unchecked") + public D inRetractMode() { + updateMode = UPDATE_MODE_VALUE_RETRACT; + return (D) this; + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * <p>In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. + * + * <p>This mode requires a (possibly composite) unique key by which updates can be propagated. The + * external connector needs to be aware of the unique key attribute in order to apply messages + * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as + * DELETE messages. + * + * <p>The main difference to a retract stream is that UPDATE changes are encoded with a single + * message and are therefore more efficient. + * + * @see #inAppendMode() + * @see #inRetractMode() + */ + @SuppressWarnings("unchecked") + public D inUpsertMode() { + updateMode = UPDATE_MODE_VALUE_UPSERT; + return (D) this; + } + + /** + * Converts this descriptor into a set of properties. 
+ */ + @Override + public final Map<String, String> toProperties() { + final DescriptorProperties properties = new DescriptorProperties(); + + // this performs only basic validation + // more validation can only happen within a factory + if (connectorDescriptor.isFormatNeeded() && formatDescriptor == null) { + throw new ValidationException( + String.format( + "The connector %s requires a format description.", + connectorDescriptor.getClass().getName())); + } else if (!connectorDescriptor.isFormatNeeded() && formatDescriptor != null) { + throw new ValidationException( + String.format( + "The connector %s does not require a format description but %s found.", + connectorDescriptor.getClass().getName(), + formatDescriptor.getClass().getName())); + } + + properties.putProperties(connectorDescriptor.toProperties()); + + if (formatDescriptor != null) { + properties.putProperties(formatDescriptor.toProperties()); + } + + if (updateMode != null) { + properties.putString(UPDATE_MODE, updateMode); + } + + properties.putProperties(additionalProperties()); + + return properties.asMap(); + } + /** + * Enables adding more specific properties to {@link #toProperties()}. + */ + protected Map<String, String> additionalProperties() { + return Collections.emptyMap(); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java deleted file mode 100644 index a046a06..0000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.factories.TableFactoryUtil; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.util.Preconditions; - -import java.util.Map; -import java.util.Optional; - -/** - * Common class for table's created with {@link TableEnvironment#connect(ConnectorDescriptor connectorDescriptor)}. - */ -@PublicEvolving -public abstract class ConnectTableDescriptor<D extends ConnectTableDescriptor<D>> - extends TableDescriptor - implements SchematicDescriptor<D>, RegistrableDescriptor { - - private Optional<FormatDescriptor> formatDescriptor = Optional.empty(); - private Optional<Schema> schemaDescriptor = Optional.empty(); - - private final TableEnvironment tableEnv; - private final ConnectorDescriptor connectorDescriptor; - - public ConnectTableDescriptor(TableEnvironment tableEnv, ConnectorDescriptor connectorDescriptor) { - this.tableEnv = tableEnv; - this.connectorDescriptor = connectorDescriptor; - } - - /** - * Searches for the specified table source, configures it accordingly, and registers it as - * a table under the given name. 
- * - * @param name table name to be registered in the table environment - */ - @Override - public void registerTableSource(String name) { - Preconditions.checkNotNull(name); - TableSource<?> tableSource = TableFactoryUtil.findAndCreateTableSource(this); - tableEnv.registerTableSource(name, tableSource); - } - - /** - * Searches for the specified table sink, configures it accordingly, and registers it as - * a table under the given name. - * - * @param name table name to be registered in the table environment - */ - @Override - public void registerTableSink(String name) { - Preconditions.checkNotNull(name); - TableSink<?> tableSink = TableFactoryUtil.findAndCreateTableSink(this); - tableEnv.registerTableSink(name, tableSink); - } - - /** - * Searches for the specified table source and sink, configures them accordingly, and registers - * them as a table under the given name. - * - * @param name table name to be registered in the table environment - */ - @Override - public void registerTableSourceAndSink(String name) { - registerTableSource(name); - registerTableSink(name); - } - - /** - * Specifies the format that defines how to read data from a connector. - */ - @Override - public D withFormat(FormatDescriptor format) { - formatDescriptor = Optional.of(Preconditions.checkNotNull(format)); - return (D) this; - } - - /** - * Specifies the resulting table schema. - */ - @Override - public D withSchema(Schema schema) { - schemaDescriptor = Optional.of(Preconditions.checkNotNull(schema)); - return (D) this; - } - - /** - * Converts this descriptor into a set of properties. 
- */ - @Override - public Map<String, String> toProperties() { - DescriptorProperties properties = new DescriptorProperties(); - - // this performs only basic validation - // more validation can only happen within a factory - if (connectorDescriptor.isFormatNeeded() && !formatDescriptor.isPresent()) { - throw new ValidationException(String.format("The connector %s requires a format description.", connectorDescriptor.toString())); - } else if (!connectorDescriptor.isFormatNeeded() && formatDescriptor.isPresent()) { - throw new ValidationException( - String.format("The connector %s does not require a format description " + - "but %s found.", connectorDescriptor.toString(), formatDescriptor.get().toString())); - } - - properties.putProperties(connectorDescriptor.toProperties()); - - formatDescriptor.ifPresent(s -> properties.putProperties(s.toProperties())); - schemaDescriptor.ifPresent(s -> properties.putProperties(s.toProperties())); - - return properties.asMap(); - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java deleted file mode 100644 index 456bbff..0000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.TableEnvironment; - -import java.util.Map; -import java.util.Optional; - -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT; - -/** - * Descriptor for specifying a table source and/or sink in a streaming environment. - */ -@PublicEvolving -public class StreamTableDescriptor - extends ConnectTableDescriptor<StreamTableDescriptor> - implements StreamableDescriptor<StreamTableDescriptor> { - - private Optional<String> updateMode = Optional.empty(); - - public StreamTableDescriptor(TableEnvironment tableEnv, ConnectorDescriptor connectorDescriptor) { - super(tableEnv, connectorDescriptor); - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In append mode, a dynamic table and an external connector only exchange INSERT messages. 
- * - * @see #inRetractMode() - * @see #inUpsertMode() - */ - @Override - public StreamTableDescriptor inAppendMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND); - return this; - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. - * - * <p>An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an - * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for - * the updating (new) row. - * - * <p>In this mode, a key must not be defined as opposed to upsert mode. However, every update - * consists of two messages which is less efficient. - * - * @see #inAppendMode() - * @see #inUpsertMode() - */ - @Override - public StreamTableDescriptor inRetractMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT); - return this; - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * <p>In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. - * - * <p>This mode requires a (possibly composite) unique key by which updates can be propagated. The - * external connector needs to be aware of the unique key attribute in order to apply messages - * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as - * DELETE messages. - * - * <p>The main difference to a retract stream is that UPDATE changes are encoded with a single - * message and are therefore more efficient. 
- * - * @see #inAppendMode() - * @see #inRetractMode() - */ - @Override - public StreamTableDescriptor inUpsertMode() { - updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT); - return this; - } - - @Override - public Map<String, String> toProperties() { - DescriptorProperties properties = new DescriptorProperties(); - - properties.putProperties(super.toProperties()); - updateMode.ifPresent(mode -> properties.putString(UPDATE_MODE, mode)); - - return properties.asMap(); - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala index 52bfa06..f1ccc11 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.catalog import java.util -import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ /** @@ -57,27 +56,16 @@ import org.apache.flink.table.descriptors._ */ @Deprecated @deprecated -class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDescriptor) - extends TableDescriptor - with SchematicDescriptor[ExternalCatalogTableBuilder] - with StreamableDescriptor[ExternalCatalogTableBuilder] { +class ExternalCatalogTableBuilder(connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor[ExternalCatalogTableBuilder](connectorDescriptor) + with SchematicDescriptor[ExternalCatalogTableBuilder] { private var isBatch: Boolean = true private var isStreaming: Boolean = true - private var formatDescriptor: Option[FormatDescriptor] = None private var schemaDescriptor: Option[Schema] = None 
private var statisticsDescriptor: Option[Statistics] = None private var metadataDescriptor: Option[Metadata] = None - private var updateMode: Option[String] = None - - /** - * Specifies the format that defines how to read data from a connector. - */ - override def withFormat(format: FormatDescriptor): ExternalCatalogTableBuilder = { - formatDescriptor = Some(format) - this - } /** * Specifies the resulting table schema. @@ -88,57 +76,6 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc } /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * In append mode, a dynamic table and an external connector only exchange INSERT messages. - * - * @see See also [[inRetractMode()]] and [[inUpsertMode()]]. - */ - override def inAppendMode(): ExternalCatalogTableBuilder = { - updateMode = Some(UPDATE_MODE_VALUE_APPEND) - this - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. - * - * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an - * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for - * the updating (new) row. - * - * In this mode, a key must not be defined as opposed to upsert mode. However, every update - * consists of two messages which is less efficient. - * - * @see See also [[inAppendMode()]] and [[inUpsertMode()]]. - */ - override def inRetractMode(): ExternalCatalogTableBuilder = { - updateMode = Some(UPDATE_MODE_VALUE_RETRACT) - this - } - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. - * - * This mode requires a (possibly composite) unique key by which updates can be propagated. 
The - * external connector needs to be aware of the unique key attribute in order to apply messages - * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as - * DELETE messages. - * - * The main difference to a retract stream is that UPDATE changes are encoded with a single - * message and are therefore more efficient. - * - * @see See also [[inAppendMode()]] and [[inRetractMode()]]. - */ - override def inUpsertMode(): ExternalCatalogTableBuilder = { - updateMode = Some(UPDATE_MODE_VALUE_UPSERT) - this - } - - /** * Specifies the statistics for this external table. */ def withStatistics(statistics: Statistics): ExternalCatalogTableBuilder = { @@ -228,17 +165,11 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc // ---------------------------------------------------------------------------------------------- - /** - * Converts this descriptor into a set of properties. - */ - override def toProperties: util.Map[String, String] = { + override protected def additionalProperties(): util.Map[String, String] = { val properties = new DescriptorProperties() - properties.putProperties(connectorDescriptor.toProperties) - formatDescriptor.foreach(d => properties.putProperties(d.toProperties)) schemaDescriptor.foreach(d => properties.putProperties(d.toProperties)) statisticsDescriptor.foreach(d => properties.putProperties(d.toProperties)) metadataDescriptor.foreach(d => properties.putProperties(d.toProperties)) - updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) properties.asMap() } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala index eaeb582..806106e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala @@ -68,7 +68,7 @@ class TableDescriptorTest extends TableTestBase { .field("myfield6", Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person]))) .fieldDelimiter("#") - val descriptor: RegistrableDescriptor = if (isStreaming) { + val descriptor = if (isStreaming) { streamTestUtil().tableEnv .connect(connector) .withFormat(format) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala index cb84953..d1f48eb 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala @@ -19,51 +19,23 @@ package org.apache.flink.table.descriptors import java.util - -import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} +import java.util.Collections class TestTableDescriptor(connector: ConnectorDescriptor) - extends TableDescriptor - with SchematicDescriptor[TestTableDescriptor] - with StreamableDescriptor[TestTableDescriptor] { + extends TableDescriptor[TestTableDescriptor](connector) + with SchematicDescriptor[TestTableDescriptor] { - private var formatDescriptor: Option[FormatDescriptor] = None private var schemaDescriptor: Option[Schema] = None - private var updateMode: Option[String] = None - - override def toProperties: util.Map[String, String] = { - val properties = new DescriptorProperties() - - properties.putProperties(connector.toProperties) - formatDescriptor.foreach(d => properties.putProperties(d.toProperties)) - schemaDescriptor.foreach(d => properties.putProperties(d.toProperties)) - 
updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) - - properties.asMap() - } - - override def withFormat(format: FormatDescriptor): TestTableDescriptor = { - this.formatDescriptor = Some(format) - this - } override def withSchema(schema: Schema): TestTableDescriptor = { this.schemaDescriptor = Some(schema) this } - override def inAppendMode(): TestTableDescriptor = { - updateMode = Some(UPDATE_MODE_VALUE_APPEND) - this - } - - override def inRetractMode(): TestTableDescriptor = { - updateMode = Some(UPDATE_MODE_VALUE_RETRACT) - this - } - - override def inUpsertMode(): TestTableDescriptor = { - updateMode = Some(UPDATE_MODE_VALUE_UPSERT) - this + override protected def additionalProperties(): util.Map[String, String] = { + schemaDescriptor match { + case Some(d) => d.toProperties + case None => Collections.emptyMap() + } } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index cb4567f..cb16a7e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -22,9 +22,9 @@ import java.util.Optional import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment} +import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} import org.apache.flink.table.catalog.{Catalog, ExternalCatalog} -import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor} +import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.sinks.TableSink import 
org.apache.flink.table.sources.TableSource @@ -52,7 +52,7 @@ class MockTableEnvironment extends TableEnvironment { override def scan(tablePath: String*): Table = ??? - override def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor = ??? + override def connect(connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor = ??? override def listCatalogs(): Array[String] = ???