This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa51c8fc7ff70dfeca8bb2e1aa10b00d9a3a3698
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Wed Sep 29 10:47:10 2021 +0200

    [FLINK-24388][table-planner] Introduce Module#getTableSinkFactory
    
    This closes #17384.
---
 .../connectors/hive/HiveDynamicTableFactory.java   |  4 +-
 .../hive/HiveDynamicTableFactoryTest.java          |  6 ++-
 .../connectors/hive/HiveTableFactoryTest.java      |  6 ++-
 .../apache/flink/table/factories/FactoryUtil.java  | 43 ++++++++++++++++++++--
 .../java/org/apache/flink/table/module/Module.java | 22 +++++++++++
 .../flink/table/factories/utils/FactoryMocks.java  |  2 +-
 .../plan/nodes/exec/batch/BatchExecSink.java       |  5 +--
 .../plan/nodes/exec/common/CommonExecSink.java     | 32 +++++++++++-----
 .../plan/nodes/exec/spec/DynamicTableSinkSpec.java | 15 ++++++--
 .../plan/nodes/exec/stream/StreamExecSink.java     | 10 ++---
 .../table/planner/delegation/PlannerBase.scala     | 30 ++++++++++-----
 .../exec/serde/DynamicTableSinkSpecSerdeTest.java  |  9 ++++-
 .../runtime/stream/module/ModuleITCase.java        | 43 ++++++++++++++++++++++
 13 files changed, 183 insertions(+), 44 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
index 8a6e028..763c9cb 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -70,8 +70,8 @@ public class HiveDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
 
         // we don't support temporary hive tables yet
         if (!isHiveTable || context.isTemporary()) {
-            return FactoryUtil.createTableSink(
-                    null, // we already in the factory of catalog
+            return FactoryUtil.createDynamicTableSink(
+                    null,
                     context.getObjectIdentifier(),
                     context.getCatalogTable(),
                     context.getConfiguration(),
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
index 91b6c05..f7d9dae 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import 
org.apache.flink.table.filesystem.FileSystemConnectorOptions.PartitionOrder;
@@ -299,8 +300,9 @@ public class HiveDynamicTableFactoryTest {
                 ObjectIdentifier.of(hiveCatalog.getName(), "default", 
tableName);
         CatalogTable catalogTable =
                 (CatalogTable) 
hiveCatalog.getTable(tableIdentifier.toObjectPath());
-        return FactoryUtil.createTableSink(
-                hiveCatalog,
+        return FactoryUtil.createDynamicTableSink(
+                (DynamicTableSinkFactory)
+                        
hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
                 tableIdentifier,
                 
tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
                 tableEnv.getConfig().getConfiguration(),
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
index a794048..506dafb 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.TableFactory;
@@ -137,8 +138,9 @@ public class HiveTableFactoryTest {
         assertTrue(tableSource instanceof HiveTableSource);
 
         final DynamicTableSink tableSink =
-                FactoryUtil.createTableSink(
-                        catalog,
+                FactoryUtil.createDynamicTableSink(
+                        (DynamicTableSinkFactory)
+                                
catalog.getFactory().orElseThrow(IllegalStateException::new),
                         ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
                         new ResolvedCatalogTable(table, schema),
                         new Configuration(),
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index b7fdd31..f49144f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -194,10 +194,12 @@ public final class FactoryUtil {
     /**
      * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
      *
-     * <p>It considers {@link Catalog#getFactory()} if provided.
+     * <p>If {@code preferredFactory} is passed, the table sink is created 
from that factory.
+     * Otherwise, an attempt is made to discover a matching factory using Java 
SPI (see {@link
+     * Factory} for details).
      */
-    public static DynamicTableSink createTableSink(
-            @Nullable Catalog catalog,
+    public static DynamicTableSink createDynamicTableSink(
+            @Nullable DynamicTableSinkFactory preferredFactory,
             ObjectIdentifier objectIdentifier,
             ResolvedCatalogTable catalogTable,
             ReadableConfig configuration,
@@ -206,9 +208,13 @@ public final class FactoryUtil {
         final DefaultDynamicTableContext context =
                 new DefaultDynamicTableContext(
                         objectIdentifier, catalogTable, configuration, 
classLoader, isTemporary);
+
         try {
             final DynamicTableSinkFactory factory =
-                    getDynamicTableFactory(DynamicTableSinkFactory.class, 
catalog, context);
+                    preferredFactory != null
+                            ? preferredFactory
+                            : 
discoverTableFactory(DynamicTableSinkFactory.class, context);
+
             return factory.createDynamicTableSink(context);
         } catch (Throwable t) {
             throw new ValidationException(
@@ -226,6 +232,35 @@ public final class FactoryUtil {
     }
 
     /**
+     * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+     *
+     * <p>It considers {@link Catalog#getFactory()} if provided.
+     *
+     * @deprecated Use {@link #createDynamicTableSink(DynamicTableSinkFactory, 
ObjectIdentifier,
+     *     ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)} 
instead.
+     */
+    @Deprecated
+    public static DynamicTableSink createTableSink(
+            @Nullable Catalog catalog,
+            ObjectIdentifier objectIdentifier,
+            ResolvedCatalogTable catalogTable,
+            ReadableConfig configuration,
+            ClassLoader classLoader,
+            boolean isTemporary) {
+        final DefaultDynamicTableContext context =
+                new DefaultDynamicTableContext(
+                        objectIdentifier, catalogTable, configuration, 
classLoader, isTemporary);
+
+        return createDynamicTableSink(
+                getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, 
context),
+                objectIdentifier,
+                catalogTable,
+                configuration,
+                classLoader,
+                isTemporary);
+    }
+
+    /**
      * Creates a utility that helps validating options for a {@link 
CatalogFactory}.
      *
      * <p>Note: This utility checks for left-over options in the final step.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
index 77278bb..6d578cc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.module;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
 
@@ -74,5 +75,26 @@ public interface Module {
         return Optional.empty();
     }
 
+    /**
+     * Returns a {@link DynamicTableSinkFactory} for creating sink tables.
+     *
+     * <p>A factory is determined with the following precedence rule:
+     *
+     * <ul>
+     *   <li>1. Factory provided by the corresponding catalog of a persisted 
table.
+     *   <li>2. Factory provided by a module.
+     *   <li>3. Factory discovered using Java SPI.
+     * </ul>
+     *
+     * <p>This will be called on loaded modules in the order in which they 
have been loaded. The
+     * first factory returned will be used.
+     *
+     * <p>This method can be useful to disable Java SPI completely or 
influence how temporary table
+     * sinks should be created without a corresponding catalog.
+     */
+    default Optional<DynamicTableSinkFactory> getTableSinkFactory() {
+        return Optional.empty();
+    }
+
     // user defined types, operators, rules, etc
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
index 9e34d79..dee33bc 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/utils/FactoryMocks.java
@@ -77,7 +77,7 @@ public final class FactoryMocks {
 
     public static DynamicTableSink createTableSink(
             ResolvedSchema schema, List<String> partitionKeys, Map<String, 
String> options) {
-        return FactoryUtil.createTableSink(
+        return FactoryUtil.createDynamicTableSink(
                 null,
                 IDENTIFIER,
                 new ResolvedCatalogTable(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index ae9b703..3633628 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -43,7 +43,7 @@ public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Objec
             String description) {
         super(
                 tableSinkSpec,
-                
tableSinkSpec.getTableSink().getChangelogMode(ChangelogMode.insertOnly()),
+                ChangelogMode.insertOnly(),
                 true, // isBounded
                 getNewNodeId(),
                 Collections.singletonList(inputProperty),
@@ -56,7 +56,6 @@ public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Objec
     protected Transformation<Object> translateToPlanInternal(PlannerBase 
planner) {
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) 
getInputEdges().get(0).translateToPlan(planner);
-        return createSinkTransformation(
-                planner.getExecEnv(), planner.getTableConfig(), 
inputTransform, -1, false);
+        return createSinkTransformation(planner, inputTransform, -1, false);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index b0f3490..5705c97 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
+import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -86,12 +87,12 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
     @JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK)
     protected final DynamicTableSinkSpec tableSinkSpec;
 
-    @JsonIgnore private final ChangelogMode changelogMode;
+    @JsonIgnore private final ChangelogMode inputChangelogMode;
     @JsonIgnore private final boolean isBounded;
 
     protected CommonExecSink(
             DynamicTableSinkSpec tableSinkSpec,
-            ChangelogMode changelogMode,
+            ChangelogMode inputChangelogMode,
             boolean isBounded,
             int id,
             List<InputProperty> inputProperties,
@@ -99,7 +100,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
             String description) {
         super(id, inputProperties, outputType, description);
         this.tableSinkSpec = tableSinkSpec;
-        this.changelogMode = changelogMode;
+        this.inputChangelogMode = inputChangelogMode;
         this.isBounded = isBounded;
     }
 
@@ -109,12 +110,12 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
 
     @SuppressWarnings("unchecked")
     protected Transformation<Object> createSinkTransformation(
-            StreamExecutionEnvironment env,
-            TableConfig tableConfig,
+            PlannerBase planner,
             Transformation<RowData> inputTransform,
             int rowtimeFieldIndex,
             boolean upsertMaterialize) {
-        final DynamicTableSink tableSink = tableSinkSpec.getTableSink();
+        final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner);
+        final ChangelogMode changelogMode = 
tableSink.getChangelogMode(inputChangelogMode);
         final ResolvedSchema schema = 
tableSinkSpec.getCatalogTable().getResolvedSchema();
 
         final SinkRuntimeProvider runtimeProvider =
@@ -127,9 +128,15 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         final int sinkParallelism = deriveSinkParallelism(inputTransform, 
runtimeProvider);
 
         Transformation<RowData> sinkTransform =
-                applyNotNullEnforcer(inputTransform, tableConfig, 
physicalRowType);
+                applyNotNullEnforcer(inputTransform, planner.getTableConfig(), 
physicalRowType);
 
-        sinkTransform = applyKeyBy(sinkTransform, primaryKeys, 
sinkParallelism, upsertMaterialize);
+        sinkTransform =
+                applyKeyBy(
+                        changelogMode,
+                        sinkTransform,
+                        primaryKeys,
+                        sinkParallelism,
+                        upsertMaterialize);
 
         if (upsertMaterialize) {
             sinkTransform =
@@ -137,13 +144,17 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                             sinkTransform,
                             primaryKeys,
                             sinkParallelism,
-                            tableConfig,
+                            planner.getTableConfig(),
                             physicalRowType);
         }
 
         return (Transformation<Object>)
                 applySinkProvider(
-                        sinkTransform, env, runtimeProvider, 
rowtimeFieldIndex, sinkParallelism);
+                        sinkTransform,
+                        planner.getExecEnv(),
+                        runtimeProvider,
+                        rowtimeFieldIndex,
+                        sinkParallelism);
     }
 
     /**
@@ -220,6 +231,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
      * messages.
      */
     private Transformation<RowData> applyKeyBy(
+            ChangelogMode changelogMode,
             Transformation<RowData> inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
index 4e11544..2c522eb 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.spec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -59,11 +62,17 @@ public class DynamicTableSinkSpec extends 
CatalogTableSpecBase {
         this.sinkAbilitySpecs = sinkAbilitySpecs;
     }
 
-    public DynamicTableSink getTableSink() {
+    public DynamicTableSink getTableSink(PlannerBase planner) {
         if (tableSink == null) {
+            final DynamicTableSinkFactory factory =
+                    planner.getFlinkContext()
+                            .getModuleManager()
+                            .getFactory(Module::getTableSinkFactory)
+                            .orElse(null);
+
             tableSink =
-                    FactoryUtil.createTableSink(
-                            null, // catalog, TODO support create Factory from 
catalog
+                    FactoryUtil.createDynamicTableSink(
+                            factory,
                             objectIdentifier,
                             catalogTable,
                             configuration,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index f57eacf..c145b59 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -75,7 +75,7 @@ public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Obj
             String description) {
         super(
                 tableSinkSpec,
-                
tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode),
+                inputChangelogMode,
                 false, // isBounded
                 getNewNodeId(),
                 Collections.singletonList(inputProperty),
@@ -96,7 +96,7 @@ public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Obj
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(
                 tableSinkSpec,
-                
tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode),
+                inputChangelogMode,
                 false, // isBounded
                 id,
                 inputProperties,
@@ -138,10 +138,6 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
         }
 
         return createSinkTransformation(
-                planner.getExecEnv(),
-                planner.getTableConfig(),
-                inputTransform,
-                rowtimeFieldIndex,
-                upsertMaterialize);
+                planner, inputTransform, rowtimeFieldIndex, upsertMaterialize);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 251a1f1..ee038f5 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -28,8 +28,8 @@ import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
 import org.apache.flink.table.descriptors.{ConnectorDescriptorValidator, 
DescriptorProperties}
-import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil}
-import org.apache.flink.table.module.ModuleManager
+import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, 
TableFactoryUtil}
+import org.apache.flink.table.module.{Module, ModuleManager}
 import 
org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.planner.JMap
@@ -51,7 +51,7 @@ import 
org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import 
org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, 
validateLogicalPhysicalTypesCompatible, validateTableSink}
 import 
org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME,
 TABLE_QUERY_START_LOCAL_TIME}
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, 
toScala}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 
@@ -357,7 +357,7 @@ abstract class PlannerBase(
       dynamicOptions: JMap[String, String])
     : Option[(ResolvedCatalogTable, Any)] = {
     val optionalLookupResult =
-      
JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
+      toScala(catalogManager.getTable(objectIdentifier))
     if (optionalLookupResult.isEmpty) {
       return None
     }
@@ -365,7 +365,7 @@ abstract class PlannerBase(
     lookupResult.getTable match {
       case connectorTable: ConnectorCatalogTable[_, _] =>
         val resolvedTable = 
lookupResult.getResolvedTable.asInstanceOf[ResolvedCatalogTable]
-        JavaScalaConversionUtil.toScala(connectorTable.getTableSink) match {
+        toScala(connectorTable.getTableSink) match {
           case Some(sink) => Some(resolvedTable, sink)
           case None => None
         }
@@ -377,11 +377,11 @@ abstract class PlannerBase(
         } else {
           resolvedTable
         }
-        val catalog = 
catalogManager.getCatalog(objectIdentifier.getCatalogName)
+        val catalog = 
toScala(catalogManager.getCatalog(objectIdentifier.getCatalogName))
         val isTemporary = lookupResult.isTemporary
         if (isLegacyConnectorOptions(objectIdentifier, 
resolvedTable.getOrigin, isTemporary)) {
           val tableSink = TableFactoryUtil.findAndCreateTableSink(
-            catalog.orElse(null),
+            catalog.orNull,
             objectIdentifier,
             tableToFind.getOrigin,
             getTableConfig.getConfiguration,
@@ -389,8 +389,20 @@ abstract class PlannerBase(
             isTemporary)
           Option(resolvedTable, tableSink)
         } else {
-          val tableSink = FactoryUtil.createTableSink(
-            catalog.orElse(null),
+          val factoryFromCatalog = catalog.flatMap(f => toScala(f.getFactory)) 
match {
+            case Some(f: DynamicTableSinkFactory) => Some(f)
+            case _ => None
+          }
+
+          val factoryFromModule = 
toScala(plannerContext.getFlinkContext.getModuleManager
+            .getFactory(toJava((m: Module) => m.getTableSinkFactory)))
+
+          // Since the catalog is more specific, we give it precedence over a 
factory provided by
+          // any modules.
+          val factory = factoryFromCatalog.orElse(factoryFromModule).orNull
+
+          val tableSink = FactoryUtil.createDynamicTableSink(
+            factory,
             objectIdentifier,
             tableToFind,
             getTableConfig.getConfiguration,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
index 59e390d..9882457 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -29,6 +32,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.calcite.FlinkContextImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.PartitioningSpec;
@@ -91,7 +95,10 @@ public class DynamicTableSinkSpecSerdeTest {
         actual.setClassLoader(classLoader);
         assertNull(actual.getReadableConfig());
         actual.setReadableConfig(serdeCtx.getConfiguration());
-        assertNotNull(actual.getTableSink());
+        TableEnvironmentImpl tableEnv =
+                (TableEnvironmentImpl)
+                        
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        assertNotNull(actual.getTableSink((PlannerBase) 
tableEnv.getPlanner()));
     }
 
     @Parameterized.Parameters(name = "{0}")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
index a92da1a..bdd8f63 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/module/ModuleITCase.java
@@ -19,9 +19,12 @@
 package org.apache.flink.table.planner.runtime.stream.module;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.module.Module;
@@ -68,6 +71,34 @@ public class ModuleITCase extends StreamingTestBase {
         table.explain();
     }
 
+    @Test
+    public void testTableSinkFactory() {
+        tEnv().createTemporaryTable(
+                        "T",
+                        TableFactoryHarness.newBuilder()
+                                .schema(Schema.newBuilder().column("f0", 
DataTypes.INT()).build())
+                                .sink(
+                                        new TableFactoryHarness.SinkBase() {
+                                            @Override
+                                            public SinkRuntimeProvider 
getSinkRuntimeProvider(
+                                                    Context context) {
+                                                throw new 
UnsupportedOperationException(
+                                                        "Discovered factory 
should not be used");
+                                            }
+                                        })
+                                .build());
+
+        // Sanity check: without our module loaded, the factory discovery 
process is used.
+        assertThrows(
+                "Discovered factory should not be used",
+                UnsupportedOperationException.class,
+                () -> tEnv().explainSql("INSERT INTO T SELECT 1"));
+
+        // The module has precedence over factory discovery.
+        tEnv().loadModule("M", new SourceSinkFactoryOverwriteModule());
+        tEnv().explainSql("INSERT INTO T SELECT 1");
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     private static class SourceSinkFactoryOverwriteModule implements Module {
@@ -75,6 +106,11 @@ public class ModuleITCase extends StreamingTestBase {
         public Optional<DynamicTableSourceFactory> getTableSourceFactory() {
             return Optional.of(new SourceFactory());
         }
+
+        @Override
+        public Optional<DynamicTableSinkFactory> getTableSinkFactory() {
+            return Optional.of(new SinkFactory());
+        }
     }
 
     private static class SourceFactory extends FactoryBase implements 
DynamicTableSourceFactory {
@@ -84,6 +120,13 @@ public class ModuleITCase extends StreamingTestBase {
         }
     }
 
+    private static class SinkFactory extends FactoryBase implements 
DynamicTableSinkFactory {
+        @Override
+        public DynamicTableSink createDynamicTableSink(Context context) {
+            return new TableFactoryHarness.SinkBase() {};
+        }
+    }
+
     private static class FactoryBase implements Factory {
         @Override
         public String factoryIdentifier() {

Reply via email to