Hi Aljoscha,

Thank you for your feedback,

## Connector parallelism

Requirements:
Set parallelism by user specified or inferred by connector.

How to configure parallelism in DataStream:
In the DataStream world, the only way to configure parallelism is
`SingleOutputStreamOperator.setParallelism`. Actually, users need to have
access to DataStream when using a connector, not just the `SourceFunction`
/ `Source` interface.
Is parallelism related to connectors? I think yes, there are many
connectors that can support obtaining parallelism related information from
them, and users do exactly that. This means parallelism inference (From
connectors).
The key is that `DataStream` is an open programming API, and users can
freely program to set parallelism.

How to configure parallelism in Table/SQL:
But Table/SQL is not an open programming API, every feature needs a
corresponding mechanism, because the user is no longer able to program. Our
current connector interface: SourceFunctionProvider, SinkFunctionProvider,
through these interfaces, there is no ability to generate connector related
parallelism.
Back to our original intention: to avoid users directly manipulating
`DataStream`. Since we want to avoid it, we need to provide corresponding
features.

And parallelism is the runtime information of connectors, It fits the name
of `ScanRuntimeProvider`.

> If we wanted to add a "get parallelism" it would be in those underlying
connectors but I'm also skeptical about adding such a method there because
it is a static assignment and would preclude clever optimizations about the
parallelism of a connector at runtime.

I think that when a job is submitted, it is in compile time. It should only
provide static parallelism.

## DataStream in table connector

As I said before, if we want to completely cancel DataStream in the table
connector, we need to provide corresponding functions in
`xxRuntimeProvider`.
Otherwise, we and users may not be able to migrate the old connectors.
Including Hive/FileSystem connectors and the user cases I mentioned above.
CC: @liu shouwei

We really need to consider these cases.
If there is no alternative in a short period of time, for a long
time, users need to continue to use the old table connector API, which has
been deprecated.

Why not use StreamTableEnvironment fromDataStream/toDataStream?
- These tables are just temporary tables. Can not be integrated/stored into
Catalog.
- Creating table DDL can not work...
- We need to lose the kinds of useful features of Table/SQL on the
connector. For example, projection pushdown, filter pushdown, partitions
and etc...

But I believe you are right in the long run. The source and sink APIs
should be powerful enough to cover all reasonable cases.
Maybe we can just introduce them in a minimal way. For example, we only
introduce `DataStreamSinkProvider` in planner as an internal API.

Your points are very meaningful, hope to get your reply.

Best,
Jingsong

On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88....@gmail.com>
wrote:

> Hiļ¼ŒAljoscha, I would like to share a use case to second setting parallelism
> of table sink(or limiting parallelism range of table sink): When writing
> data to databases, there is limitation for number of jdbc connections and
> query TPS. we would get errors of too many connections or high load for
> db and poor performance because of too many small requests if the optimizer
> didn't know such information, and set a large parallelism for sink when
> matching the parallelism of its input.
>
> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Thanks for the proposal! I think the use cases that we are trying to
> > solve are indeed valid. However, I think we might have to take a step
> > back to look at what we're trying to solve and how we can solve it.
> >
> > The FLIP seems to have two broader topics: 1) add "get parallelism" to
> > sinks/sources 2) let users write DataStream topologies for
> > sinks/sources. I'll treat them separately below.
> >
> > I think we should not add "get parallelism" to the Table Sink API
> > because I think it's the wrong level of abstraction. The Table API
> > connectors are (or should be) more or less thin wrappers around
> > "physical" connectors. By "physical" I mean the underlying (mostly
> > DataStream API) connectors. For example, with the Kafka Connector the
> > Table API connector just does the configuration parsing and determines a
> > good (de)serialization format and then creates the underlying
> > FlinkKafkaConsumer/FlinkKafkaProducer.
> >
> > If we wanted to add a "get parallelism" it would be in those underlying
> > connectors but I'm also skeptical about adding such a method there
> > because it is a static assignment and would preclude clever
> > optimizations about the parallelism of a connector at runtime. But maybe
> > that's thinking too much about future work so I'm open to discussion
> there.
> >
> > Regarding the second point of letting Table connector developers use
> > DataStream: I think we should not do it. One of the purposes of FLIP-95
> > [1] was to decouple the Table API from the DataStream API for the basic
> > interfaces. Coupling the two too closely at that basic level will make
> > our live harder in the future when we want to evolve those APIs or when
> > we want the system to be better at choosing how to execute sources and
> > sinks. An example of this is actually the past of the Table API. Before
> > FLIP-95 we had connectors that dealt directly with DataSet and
> > DataStream, meaning that if users wanted their Table Sink to work in
> > both BATCH and STREAMING mode they had to provide two implementations.
> > The trend is towards unifying the sources/sinks to common interfaces
> > that can be used for both BATCH and STREAMING execution but, again, I
> > think exposing DataStream here would be a step back in the wrong
> direction.
> >
> > I think the solution to the existing user requirement of using
> > DataStream sources and sinks with the Table API should be better
> > interoperability between the two APIs, which is being tackled right now
> > in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> > we're trying to solve here, maybe we should think about FLIP-136 some
> more.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >
> >
>


-- 
Best, Jingsong Lee

Reply via email to