Hi Aljoscha,

Thank you for your feedback.
## Connector parallelism

Requirement: parallelism can be set explicitly by the user or inferred by the connector.

How to configure parallelism in DataStream: in the DataStream world, the only way to configure parallelism is `SingleOutputStreamOperator.setParallelism`. In practice, users need access to the DataStream when using a connector, not just the `SourceFunction` / `Source` interface.

Is parallelism related to connectors? I think yes: many connectors can derive parallelism-related information from the external system, and users do exactly that. This is parallelism inference (from connectors). The key point is that `DataStream` is an open programming API, so users can freely set parallelism in their programs.

How to configure parallelism in Table/SQL: Table/SQL is not an open programming API, so every feature needs a corresponding mechanism, because the user is no longer able to program. Through our current connector interfaces, `SourceFunctionProvider` and `SinkFunctionProvider`, there is no way to produce connector-related parallelism.

Back to our original intention: to avoid users manipulating `DataStream` directly. Since we want to avoid that, we need to provide the corresponding features. And parallelism is runtime information of a connector, which fits the name `ScanRuntimeProvider`.

> If we wanted to add a "get parallelism" it would be in those underlying
> connectors but I'm also skeptical about adding such a method there
> because it is a static assignment and would preclude clever optimizations
> about the parallelism of a connector at runtime.

I think that when a job is submitted, we are at compile time, so it should only provide static parallelism.

## DataStream in table connector

As I said before, if we want to completely remove DataStream from the table connector, we need to provide the corresponding functionality in `xxRuntimeProvider`. Otherwise, we and our users may not be able to migrate the old connectors.
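To make the parallelism idea above concrete, here is a rough, purely illustrative sketch. The names `ParallelismProvider` and `ExampleSinkProvider` are made up for this email and are not the actual FLIP interfaces; a real version would hang off the `ScanRuntimeProvider`/`SinkRuntimeProvider` side in the planner, and the stand-in classes below only imitate that shape:

```java
import java.util.Optional;

// Hypothetical mixin: a runtime provider could implement this to expose a
// static, compile-time parallelism hint to the planner.
interface ParallelismProvider {
    // Empty means "let the planner decide" - the hint is optional and fixed
    // at job-submission (compile) time, matching the "static parallelism"
    // point above.
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

// Stand-in for something like SinkFunctionProvider; the real interface lives
// in the Flink planner and is not reproduced here.
class ExampleSinkProvider implements ParallelismProvider {
    private final int parallelism;

    ExampleSinkProvider(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(parallelism);
    }
}

public class ParallelismProviderDemo {
    public static void main(String[] args) {
        ParallelismProvider provider = new ExampleSinkProvider(4);
        // The planner would consult the hint while translating the sink;
        // here we just print what it would see.
        System.out.println(provider.getParallelism().orElse(-1));
    }
}
```

Something in this spirit would keep users away from `DataStream` while still letting connector-specific parallelism information reach the planner.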
This includes the Hive/FileSystem connectors and the user cases I mentioned above. CC: @liu shouwei

We really need to consider these cases. If there is no alternative in the short term, users will have to keep using the old table connector API, which has already been deprecated, for a long time.

Why not use `StreamTableEnvironment` `fromDataStream`/`toDataStream`?
- These tables are only temporary tables; they cannot be integrated/stored into a Catalog.
- CREATE TABLE DDL cannot work...
- We would lose the useful features of Table/SQL on the connector, for example projection pushdown, filter pushdown, partitions, etc.

But I believe you are right in the long run: the source and sink APIs should be powerful enough to cover all reasonable cases. Maybe we can introduce them in a minimal way; for example, only introduce `DataStreamSinkProvider` in the planner as an internal API.

Your points are very meaningful; I hope to get your reply.

Best,
Jingsong

On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com> wrote:

> Hi, Aljoscha, I would like to share a use case to second setting the
> parallelism of a table sink (or limiting the parallelism range of a table
> sink): when writing data to databases, there are limits on the number of
> JDBC connections and on query TPS. If the optimizer did not know such
> information and set a large parallelism for the sink to match the
> parallelism of its input, we would get "too many connections" errors, high
> load on the db, and poor performance because of too many small requests.
>
> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Thanks for the proposal! I think the use cases that we are trying to
> > solve are indeed valid. However, I think we might have to take a step
> > back to look at what we're trying to solve and how we can solve it.
> >
> > The FLIP seems to have two broader topics: 1) add "get parallelism" to
> > sinks/sources, 2) let users write DataStream topologies for
> > sinks/sources. I'll treat them separately below.
> >
> > I think we should not add "get parallelism" to the Table Sink API
> > because I think it's the wrong level of abstraction. The Table API
> > connectors are (or should be) more or less thin wrappers around
> > "physical" connectors. By "physical" I mean the underlying (mostly
> > DataStream API) connectors. For example, with the Kafka Connector the
> > Table API connector just does the configuration parsing and determines a
> > good (de)serialization format and then creates the underlying
> > FlinkKafkaConsumer/FlinkKafkaProducer.
> >
> > If we wanted to add a "get parallelism" it would be in those underlying
> > connectors but I'm also skeptical about adding such a method there
> > because it is a static assignment and would preclude clever
> > optimizations about the parallelism of a connector at runtime. But maybe
> > that's thinking too much about future work so I'm open to discussion
> > there.
> >
> > Regarding the second point of letting Table connector developers use
> > DataStream: I think we should not do it. One of the purposes of FLIP-95
> > [1] was to decouple the Table API from the DataStream API for the basic
> > interfaces. Coupling the two too closely at that basic level will make
> > our lives harder in the future when we want to evolve those APIs or when
> > we want the system to be better at choosing how to execute sources and
> > sinks. An example of this is actually the past of the Table API. Before
> > FLIP-95 we had connectors that dealt directly with DataSet and
> > DataStream, meaning that if users wanted their Table Sink to work in
> > both BATCH and STREAMING mode they had to provide two implementations.
> > The trend is towards unifying the sources/sinks to common interfaces
> > that can be used for both BATCH and STREAMING execution but, again, I
> > think exposing DataStream here would be a step back in the wrong
> > direction.
> >
> > I think the solution to the existing user requirement of using
> > DataStream sources and sinks with the Table API should be better
> > interoperability between the two APIs, which is being tackled right now
> > in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> > we're trying to solve here, maybe we should think about FLIP-136 some
> > more.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

--
Best, Jingsong Lee