This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9260311637ad47a6e67f154c629ddd49d9f262a
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Jan 6 14:52:50 2022 +0100

    [FLINK-25391][connector-files] Forward catalog table options
---
 docs/content/docs/connectors/table/filesystem.md   | 80 +++++++++++++++++-----
 .../file/table/AbstractFileSystemTable.java        | 24 +++----
 .../file/table/FileSystemTableFactory.java         | 29 +++++++-
 .../connector/file/table/FileSystemTableSink.java  | 21 ++++--
 .../file/table/FileSystemTableSource.java          | 25 ++++---
 5 files changed, 131 insertions(+), 48 deletions(-)
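
The gist of the change: FileSystemTableFactory now overrides
DynamicTableFactory#forwardOptions() to declare which catalog table options
may be forwarded, and the source/sink constructors receive the resolved
pieces of the catalog table (identifier, schema, partition keys, options)
instead of the whole factory context. A minimal sketch of exercising the new
hook (class and option names as in this commit; assumes flink-connector-files
is on the classpath):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
    import org.apache.flink.connector.file.table.FileSystemTableFactory;

    import java.util.Set;

    public class ForwardOptionsCheck {
        public static void main(String[] args) {
            // The set declared by the new forwardOptions() override below.
            Set<ConfigOption<?>> forwarded =
                    new FileSystemTableFactory().forwardOptions();
            // 'path' is declared forwardable in this commit...
            System.out.println(forwarded.contains(FileSystemConnectorOptions.PATH));
            // ...while 'sink.parallelism' is not (see the docs tables below).
            System.out.println(
                    forwarded.contains(FileSystemConnectorOptions.SINK_PARALLELISM));
        }
    }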

diff --git a/docs/content/docs/connectors/table/filesystem.md b/docs/content/docs/connectors/table/filesystem.md
index 24dfa11..e2c0aeb 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -208,21 +208,27 @@ a timeout that specifies the maximum duration for which a file can be open.
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">128MB</td>
         <td>MemorySize</td>
         <td>The maximum part file size before rolling.</td>
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.rollover-interval</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">30 min</td>
         <td>Duration</td>
        <td>The maximum time duration a part file can stay open before rolling (by default 30 min to avoid too many small files).
@@ -230,6 +236,8 @@ a timeout that specifies the maximum duration for which a file can be open.
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.check-interval</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">1 min</td>
         <td>Duration</td>
        <td>The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.</td>
@@ -250,21 +258,27 @@ The file sink supports file compactions, which allows applications to have small
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">false</td>
         <td>Boolean</td>
        <td>Whether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction.</td>
     </tr>
     <tr>
         <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>MemorySize</td>
        <td>The compaction target file size, the default value is the rolling file size.</td>
@@ -294,27 +308,35 @@ To define when to commit a partition, providing partition commit trigger:
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.partition-commit.trigger</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">process-time</td>
         <td>String</td>
        <td>Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.delay</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">0 s</td>
         <td>Duration</td>
        <td>The partition will not commit until the delay time. If it is a daily partition, it should be '1 d'; if it is an hourly partition, it should be '1 h'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">UTC</td>
         <td>String</td>
        <td>The time zone to parse the long watermark value to TIMESTAMP value, the parsed watermark timestamp is used to compare with partition time to decide whether the partition should commit or not. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. source rowtime is defined on TIMESTAMP_LTZ column, but this config is not configured, then users may see the partition committed after a few hours. Th [...]
@@ -356,33 +378,43 @@ Time extractors define extracting time from partition values.
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>partition.time-extractor.kind</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">default</td>
         <td>String</td>
        <td>Time extractor to extract time from partition values. Supports 'default' and 'custom'. For 'default', a timestamp pattern/formatter can be configured. For 'custom', an extractor class should be configured.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.class</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
        <td>The extractor class for implementing the PartitionTimeExtractor interface.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. By default, 'yyyy-MM-dd hh:mm:ss' is supported from the first field. If the timestamp should be extracted from a single partition field 'dt', you can configure: '$dt'. If the timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', you can configure: '$year-$month-$day $hour:00:00'. If the timestamp should be extracted from two partition fields 'dt' and 'hour', you can [...]
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-formatter</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
         <td>String</td>
        <td>The formatter that formats the partition timestamp string value to a timestamp; the partition timestamp string value is expressed by 'partition.time-extractor.timestamp-pattern'. For example, if the partition timestamp is extracted from multiple partition fields, say 'year', 'month' and 'day', you can configure 'partition.time-extractor.timestamp-pattern' to '$year$month$day', and configure `partition.time-extractor.timestamp-formatter` to 'yyyyMMdd'. By default the formatter is ' [...]
@@ -417,27 +449,35 @@ The partition commit policy defines what action is taken when partitions are com
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.partition-commit.policy.kind</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
        <td>The policy to commit a partition is to notify the downstream application that the partition has finished writing and is ready to be read. metastore: add the partition to the metastore. Only hive tables support the metastore policy; a file system manages partitions through its directory structure. success-file: add a '_success' file to the directory. Both can be configured at the same time: 'metastore,success-file'. custom: use a policy class to create a commit policy. Support to configure multi [...]
     </tr>
     <tr>
         <td><h5>sink.partition-commit.policy.class</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
        <td>The partition commit policy class for implementing the PartitionCommitPolicy interface. Only works with the custom commit policy.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.success-file.name</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">_SUCCESS</td>
         <td>String</td>
        <td>The file name for success-file partition commit policy, default is '_SUCCESS'.</td>
@@ -482,15 +522,19 @@ The parallelism of writing files into external file system (including Hive) can
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>Integer</td>
        <td>Parallelism of writing files into the external file system. The value should be greater than zero, otherwise an exception will be thrown.</td>
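
A note on the new "Forwarded" column added above: it marks the options whose
values the factory reports through forwardOptions() (see
FileSystemTableFactory further down). The options themselves still resolve
through Flink's Configuration as before; a small sketch using one option from
the rolling policy table (option key and type as documented above; the
printed form of MemorySize is an assumption):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.connector.file.table.FileSystemConnectorOptions;

    public class RollingPolicyOptionDemo {
        public static void main(String[] args) {
            Configuration tableOptions = new Configuration();
            // 'sink.rolling-policy.file-size' from the table above, default 128MB.
            tableOptions.setString("sink.rolling-policy.file-size", "256mb");
            MemorySize fileSize = tableOptions.get(
                    FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE);
            System.out.println(fileSize); // the parsed size, e.g. 256 mb
        }
    }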
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
index f78bee6..265c04a 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
 
 import java.util.List;
@@ -34,7 +33,6 @@ import java.util.stream.Collectors;
 /** Abstract File system table for providing some common methods. */
 abstract class AbstractFileSystemTable {
 
-    final DynamicTableFactory.Context context;
     final ObjectIdentifier tableIdentifier;
     final Configuration tableOptions;
     final ResolvedSchema schema;
@@ -43,16 +41,18 @@ abstract class AbstractFileSystemTable {
 
     List<String> partitionKeys;
 
-    AbstractFileSystemTable(DynamicTableFactory.Context context) {
-        this.context = context;
-        this.tableIdentifier = context.getObjectIdentifier();
-        this.tableOptions = new Configuration();
-        context.getCatalogTable().getOptions().forEach(tableOptions::setString);
-        this.schema = context.getCatalogTable().getResolvedSchema();
-        this.path = new Path(tableOptions.get(FileSystemConnectorOptions.PATH));
-        this.defaultPartName = tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
-
-        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
+    AbstractFileSystemTable(
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions) {
+        this.tableIdentifier = tableIdentifier;
+        this.tableOptions = (Configuration) tableOptions;
+        this.schema = schema;
+        this.path = new Path(this.tableOptions.get(FileSystemConnectorOptions.PATH));
+        this.defaultPartName =
+                this.tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
+        this.partitionKeys = partitionKeys;
     }
 
     ReadableConfig formatOptions(String identifier) {
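
The base class above no longer keeps the DynamicTableFactory.Context; it is
handed the identifier, schema, partition keys, and options directly. Note the
downcast to Configuration in the new constructor: it assumes the
ReadableConfig passed in is Configuration-backed, which holds for the
helper.getOptions() call sites added in FileSystemTableFactory below. A
purely illustrative, hypothetical variant that fails fast instead (the commit
itself casts directly):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ReadableConfig;

    final class Configurations {
        private Configurations() {}

        // Hypothetical helper, not part of the commit: surface a clear error
        // instead of a ClassCastException deeper in the stack.
        static Configuration asConfiguration(ReadableConfig readable) {
            if (readable instanceof Configuration) {
                return (Configuration) readable;
            }
            throw new IllegalArgumentException(
                    "Expected a Configuration-backed ReadableConfig, got: "
                            + readable.getClass().getName());
        }
    }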
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index 952c522..86a4ed3 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.time.ZoneId.SHORT_IDS;
 
@@ -70,7 +71,10 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
         return new FileSystemTableSource(
-                context,
+                context.getObjectIdentifier(),
+                context.getCatalogTable().getResolvedSchema(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                discoverDecodingFormat(context, DeserializationFormatFactory.class),
                 discoverFormatFactory(context));
@@ -81,7 +85,10 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
         return new FileSystemTableSink(
-                context,
+                context.getObjectIdentifier(),
+                context.getCatalogTable().getResolvedSchema(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                discoverDecodingFormat(context, DeserializationFormatFactory.class),
                 discoverFormatFactory(context),
@@ -122,6 +129,24 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        FileSystemConnectorOptions.PATH,
+                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME,
+                        FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
+                .collect(Collectors.toSet());
+    }
+
     private void validate(FactoryUtil.TableFactoryHelper helper) {
        // Except format options, some formats like parquet and orc can not list all supported
         // options.
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 09705cd..6708b60 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -55,6 +55,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -65,7 +67,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -117,20 +118,22 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     @Nullable private Integer configuredParallelism;
 
     FileSystemTableSink(
-            DynamicTableFactory.Context context,
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
            @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
            @Nullable FileSystemFormatFactory formatFactory,
            @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat,
            @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
-        super(context);
+        super(tableIdentifier, schema, partitionKeys, tableOptions);
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
         if (Stream.of(bulkWriterFormat, serializationFormat, formatFactory)
                 .allMatch(Objects::isNull)) {
-            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
-            String identifier = options.get(FactoryUtil.FORMAT);
+            String identifier = tableOptions.get(FactoryUtil.FORMAT);
             throw new ValidationException(
                     String.format(
                             "Could not find any format factory for identifier 
'%s' in the classpath.",
@@ -138,7 +141,8 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
         }
         this.bulkWriterFormat = bulkWriterFormat;
         this.serializationFormat = serializationFormat;
-        this.configuredParallelism = tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
+        this.configuredParallelism =
+                this.tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
     }
 
     @Override
@@ -545,7 +549,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     public DynamicTableSink copy() {
         FileSystemTableSink sink =
                 new FileSystemTableSink(
-                        context,
+                        tableIdentifier,
+                        schema,
+                        partitionKeys,
+                        tableOptions,
                         bulkReaderFormat,
                         deserializationFormat,
                         formatFactory,
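
Also worth noting in the sink (the same swap appears in FileSystemTableSource
below): the "no format found" error path no longer rebuilds a Configuration
from the raw catalog map but reads FactoryUtil.FORMAT from the ReadableConfig
handed to the constructor. A side-by-side sketch of the two lookups, using
only calls visible in this diff:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.factories.DynamicTableFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    class FormatLookupSketch {
        // Before: each table rebuilt its options from the raw catalog map.
        static String formatBefore(DynamicTableFactory.Context context) {
            Configuration options =
                    Configuration.fromMap(context.getCatalogTable().getOptions());
            return options.get(FactoryUtil.FORMAT);
        }

        // After: the validated options from TableFactoryHelper#getOptions()
        // are passed to the constructor once and reused here.
        static String formatAfter(ReadableConfig tableOptions) {
            return tableOptions.get(FactoryUtil.FORMAT);
        }
    }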
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 5287f6f..f8cb703 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.FileSourceSplit;
@@ -35,6 +34,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
@@ -52,7 +53,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
@@ -97,15 +97,17 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     private DataType producedDataType;
 
     public FileSystemTableSource(
-            DynamicTableFactory.Context context,
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
            @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
             @Nullable FileSystemFormatFactory formatFactory) {
-        super(context);
+        super(tableIdentifier, schema, partitionKeys, tableOptions);
         if (Stream.of(bulkReaderFormat, deserializationFormat, formatFactory)
                 .allMatch(Objects::isNull)) {
-            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
-            String identifier = options.get(FactoryUtil.FORMAT);
+            String identifier = tableOptions.get(FactoryUtil.FORMAT);
             throw new ValidationException(
                     String.format(
                             "Could not find any format factory for identifier 
'%s' in the classpath.",
@@ -114,8 +116,7 @@ public class FileSystemTableSource extends 
AbstractFileSystemTable
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
-
-        this.producedDataType = context.getPhysicalRowDataType();
+        this.producedDataType = schema.toPhysicalRowDataType();
     }
 
     @Override
@@ -410,7 +411,13 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     public FileSystemTableSource copy() {
         FileSystemTableSource source =
                 new FileSystemTableSource(
-                        context, bulkReaderFormat, deserializationFormat, formatFactory);
+                        tableIdentifier,
+                        schema,
+                        partitionKeys,
+                        tableOptions,
+                        bulkReaderFormat,
+                        deserializationFormat,
+                        formatFactory);
         source.partitionKeys = partitionKeys;
         source.remainingPartitions = remainingPartitions;
         source.filters = filters;
