Hi Jingsong,

The 1st and 2nd pain points you described are very valid; those are the two
I'm more familiar with, and I agree they are shortcomings of the current
Flink SQL design.

A couple of comments on your 1st proposal:

1. Would it be better to have explicit APIs like "createBatchTableSource(...)"
and "createStreamingTableSource(...)" in TableSourceFactory (and similarly
for the sink factory), and let the planner decide which mode of source
(streaming vs batch) to instantiate? That way connector developers wouldn't
always have to handle an if-else on isStreamingMode (see the sketch after
this list).
2. I'm not sure of the benefits of having a CatalogTableContext class. The
path, table, and config are fairly independent of each other, so why not
pass the config in as a 3rd parameter, as in `createXxxTableSource(path,
catalogTable, tableConfig)`?
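
To make point 1 concrete, here is a minimal sketch of the factory shape I
have in mind (the method names and the ReadableConfig parameter are only
illustrative, not an existing API):

public interface TableSourceFactory<T> extends TableFactory {

   // Invoked by the planner when the TableEnvironment runs in batch mode.
   TableSource<T> createBatchTableSource(
         ObjectPath path, CatalogTable table, ReadableConfig config);

   // Invoked by the planner when the TableEnvironment runs in streaming
   // mode, so connectors never need to branch on isStreamingMode.
   TableSource<T> createStreamingTableSource(
         ObjectPath path, CatalogTable table, ReadableConfig config);
}

This also shows point 2: the config is just a 3rd parameter rather than
being wrapped in a context object.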


On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi dev,
>
> I'd like to kick off a discussion on the improvement of TableSourceFactory
> and TableSinkFactory.
>
> Motivation:
> Now the main needs and problems are:
> 1. Connectors can't get the TableConfig [1], and some behaviors really need
> to be controlled by the user's table configuration. In the catalog era, we
> can't put these configs in connector properties, which is too inconvenient.
> 2. Connectors can't know whether this is batch or streaming execution mode,
> but the sink implementations for batch and streaming are totally different.
> I understand there is an update-mode property now, but it splits batch and
> streaming at the catalog dimension. In fact, this information can be
> obtained from the current TableEnvironment.
> 3. There is no interface for invoking validation. Right now validation is
> done by util classes, and it only happens if the connector chooses to call
> them. We have some new validations to add, such as [2], which is really
> confusing for users and even developers. Another problem is that our SQL
> update (DDL) path does not run validation [3]. It is better to report an
> error when executing the DDL; otherwise it will confuse the user.
>
> Proposed change draft for 1 and 2:
>
> interface CatalogTableContext {
>    ObjectPath getTablePath();
>    CatalogTable getTable();
>    ReadableConfig getTableConfig();
>    boolean isStreamingMode();
> }
>
> public interface TableSourceFactory<T> extends TableFactory {
>
>    default TableSource<T> createTableSource(CatalogTableContext context) {
>       return createTableSource(context.getTablePath(), context.getTable());
>    }
>
>    ......
> }
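>
> For illustration, a connector could implement the new method roughly like
> this (just a sketch; the MyKafka... name and the two helper methods are
> placeholders, not real classes):
>
> public class MyKafkaTableSourceFactory implements TableSourceFactory<Row> {
>
>    @Override
>    public TableSource<Row> createTableSource(CatalogTableContext context) {
>       // Behavior can now depend on both user configuration and execution mode.
>       ReadableConfig config = context.getTableConfig();
>       return context.isStreamingMode()
>             ? createStreamingSource(context.getTable(), config)
>             : createBatchSource(context.getTable(), config);
>    }
>
>    ......
> }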
>
> Proposed change draft for 3:
>
> public interface TableFactory {
>
>    TableValidators validators();
>
>    interface TableValidators {
>       ConnectorDescriptorValidator connectorValidator();
>       TableSchemaValidator schemaValidator();
>       FormatDescriptorValidator formatValidator();
>    }
> }
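>
> The planner could then run validation uniformly before creating any source
> or sink, e.g. (rough sketch; how `properties` is derived from the
> CatalogTable is left open here):
>
> TableValidators validators = factory.validators();
> DescriptorProperties properties = ...; // derived from the CatalogTable
> validators.connectorValidator().validate(properties);
> validators.schemaValidator().validate(properties);
> validators.formatValidator().validate(properties);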
>
> What do you think?
>
> [1] https://issues.apache.org/jira/browse/FLINK-15290
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> [3] https://issues.apache.org/jira/browse/FLINK-15509
>
> Best,
> Jingsong Lee
>
