Hi Becket,

Let me clarify a few things first: Historically, we thought of Table API/SQL as a library on top of the DataStream API, similar to Gelly or CEP, and we used TypeInformation in the Table API to integrate nicely with the DataStream API. However, the last years have shown that SQL is not just a library. It is an entire ecosystem that defines data types, submission behavior, execution behavior, and highly optimized SerDes. SQL is a way to declare data processing end-to-end such that the planner has full control over the execution.
But I totally agree with your concerns around connectors. In fact, there is no big difference between what you are asking for and the current design.
1. "native connector interface is a generic abstraction of doing IO and Serde":
This is the case in our design. We are using SourceFunction, DeserializationSchema, WatermarkAssigner, etc., all of which are pluggable interfaces that the DataStream API offers for performing runtime operations.
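To make this concrete, the table source mostly assembles these pluggables and contains no runtime logic of its own. A heavily simplified sketch (the two helper methods are hypothetical, and the provider wiring is only meant to illustrate the direction of this FLIP):

    // Hypothetical sketch: a table source that merely assembles
    // DataStream pluggables instead of implementing runtime logic.
    class MyDynamicTableSource implements ScanTableSource {

        @Override
        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
            DeserializationSchema<RowData> deser = createDeserialization(context); // hypothetical helper
            SourceFunction<RowData> source = createSourceFunction(deser);          // hypothetical helper
            return SourceFunctionProvider.of(source, false); // false = unbounded
        }

        // other mandatory methods (getChangelogMode(), copy(), ...) omitted in this sketch
    }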
2. "advanced features ... could be provided in a semantic free way":I agree here. But this is an orthogonal topic that each connector implementer should keep in mind. If a new connector is developed, it should *not* be developed only for SQL in mind but with good abstraction such that also DataStream API users can use it. A connector should have a builder pattern to plugin all capabilities like Parque filters etc. There should be no table-specific native/runtime connectors. I think this discussion is related to the discussion of FLIP-115.
However, as I mentioned before: This FLIP only discusses the interfaces for communication between the planner and the connector factory. As Dawid said earlier, a DynamicTableSource can be seen more as a factory that calls the pluggable interfaces of a native connector in the end:
    KafkaConnector.builder()
        .watermarkAssigner(...)
        .keyDeser(...)
        .valueDeser(...)
        ...
        .build()

Regards,
Timo

On 25.03.20 09:05, Becket Qin wrote:
Hi Kurt,

I do not object to promoting the concepts of SQL, but I don't think we should do that by introducing a new dedicated set of public connector interfaces that is only for SQL. The same argument could be applied to Gelly, CEP, and Machine Learning, each claiming that they need a dedicated set of public interfaces that fits their own concepts and asking the connector developers to learn and follow their design. As an analogy, if we want to promote Chinese, we don't want to force people to learn ancient Chinese poems when they only need to know a few words like "hello" and "goodbye".

As some design principles, here is what I think Flink connectors should look like:

1. The native connector interface is a generic abstraction for doing IO and Serde, without semantics for high-level use cases such as SQL, Gelly, CEP, etc.
2. Some advanced features that may help accelerate the IO and Serde could be provided in the native connector interfaces in a semantic-free way, so all the high-level use cases can leverage them.
3. Additional semantics can be built on top of the native source interface by providing different plugins. These plugins could be aware of the high-level use case. For example, to provide a filter to the source, we can do the following:

    // An interface for all the filters that take an expression.
    interface ExpressionFilter {
        FilterResult applyFilterExpression();
    }

    // A filter plugin implementation that translates the SQL Expression
    // to a ParquetFilterPredicate.
    class ParquetExpressionFilter implements Supplier<ParquetFilterPredicate>, ExpressionFilter {

        // Called by the high-level use case.
        FilterResult applyFilterExpression() { ... }

        // Used by the native Source interface.
        ParquetFilterPredicate get() { ... }
    }

In this case, the connector developers just need to write the logic of translating an Expression to a Parquet FilterPredicate. They don't have to understand the entire set of interfaces that we want to promote. Just like they only need to know how to say "hello" without learning ancient Chinese poems.

Again, I am not saying this is necessarily the best approach. But so far it seems a reasonable design principle to tell the developers.

Thanks,

Jiangjie (Becket) Qin

On Wed, Mar 25, 2020 at 11:53 AM Kurt Young <ykt...@gmail.com> wrote:

Hi Becket,

I don't think we should discuss this in purely engineering terms. Your proposal is trying to let SQL connector developers understand as few SQL concepts as possible. But quite the opposite, we are designing those interfaces to emphasize the SQL concepts, to bridge high-level concepts into real interfaces and classes. We keep talking about time-varying relations and dynamic tables when introducing SQL concepts, and sources and sinks are the most critical parts playing with those concepts. It's essential to let Flink SQL developers learn these concepts and connect them with real code by introducing these connector interfaces, so that they can further write *correct* connectors based on such domain knowledge. So this FLIP is a very important chance to express these concepts and get most SQL developers aligned with them and on the same page.

It's mostly about different levels of abstraction, and for domains like SQL this is becoming more important. It helps Flink SQL go smoothly in the future, and also makes it easier for new contributors. But I would admit this is not that obvious for others who don't work with SQL frequently.
Best,
Kurt

On Wed, Mar 25, 2020 at 11:07 AM Becket Qin <becket....@gmail.com> wrote:

Hi Jark,

It is good to know that we do not expect the end users to touch those interfaces. Then the question boils down to whether the connector developers should be aware of the interfaces that are only used by the SQL optimizer. It seems a win if we can avoid that. Two potential solutions off the top of my head are:

1. An internal helper class that does the instanceof checks based on the DataStream source interfaces and creates the pluggables for that DataStream source.
2. Codegen the set of TableSource interfaces given a DataStream Source and its corresponding TablePluggablesFactory.

Thanks,

Jiangjie (Becket) Qin

On Wed, Mar 25, 2020 at 10:07 AM Jark Wu <imj...@gmail.com> wrote:

Hi Becket,

Regarding Flavor 1 and Flavor 2, I want to clarify that users will never use a table source like this:

    {
        MyTableSource myTableSource = MyTableSourceFactory.create();
        myTableSource.setSchema(mySchema);
        myTableSource.applyFilterPredicate(expression);
        ...
    }

TableFactory and TableSource are not directly exposed to end users; all the methods are called by the planner, not by users. Users always use DDL or a descriptor to register a table, and the planner will find the factory and create the sources according to the properties. All the optimizations are applied automatically, e.g. filter/projection pushdown; users don't need to call `applyFilterPredicate` explicitly.

On Wed, 25 Mar 2020 at 09:25, Becket Qin <becket....@gmail.com> wrote:

Hi Timo and Dawid,

Thanks for the clarifications. They really help. You are right that we are on the same page regarding the hierarchy. I think the only difference between our views is the flavor of the interfaces. There are two flavors of the source interface for DataStream and Table.

*Flavor 1. Table sources are wrapper interfaces around a DataStream source.*

Following this way, we will reach the design of the current proposal, i.e. each pluggable exposed in the DataStream source will have a corresponding TableSource interface counterpart at the factory level. Users will write code like this:

    {
        MyTableSource myTableSource = MyTableSourceFactory.create();
        myTableSource.setSchema(mySchema);
        myTableSource.applyFilterPredicate(expression);
        ...
    }

The good thing about this flavor is that from the SQL / Table perspective, there is a dedicated set of Table-oriented interfaces. The downsides are:

A. From the user's perspective, the DataStream Source and the Table Source are just two different sets of interfaces, regardless of how similar they are internally.
B. The source developers have to develop against those two sets of interfaces in order to support both DataStream and Table.
C. It is not explicit that DataStream can actually share the pluggables in Table / SQL. For example, in order to provide a filter pluggable with a SQL expression, users will have to know the actual converter class that converts the expression to the filter predicate and construct that converter by themselves.
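Concretely, that means code along these lines (the converter class name is made up, just to illustrate the problem):

    // The DataStream user must know the SQL-specific converter class
    // and construct it by hand to reuse the Table filter pluggable.
    ExpressionToParquetPredicate converter = new ExpressionToParquetPredicate();
    FilterPredicate predicate = converter.convert(expression);
    mySource.applyPredicate(predicate);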
---------------

*Flavor 2. A TableSource is a DataStream source with a bunch of pluggables. No Table-specific interfaces at all.*

Following this way, we will reach another design where you have a SourceFactory and a single pluggable factory for all the table pluggables, and users will write something like:

    {
        Deserializer<Row> myTableDeserializer =
            MyTablePluggableFactory.createDeserializer(schema);
        MySource<Row> mySource =
            MySourceFactory.create(properties, myTableDeserializer);
        mySource.applyPredicate(
            MyTablePluggableFactory.createFilterPredicate(expression));
    }

The good thing about this flavor is that there is just one set of interfaces that works for both Table and DataStream. There is no difference between creating a DataStream source and creating a Table source, and DataStream can easily reuse the pluggables from the Table sources. The downside is that Table / SQL won't have a dedicated API for optimization. Instead of writing:

    if (myTableSource instanceof FilterableTableSource) {
        // Some filter pushdown logic.
        myTableSource.applyPredicate(expression);
    }

one has to write:

    if (mySource instanceof FilterableSource) {
        // Some filter pushdown logic.
        mySource.applyPredicate(
            MyTablePluggableFactory.createFilterPredicate(expression));
    }

-------------------------

Just to be clear, I am not saying flavor 2 is necessarily better than flavor 1, but I want to make sure flavor 2 is also considered and discussed.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 10:53 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Becket,

I really think we don't have differing opinions. We might just not see the changes in the same way yet. Personally, I think of the DynamicTableSource as a factory for a Source implemented for the DataStream API. The important fact is that the DynamicTableSource and all feature traits (SupportsFilterablePushDown, SupportsProjectPushDown, etc.) work with Table API concepts such as Expressions, SQL-specific types, etc. In the end, the implementation would resemble this (bear in mind I tremendously simplified the example, just to show the relation between the two APIs):

    SupportsFilterablePushDown {

        applyFilters(List<ResolvedExpression> filters) {
            this.filters = convertToDataStreamFilters(filters);
        }

        Source createSource() {
            return Source.create()
                .applyFilters(this.filters);
        }
    }

or, exactly as you said, for the computed columns:

    SupportsComputedColumnsPushDown {

        applyComputedColumn(ComputedColumnConverter converter) {
            this.deserializationSchema = new DeserializationSchema<Row> {
                Row deserialize(...) {
                    // original format, e.g. JSON, Avro, etc.
                    RowData row = format.deserialize(bytes);
                    RowData enriched = converter(row);
                }
            }
        }

        Source createSource() {
            return Source.create()
                .withDeserialization(deserializationSchema);
        }
    }

So, to sum it up again, all those interfaces are factories that configure the appropriate parts of the DataStream API using Table API concepts. Finally, to answer your question for the particular comparisons:

    DynamicTableSource             v.s. Source<Row, SourceSplitT, EnumChkT>
    SupportsFilterablePushDown     v.s. FilterableSource
    SupportsProjectablePushDown    v.s. ProjectableSource
    SupportsWatermarkPushDown      v.s. WithWatermarkAssigner
    SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
    ScanTableSource                v.s. ChangeLogDeserializer

Pretty much you can think of everything on the left side as a factory for the right side; the left side works with Table API classes (Expressions, DataTypes).
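The same pattern would apply to the remaining pairs; e.g. for watermarks, a sketch in the same (again tremendously simplified, made-up) style could look like:

    SupportsWatermarkPushDown {

        applyWatermark(WatermarkAssigner assigner) {
            // the planner derives the assigner from the DDL (e.g. a watermark
            // expression) and hands it over in DataStream API terms
            this.watermarkAssigner = assigner;
        }

        Source createSource() {
            return Source.create()
                .withWatermarkAssigner(this.watermarkAssigner);
        }
    }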
I hope this clarifies it a bit.

Best,
Dawid

On 24/03/2020 15:03, Becket Qin wrote:

Hey Kurt,

> I don't think DataStream should see some SQL-specific concepts such as Filtering or ComputedColumn.

Projectable and Filterable seem not necessarily SQL concepts, but could be applicable to a DataStream source as well to reduce the network load. For example, ORC and Parquet should probably also be readable from DataStream, right?

ComputedColumn is not part of the Source; it is an interface extending the Deserializer, which is a pluggable for the Source. From SQL's perspective it is the concept of a computed column, but from the Source perspective it is essentially a Deserializer that also converts the records internally, assuming we allow some conversion to be embedded into the source in addition to just deserialization.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 9:36 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Timo for updating the formats section. That will be very helpful for the changelog support (FLIP-105). I just left 2 minor comments about some method names. In general, I'm +1 to start a vote.

--------------------------------------------------------------------------------------------------

Hi Becket,

I agree we shouldn't duplicate code, especially the runtime implementations. However, the interfaces proposed by FLIP-95 are mainly used during optimization (compiling), not at runtime. I don't think there is much to share there, because Table/SQL is declarative, while DataStream is imperative. For example, for filter pushdown, a DataStream FilterableSource may accept a FilterFunction (which is a black box for the source). Table sources, however, should pick up the pushed filter expressions; some sources may only support "=", "<", ">" conditions. Pushing a FilterFunction doesn't work in the table ecosystem. That means the connectors have to have some table-specific implementations.

Best,
Jark

On Tue, 24 Mar 2020 at 20:41, Kurt Young <ykt...@gmail.com> wrote:

Hi Becket,

I don't think DataStream should see some SQL-specific concepts such as Filtering or ComputedColumn. It's better to stay within the SQL area and translate to more generic concepts when translating to the DataStream/runtime layer, such as using a MapFunction to represent the computed column logic.

Best,
Kurt

On Tue, Mar 24, 2020 at 5:47 PM Becket Qin <becket....@gmail.com> wrote:

Hi Timo and Dawid,

It's really great that we have the same goal. I am actually wondering if we can go one step further and avoid some of the interfaces in Table as well. For example, if we have a FilterableSource, do we still need the FilterableTableSource? Should DynamicTableSource just become a Source<*Row*, SourceSplitT, EnumChkT>?

Can you help me understand a bit more the reason we need the following relational representation / wrapper interfaces v.s. the interfaces that we could put on the Source in FLIP-27?

    DynamicTableSource             v.s. Source<Row, SourceSplitT, EnumChkT>
    SupportsFilterablePushDown     v.s. FilterableSource
    SupportsProjectablePushDown    v.s. ProjectableSource
    SupportsWatermarkPushDown      v.s. WithWatermarkAssigner
    SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
    ScanTableSource                v.s. ChangeLogDeserializer
    LookUpTableSource              v.s. LookUpSource

Assuming we have all the interfaces on the right side, do we still need the interfaces on the left side? Note that the interfaces on the right can be used by both DataStream and Table.
If we do this, there will only be one set of Source interfaces for Table and DataStream; the only difference is that the Source for Table will have some specific plugins and configurations. An omnipotent Source can implement all of the above interfaces and take a Deserializer that implements both ComputedColumnDeserializer and ChangeLogDeserializer. Would the SQL planner work with that?

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:

+1. Thanks Timo for the design doc.

We could also consider @Experimental. But I am +1 to @PublicEvolving; we should be confident in the current change.

Best,
Jingsong Lee

On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <twal...@apache.org> wrote:

@Becket: We totally agree that we don't need table-specific connectors during runtime. As Dawid said, the interfaces proposed here are just for communication with the planner. Once the properties (watermarks, computed columns, filters, projection, etc.) are negotiated, we can configure a regular Flink connector, e.g. set the watermark assigner and deserialization schema of a Kafka connector. For better separation of concerns, Flink connectors should not include relational interfaces and depend on flink-table. That is the responsibility of the table source/sink.

@Kurt: I would like to mark them @PublicEvolving already because we need to deprecate the old interfaces as early as possible. We cannot redirect to @Internal interfaces. They are not marked @Public, so we can still evolve them. But a core design shift should not happen again; it would leave a bad impression if we redesign over and over again. Instead, we should be confident in the current change.

Regards,
Timo

On 24.03.20 09:20, Dawid Wysakowicz wrote:

Hi Becket,

Answering your question: we have the same intention not to duplicate connectors between the DataStream and Table APIs. The interfaces proposed in the FLIP are a way to describe the relational properties of a source. The intention is, as you described, to translate all of those, expressed as expressions or other Table-specific structures, into a DataStream source. In other words, I think what we are doing here is in line with what you described.

Best,
Dawid

On 24/03/2020 02:23, Becket Qin wrote:

Hi Timo,

Thanks for the proposal. I completely agree that the current Table connectors could be simplified quite a bit. I haven't finished reading everything, but here are some quick thoughts.

Actually, to me the biggest question is: why should there be two different connector systems for DataStream and Table? What is the fundamental reason that prevents us from merging them into one?

The basic functionality of a connector is to provide capabilities to do IO and Serde. Conceptually, Table connectors should just be DataStream connectors that deal with Rows. It seems that quite a few of the special connector requirements are just a specific way to do IO / Serde. Taking SupportsFilterPushDown as an example, imagine we have the following interface:

    interface FilterableSource<PREDICATE> {
        void applyFilterable(Supplier<PREDICATE> predicate);
    }

And if a ParquetSource would like to support filtering, it would become:

    class ParquetSource implements Source, FilterableSource<FilterPredicate> {
        ...
    }

For Table, one just needs to provide a predicate supplier that converts an Expression to the specified predicate type.
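For instance, such a supplier could look roughly like this (ExpressionToPredicate is only a sketch, and convertToParquetPredicate is a hypothetical translation helper):

    import java.util.function.Supplier;
    import org.apache.flink.table.expressions.Expression;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    // Supplies a Parquet FilterPredicate by translating a Table API Expression.
    class ExpressionToPredicate implements Supplier<FilterPredicate> {

        private final Expression expression;

        ExpressionToPredicate(Expression expression) {
            this.expression = expression;
        }

        @Override
        public FilterPredicate get() {
            return convertToParquetPredicate(expression); // hypothetical helper
        }
    }

    // The planner side would then only need:
    // parquetSource.applyFilterable(new ExpressionToPredicate(expression));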
This has a few benefits:

1. The same unified API for filterable sources, regardless of DataStream or Table.
2. DataStream users can now also use the ExpressionToPredicate supplier if they want to.

To summarize, my main point is that I am wondering if it is possible to have a single set of connector interfaces for both Table and DataStream, rather than having two hierarchies. I am not 100% sure whether this would work, but if it does, it would be a huge win from both a code maintenance and a user experience perspective.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Timo,

Thank you for the proposal. I think it is an important improvement that will benefit many parts of the Table API. The proposal looks really good to me and personally I would be comfortable with voting on the current state.

Best,
Dawid

On 23/03/2020 18:53, Timo Walther wrote:

Hi everyone,

I received some questions around how the new interfaces play together with formats and their factories. Furthermore, for MySQL or Postgres CDC logs, the format should be able to return a `ChangelogMode`. Also, I incorporated the feedback around the factory design in general.

I added a new section `Factory Interfaces` to the design document. This should be helpful for understanding the big picture and connecting the concepts. Please let me know what you think.

Thanks,
Timo

On 18.03.20 13:43, Timo Walther wrote:

Hi Benchao,

this is a very good question. I will update the FLIP about this. The legacy planner will not support the new interfaces. It will only support the old interfaces. With the next release, I think the Blink planner is stable enough to be the default one as well.

Regards,
Timo

On 18.03.20 08:45, Benchao Li wrote:

Hi Timo,

Thank you and the others for the efforts to prepare this FLIP. The FLIP LGTM generally. +1 for moving the Blink data structures to table-common; they will be useful for UDFs too in the future. A little question: do we plan to support the new interfaces and data types in the legacy planner, or only in the Blink planner? And using primary keys from the DDL instead of deriving key information from each query is also a good idea; we met some use cases where this did not work very well before. This FLIP also makes the dependencies of the table modules much clearer; I like it very much.

On Tue, Mar 17, 2020 at 1:36 AM, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I'm happy to present the results of long discussions that we had internally. Jark, Dawid, Aljoscha, Kurt, Jingsong, me, and many more have contributed to this design document. We would like to propose new long-term table source and table sink interfaces:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

This is a requirement for FLIP-105 and for finalizing FLIP-32.

The goals of this FLIP are:

- Simplify the current interface architecture:
  - Merge upsert, retract, and append sinks.
  - Unify batch and streaming sources.
  - Unify batch and streaming sinks.
- Allow sources to produce a changelog:
  - UpsertTableSources have been requested a lot by users. Now is the time to open the internal planner capabilities via the new interfaces.
  - According to FLIP-105, we would like to support changelogs for processing formats such as Debezium.
- Don't rely on DataStream API for sources and sinks:
  - According to FLIP-32, the Table API and SQL should be independent of the DataStream API, which is why the `table-common` module has no dependencies on `flink-streaming-java`.
  - Source and sink implementations should only depend on the `table-common` module after FLIP-27.
  - Until FLIP-27 is ready, we still put most of the interfaces in `table-common` and strictly separate interfaces that communicate with a planner from actual runtime readers/writers.
- Implement efficient sources and sinks without planner dependencies:
  - Make Blink's internal data structures available to connectors.
  - Introduce stable interfaces for data structures that can be marked as `@PublicEvolving`.
  - Only require dependencies on `flink-table-common` in the future.

It finalizes the concept of dynamic tables and considers how all source/sink related classes play together.

We look forward to your feedback.

Regards,
Timo

--
Best, Jingsong Lee