This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa51c8fc7ff70dfeca8bb2e1aa10b00d9a3a3698 Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Wed Sep 29 10:47:10 2021 +0200 [FLINK-24388][table-planner] Introduce Module#getTableSinkFactory This closes #17384. --- .../connectors/hive/HiveDynamicTableFactory.java | 4 +- .../hive/HiveDynamicTableFactoryTest.java | 6 ++- .../connectors/hive/HiveTableFactoryTest.java | 6 ++- .../apache/flink/table/factories/FactoryUtil.java | 43 ++++++++++++++++++++-- .../java/org/apache/flink/table/module/Module.java | 22 +++++++++++ .../flink/table/factories/utils/FactoryMocks.java | 2 +- .../plan/nodes/exec/batch/BatchExecSink.java | 5 +-- .../plan/nodes/exec/common/CommonExecSink.java | 32 +++++++++++----- .../plan/nodes/exec/spec/DynamicTableSinkSpec.java | 15 ++++++-- .../plan/nodes/exec/stream/StreamExecSink.java | 10 ++--- .../table/planner/delegation/PlannerBase.scala | 30 ++++++++++----- .../exec/serde/DynamicTableSinkSpecSerdeTest.java | 9 ++++- .../runtime/stream/module/ModuleITCase.java | 43 ++++++++++++++++++++++ 13 files changed, 183 insertions(+), 44 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java index 8a6e028..763c9cb 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java @@ -70,8 +70,8 @@ public class HiveDynamicTableFactory implements DynamicTableSourceFactory, Dynam // we don't support temporary hive tables yet if (!isHiveTable || context.isTemporary()) { - return FactoryUtil.createTableSink( - null, // we already in the factory of catalog + return FactoryUtil.createDynamicTableSink( + null, context.getObjectIdentifier(), context.getCatalogTable(), context.getConfiguration(), diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java index 91b6c05..f7d9dae 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java @@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.filesystem.FileSystemConnectorOptions.PartitionOrder; @@ -299,8 +300,9 @@ public class HiveDynamicTableFactoryTest { ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName); CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath()); - return FactoryUtil.createTableSink( - hiveCatalog, + return FactoryUtil.createDynamicTableSink( + (DynamicTableSinkFactory) + hiveCatalog.getFactory().orElseThrow(IllegalStateException::new), tableIdentifier, tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable), tableEnv.getConfig().getConfiguration(), diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java index a794048..506dafb 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java @@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableFactory; @@ -137,8 +138,9 @@ public class HiveTableFactoryTest { assertTrue(tableSource instanceof HiveTableSource); final DynamicTableSink tableSink = - FactoryUtil.createTableSink( - catalog, + FactoryUtil.createDynamicTableSink( + (DynamicTableSinkFactory) + catalog.getFactory().orElseThrow(IllegalStateException::new), ObjectIdentifier.of("mycatalog", "mydb", "mytable"), new ResolvedCatalogTable(table, schema), new Configuration(), diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index b7fdd31..f49144f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -194,10 +194,12 @@ public final class FactoryUtil { /** * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. * - * <p>It considers {@link Catalog#getFactory()} if provided. + * <p>If {@param preferredFactory} is passed, the table sink is created from that factory. + * Otherwise, an attempt is made to discover a matching factory using Java SPI (see {@link + * Factory} for details). */ - public static DynamicTableSink createTableSink( - @Nullable Catalog catalog, + public static DynamicTableSink createDynamicTableSink( + @Nullable DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, @@ -206,9 +208,13 @@ public final class FactoryUtil { final DefaultDynamicTableContext context = new DefaultDynamicTableContext( objectIdentifier, catalogTable, configuration, classLoader, isTemporary); + try { final DynamicTableSinkFactory factory = - getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, context); + preferredFactory != null + ? preferredFactory + : discoverTableFactory(DynamicTableSinkFactory.class, context); + return factory.createDynamicTableSink(context); } catch (Throwable t) { throw new ValidationException( @@ -226,6 +232,35 @@ public final class FactoryUtil { } /** + * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. + * + * <p>It considers {@link Catalog#getFactory()} if provided. + * + * @deprecated Use {@link #createDynamicTableSink(DynamicTableSinkFactory, ObjectIdentifier, + * ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)} instead. + */ + @Deprecated + public static DynamicTableSink createTableSink( + @Nullable Catalog catalog, + ObjectIdentifier objectIdentifier, + ResolvedCatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader, + boolean isTemporary) { + final DefaultDynamicTableContext context = + new DefaultDynamicTableContext( + objectIdentifier, catalogTable, configuration, classLoader, isTemporary); + + return createDynamicTableSink( + getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, context), + objectIdentifier, + catalogTable, + configuration, + classLoader, + isTemporary); + } + + /** * Creates a utility that helps validating options for a {@link CatalogFactory}. * * <p>Note: This utility checks for left-over options in the final step. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java index 77278bb..6d578cc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java @@ -19,6 +19,7 @@ package org.apache.flink.table.module; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.functions.FunctionDefinition; @@ -74,5 +75,26 @@ public interface Module { return Optional.empty(); } + /** + * Returns a {@link DynamicTableSinkFactory} for creating sink tables. + * + * <p>A factory is determined with the following precedence rule: + * + * <ul> + * <li>1. Factory provided by the corresponding catalog of a persisted table. + * <li>2. Factory provided by a module. + * <li>3. Factory discovered using Java SPI. + * </ul> + * + * <p>This will be called on loaded modules in the order in which they have been loaded. The + * first factory returned will be used. + * + * <p>This method can be useful to disable Java SPI completely or influence how temporary table + * sinks should be created without a corresponding catalog. + */ + default Optional<DynamicTableSinkFactory> getTableSinkFactory() { + return Optional.empty(); + } + // user defined types, operators, rules, etc } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java index 9e34d79..dee33bc 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java @@ -77,7 +77,7 @@ public final class FactoryMocks { public static DynamicTableSink createTableSink( ResolvedSchema schema, List<String> partitionKeys, Map<String, String> options) { - return FactoryUtil.createTableSink( + return FactoryUtil.createDynamicTableSink( null, IDENTIFIER, new ResolvedCatalogTable( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index ae9b703..3633628 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -43,7 +43,7 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec String description) { super( tableSinkSpec, - tableSinkSpec.getTableSink().getChangelogMode(ChangelogMode.insertOnly()), + ChangelogMode.insertOnly(), true, // isBounded getNewNodeId(), Collections.singletonList(inputProperty), @@ -56,7 +56,6 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec protected Transformation<Object> translateToPlanInternal(PlannerBase planner) { final Transformation<RowData> inputTransform = (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner); - return createSinkTransformation( - planner.getExecEnv(), planner.getTableConfig(), inputTransform, -1, false); + return createSinkTransformation(planner, inputTransform, -1, false); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index b0f3490..5705c97 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -48,6 +48,7 @@ import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator; import org.apache.flink.table.planner.connectors.TransformationSinkProvider; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; @@ -86,12 +87,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> @JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK) protected final DynamicTableSinkSpec tableSinkSpec; - @JsonIgnore private final ChangelogMode changelogMode; + @JsonIgnore private final ChangelogMode inputChangelogMode; @JsonIgnore private final boolean isBounded; protected CommonExecSink( DynamicTableSinkSpec tableSinkSpec, - ChangelogMode changelogMode, + ChangelogMode inputChangelogMode, boolean isBounded, int id, List<InputProperty> inputProperties, @@ -99,7 +100,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> String description) { super(id, inputProperties, outputType, description); this.tableSinkSpec = tableSinkSpec; - this.changelogMode = changelogMode; + this.inputChangelogMode = inputChangelogMode; this.isBounded = isBounded; } @@ -109,12 +110,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> @SuppressWarnings("unchecked") protected Transformation<Object> createSinkTransformation( - StreamExecutionEnvironment env, - TableConfig tableConfig, + PlannerBase planner, Transformation<RowData> inputTransform, int rowtimeFieldIndex, boolean upsertMaterialize) { - final DynamicTableSink tableSink = tableSinkSpec.getTableSink(); + final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner); + final ChangelogMode changelogMode = tableSink.getChangelogMode(inputChangelogMode); final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema(); final SinkRuntimeProvider runtimeProvider = @@ -127,9 +128,15 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider); Transformation<RowData> sinkTransform = - applyNotNullEnforcer(inputTransform, tableConfig, physicalRowType); + applyNotNullEnforcer(inputTransform, planner.getTableConfig(), physicalRowType); - sinkTransform = applyKeyBy(sinkTransform, primaryKeys, sinkParallelism, upsertMaterialize); + sinkTransform = + applyKeyBy( + changelogMode, + sinkTransform, + primaryKeys, + sinkParallelism, + upsertMaterialize); if (upsertMaterialize) { sinkTransform = @@ -137,13 +144,17 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> sinkTransform, primaryKeys, sinkParallelism, - tableConfig, + planner.getTableConfig(), physicalRowType); } return (Transformation<Object>) applySinkProvider( - sinkTransform, env, runtimeProvider, rowtimeFieldIndex, sinkParallelism); + sinkTransform, + planner.getExecEnv(), + runtimeProvider, + rowtimeFieldIndex, + sinkParallelism); } /** @@ -220,6 +231,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> * messages. */ private Transformation<RowData> applyKeyBy( + ChangelogMode changelogMode, Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java index 4e11544..2c522eb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java @@ -21,7 +21,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.spec; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.module.Module; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -59,11 +62,17 @@ public class DynamicTableSinkSpec extends CatalogTableSpecBase { this.sinkAbilitySpecs = sinkAbilitySpecs; } - public DynamicTableSink getTableSink() { + public DynamicTableSink getTableSink(PlannerBase planner) { if (tableSink == null) { + final DynamicTableSinkFactory factory = + planner.getFlinkContext() + .getModuleManager() + .getFactory(Module::getTableSinkFactory) + .orElse(null); + tableSink = - FactoryUtil.createTableSink( - null, // catalog, TODO support create Factory from catalog + FactoryUtil.createDynamicTableSink( + factory, objectIdentifier, catalogTable, configuration, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index f57eacf..c145b59 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -75,7 +75,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj String description) { super( tableSinkSpec, - tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode), + inputChangelogMode, false, // isBounded getNewNodeId(), Collections.singletonList(inputProperty), @@ -96,7 +96,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { super( tableSinkSpec, - tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode), + inputChangelogMode, false, // isBounded id, inputProperties, @@ -138,10 +138,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj } return createSinkTransformation( - planner.getExecEnv(), - planner.getTableConfig(), - inputTransform, - rowtimeFieldIndex, - upsertMaterialize); + planner, inputTransform, rowtimeFieldIndex, upsertMaterialize); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 251a1f1..ee038f5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -28,8 +28,8 @@ import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.delegation.{Executor, Parser, Planner} import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, DescriptorProperties} -import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil} -import org.apache.flink.table.module.ModuleManager +import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil} +import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ import org.apache.flink.table.planner.JMap @@ -51,7 +51,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle import org.apache.flink.table.planner.sinks.DataStreamTableSink import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink} import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME} -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter @@ -357,7 +357,7 @@ abstract class PlannerBase( dynamicOptions: JMap[String, String]) : Option[(ResolvedCatalogTable, Any)] = { val optionalLookupResult = - JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier)) + toScala(catalogManager.getTable(objectIdentifier)) if (optionalLookupResult.isEmpty) { return None } @@ -365,7 +365,7 @@ abstract class PlannerBase( lookupResult.getTable match { case connectorTable: ConnectorCatalogTable[_, _] => val resolvedTable = lookupResult.getResolvedTable.asInstanceOf[ResolvedCatalogTable] - JavaScalaConversionUtil.toScala(connectorTable.getTableSink) match { + toScala(connectorTable.getTableSink) match { case Some(sink) => Some(resolvedTable, sink) case None => None } @@ -377,11 +377,11 @@ abstract class PlannerBase( } else { resolvedTable } - val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName) + val catalog = toScala(catalogManager.getCatalog(objectIdentifier.getCatalogName)) val isTemporary = lookupResult.isTemporary if (isLegacyConnectorOptions(objectIdentifier, resolvedTable.getOrigin, isTemporary)) { val tableSink = TableFactoryUtil.findAndCreateTableSink( - catalog.orElse(null), + catalog.orNull, objectIdentifier, tableToFind.getOrigin, getTableConfig.getConfiguration, @@ -389,8 +389,20 @@ abstract class PlannerBase( isTemporary) Option(resolvedTable, tableSink) } else { - val tableSink = FactoryUtil.createTableSink( - catalog.orElse(null), + val factoryFromCatalog = catalog.flatMap(f => toScala(f.getFactory)) match { + case Some(f: DynamicTableSinkFactory) => Some(f) + case _ => None + } + + val factoryFromModule = toScala(plannerContext.getFlinkContext.getModuleManager + .getFactory(toJava((m: Module) => m.getTableSinkFactory))) + + // Since the catalog is more specific, we give it precedence over a factory provided by + // any modules. + val factory = factoryFromCatalog.orElse(factoryFromModule).orNull + + val tableSink = FactoryUtil.createDynamicTableSink( + factory, objectIdentifier, tableToFind, getTableConfig.getConfiguration, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java index 59e390d..9882457 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java @@ -20,7 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -29,6 +32,7 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec; import org.apache.flink.table.planner.plan.abilities.sink.PartitioningSpec; @@ -91,7 +95,10 @@ public class DynamicTableSinkSpecSerdeTest { actual.setClassLoader(classLoader); assertNull(actual.getReadableConfig()); actual.setReadableConfig(serdeCtx.getConfiguration()); - assertNotNull(actual.getTableSink()); + TableEnvironmentImpl tableEnv = + (TableEnvironmentImpl) + TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + assertNotNull(actual.getTableSink((PlannerBase) tableEnv.getPlanner())); } @Parameterized.Parameters(name = "{0}") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java index a92da1a..bdd8f63 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java @@ -19,9 +19,12 @@ package org.apache.flink.table.planner.runtime.stream.module; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; +import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.module.Module; @@ -68,6 +71,34 @@ public class ModuleITCase extends StreamingTestBase { table.explain(); } + @Test + public void testTableSinkFactory() { + tEnv().createTemporaryTable( + "T", + TableFactoryHarness.newBuilder() + .schema(Schema.newBuilder().column("f0", DataTypes.INT()).build()) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public SinkRuntimeProvider getSinkRuntimeProvider( + Context context) { + throw new UnsupportedOperationException( + "Discovered factory should not be used"); + } + }) + .build()); + + // Sanity check: without our module loaded, the factory discovery process is used. + assertThrows( + "Discovered factory should not be used", + UnsupportedOperationException.class, + () -> tEnv().explainSql("INSERT INTO T SELECT 1")); + + // The module has precedence over factory discovery. + tEnv().loadModule("M", new SourceSinkFactoryOverwriteModule()); + tEnv().explainSql("INSERT INTO T SELECT 1"); + } + // --------------------------------------------------------------------------------------------- private static class SourceSinkFactoryOverwriteModule implements Module { @@ -75,6 +106,11 @@ public class ModuleITCase extends StreamingTestBase { public Optional<DynamicTableSourceFactory> getTableSourceFactory() { return Optional.of(new SourceFactory()); } + + @Override + public Optional<DynamicTableSinkFactory> getTableSinkFactory() { + return Optional.of(new SinkFactory()); + } } private static class SourceFactory extends FactoryBase implements DynamicTableSourceFactory { @@ -84,6 +120,13 @@ public class ModuleITCase extends StreamingTestBase { } } + private static class SinkFactory extends FactoryBase implements DynamicTableSinkFactory { + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + return new TableFactoryHarness.SinkBase() {}; + } + } + private static class FactoryBase implements Factory { @Override public String factoryIdentifier() {