Hi Zhanghao,

Thanks for your explanation!

- For question 2, I can accept that the user must provide a primary key (pk)
when modifying the parallelism of a CDC source.

- For questions 1 and 3 and the additional concerns, if there is no more
general solution to discuss at this time, I agree to address part of the
tuning requirements in the current manner.
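
For reference, the rule behind question 2 (as clarified in the quoted reply
below) can be sketched roughly as follows. This is only a minimal illustration
in Java, not the planner's actual code: the class, enum, and method names are
hypothetical, and modeling "source parallelism not specified" by passing the
default parallelism is an assumption made for the sketch.

```java
public class SourcePartitionerSketch {

    /** Hypothetical labels for the shuffle placed right after the source. */
    public enum Partitioner { NONE, REBALANCE, HASH_BY_PRIMARY_KEY }

    /**
     * Chooses the partitioner after the source, following the rule described
     * in this thread. All parameter names are illustrative assumptions.
     */
    public static Partitioner choose(
            boolean producesUpdateOrDelete,
            boolean hasPrimaryKey,
            int sourceParallelism,
            int defaultParallelism) {
        if (sourceParallelism == defaultParallelism) {
            // Same parallelism: no extra shuffle, so CDC without a pk is fine.
            return Partitioner.NONE;
        }
        if (!producesUpdateOrDelete) {
            // Insert-only source: ordering per key does not matter.
            return Partitioner.REBALANCE;
        }
        if (!hasPrimaryKey) {
            // Update/delete messages, no pk, and a different parallelism.
            throw new IllegalStateException(
                    "A primary key is required to change the parallelism of a"
                            + " source that produces update/delete messages.");
        }
        // Hash by primary key so changes to one key stay in order.
        return Partitioner.HASH_BY_PRIMARY_KEY;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true, 2, 8));
        System.out.println(choose(true, false, 8, 8));
    }
}
```

Only the third branch raises an error, which matches the point that a CDC
source without a pk keeps working as long as its parallelism is left alone.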

Thanks again for moving this FLIP forward!

Best,
Lincoln Lee


On Mon, Sep 25, 2023 at 12:04 PM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:

> Hi Lincoln,
>
> Thanks for the comments.
>
> - For concern #1, I agree that we do not always produce an optimal plan for
> both cases. However, doing so is difficult and would add non-trivial
> complexity. On the other hand, although our proposal generates a sub-optimal
> plan when the bottleneck is on the aggregate operator, it still provides
> more flexibility for performance tuning. Therefore, I think we can
> implement it in the straightforward way first, WDYT?
>
> - For concern #2, I'd like to clarify a bit: an exception will be thrown
> if and only if the source may produce delete/update messages AND no primary
> key is specified AND the source parallelism is different from the default
> parallelism. So for CDC without a pk, we're still good if the source
> parallelism is not specified.
>
> - For concern #3, at this point, I think making the name more
> specific is better, as no other use cases can be foreseen now.
> Since this is an internal API, we are free to refactor it later if needed.
>
> - For the concerns about the adaptive scheduler, the problems you mentioned
> do exist, but I don't think they are relevant here. The planner may encode
> some hints to help the scheduler, but after all, those efforts should be
> initiated on the scheduler side.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Lincoln Lee <lincoln.8...@gmail.com>
> Sent: September 22, 2023 23:19
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL
> Sources
>
> Hi Zhanghao,
>
> Thanks for the FLIP and discussion! I hope this reply isn't too late : )
> First, I fully agree with the motivation of this FLIP and its value for
> users, but there are a few things we should consider (please correct me
> if I'm misunderstanding):
>
> 1. It seems that the current solution only takes care of part of the
> requirement. The need to set the source's parallelism may differ between
> jobs. For example, consider the following two job topologies (one
> {} simply represents a vertex):
> a. {source -> calc -> sink}
>
> b. {source -> calc} -> {aggregate} -> {sink}
>
> For job a, suppose there is a bottleneck in the calc operator, but the
> source parallelism cannot be scaled up (e.g., limited by Kafka's partition
> number). The calc operator then cannot be scaled up to achieve higher
> throughput, because the operators in the source vertex are chained together.
> In this case the current solution is reasonable (break the chain, add a
> shuffle).
>
> But for job b, if the bottleneck is the aggregate operator (not calc), it is
> likely better to scale up the aggregate operator/vertex without
> breaking the {source -> calc} chain, as breaking it will incur additional
> shuffle cost.
> So if we decide to add this new feature, I would recommend that both cases
> be taken care of.
>
>
> 2. The assumption that a CDC source must have a pk (primary key) may not be
> reasonable. For example, MySQL CDC supports tables without a pk
> (https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#tables-without-primary-keys),
> so we cannot just raise an error here.
>
>
> 3. For the new SourceTransformationWrapper, I have some concerns about its
> future evolution: if we need to add support for other operators, do we
> continue to add new xxWrappers?
>
> I've also revisited the previous discussion on FLIP-146[1]. There were no
> clear conclusions or good ideas about similar support for the source
> at that time. I also noticed the new capability to change per-vertex
> parallelism via the REST API in 1.18 (part of FLIP-291[2][3]). However,
> there is actually an issue with changing a SQL job's parallelism, which may
> require a hash shuffle to ensure the order of the update stream. This needs
> to be followed up in FLIP-291, and a Jira will be created later. So perhaps
> we need to think about it more (the next version is not yet launched, so we
> still have time).
>
> [1] https://lists.apache.org/thread/gtpswl42jzv0c9o3clwqskpllnw8rh87
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-291%3A+Externalized+Declarative+Resource+Management
> [3] https://issues.apache.org/jira/browse/FLINK-31471
>
>
> Best,
> Lincoln Lee
>
>
> On Fri, Sep 22, 2023 at 4:00 PM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:
>
> > Thanks to everyone who participated in the discussion here. If no further
> > questions/concerns are raised, we'll start voting next Monday afternoon
> > (GMT+8).
> >
> > Best,
> > Zhanghao Chen
> > ________________________________
> > From: Jane Chan <qingyue....@gmail.com>
> > Sent: September 22, 2023 15:35
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL
> > Sources
> >
> > Hi Zhanghao,
> >
> > Thanks for the update; +1 for the proposal!
> >
> > Best,
> > Jane
> >
> > On Fri, Sep 22, 2023 at 2:13 PM Chen Zhanghao <zhanghao.c...@outlook.com>
> > wrote:
> >
> > > Hi Jane,
> > >
> > > Thanks for the suggestions; I totally agree with them. I've updated the
> > > FLIP with the following two changes:
> > >
> > > 1. Rename WrapperTransformation to SourceTransformationWrapper, which
> > > wraps a SourceTransformation only. Note that we do not plan to support
> > > the legacy LegacySourceTransformation.
> > > 2. The partitioner after the source will be chosen based on the
> > > changelog mode of the source + the existence of the primary key in the
> > > source schema. If the source will produce update/delete messages but a
> > > primary key does not exist, an exception will be thrown.
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Jane Chan <qingyue....@gmail.com>
> > > Sent: September 20, 2023 15:13
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for Table/SQL
> > > Sources
> > >
> > > Hi Zhanghao,
> > >
> > > Thanks for the update. The FLIP now looks good to me in general, and I
> > > have two minor comments.
> > >
> > > 1. Compared with other subclasses like `CacheTransformation` or
> > > `PartitionTransformation`, the name `WrapperTransformation` seems too
> > > general. What about `SourceTransformationWrapper`, which is more
> > > specific and descriptive, WDYT?
> > >
> > > 2.
> > >
> > > > When the source generates update and delete data (determined by
> > > > checking the existence of a primary key in the source schema), the
> > > > source will use hash partitioner to send data.
> > >
> > >
> > > It might not be sufficient to determine whether the source is a CDC
> > > source solely based on checking the existence of the primary key. It's
> > > better to check the changelog mode of the source. On the other hand,
> > > adding the hash partitioner requires the CDC source table to declare the
> > > primary key in the DDL. Therefore, it is preferable to explain this
> > > restriction in the FLIP and doc and throw a meaningful exception when
> > > users want to configure a different parallelism for a CDC source but
> > > forget to declare the primary key constraint.
> > >
> > > Best,
> > > Jane
> > >
> > > > On Wed, Sep 20, 2023 at 9:20 AM Benchao Li <libenc...@apache.org> wrote:
> > >
> > > > Thank you for the update, the FLIP now looks good to me.
> > > >
> > > > > On Tue, Sep 19, 2023 at 10:50 PM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:
> > > > >
> > > > > > Thanks to everyone for the valuable inputs, we learnt a lot during
> > > > > > the discussion. We've updated the FLIP in three main aspects based
> > > > > > on the discussion here:
> > > > > >
> > > > > > - Add a new subsection on keeping downstream operators' parallelism
> > > > > > unchanged by wrapping the source transformation in a phantom
> > > > > > transformation.
> > > > > > - Add a new subsection on how to deal with changelog messages:
> > > > > > simply put, build a hash partitioner based on the primary key when
> > > > > > a source generates update/delete data.
> > > > > > - Update the non-goals section to remove the possibly misleading
> > > > > > statement that setting parallelism for individual operators lacks
> > > > > > public interest, and state that we leave it for future work due to
> > > > > > its extra complexity.
> > > > >
> > > > > Looking forward to your suggestions.
> > > > >
> > > > > Best,
> > > > > Zhanghao Chen
> > > > > ________________________________
> > > > > > From: Feng Jin <jinfeng1...@gmail.com>
> > > > > > Sent: September 17, 2023 0:56
> > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for
> > > > > > Table/SQL Sources
> > > > >
> > > > > Hi, Zhanghao
> > > > >
> > > > > Thank you for proposing this FLIP, it is a very meaningful feature.
> > > > >
> > > > > > I agree that currently we may only consider the parallelism setting
> > > > > > of the source itself. If we consider the parallelism setting of
> > > > > > other operators, it may make the entire design more complex.
> > > > > >
> > > > > > Regarding the situation where the parallelism of the source is
> > > > > > different from that of downstream tasks, I did not find a more
> > > > > > detailed description in the FLIP.
> > > > > >
> > > > > > By default, if the parallelism between two operators is different,
> > > > > > the rebalance partitioner will be used.
> > > > > > But in the SQL scenario, I believe that we should keep the behavior
> > > > > > of parallelism setting consistent with that of the sink.
> > > > > >
> > > > > > 1. When the source only generates insert-only data, if there is a
> > > > > > mismatch in parallelism between the source and downstream operators,
> > > > > > rebalance is used by default.
> > > > > >
> > > > > > 2. When the source generates update and delete data, we should
> > > > > > require the source to configure a primary key and then build a hash
> > > > > > partitioner based on that primary key.
> > > > > >
> > > > > > WDYT?
> > > > >
> > > > >
> > > > > Best,
> > > > > Feng
> > > > >
> > > > >
> > > > > > On Sat, Sep 16, 2023 at 5:58 PM Jane Chan <qingyue....@gmail.com> wrote:
> > > > >
> > > > > > Hi Zhanghao,
> > > > > >
> > > > > > Thanks for the explanation.
> > > > > >
> > > > > > > For Q1, I think the key lies in determining the boundary where the
> > > > > > > chain should be broken. However, this boundary is ultimately
> > > > > > > determined by the specific requirements of each user query.
> > > > > > >
> > > > > > > The most straightforward approach is breaking the chain after the
> > > > > > > source operator, even though it involves a tradeoff. This is
> > > > > > > because there may be instances of `StreamExecWatermarkAssigner`,
> > > > > > > `StreamExecMiniBatchAssigner`, or `StreamExecChangelogNormalize`
> > > > > > > occurring before the `StreamExecCalc` node, and it would be
> > > > > > > complex and challenging to enumerate all possible match patterns.
> > > > > > >
> > > > > > > A more complex workaround would be to provide an entry point for
> > > > > > > users to configure the specific operator that should serve as the
> > > > > > > breakpoint. That said, this would further increase the complexity
> > > > > > > of this FLIP.
> > > > > > >
> > > > > > > However, if the parallelism of each operator can be configured (in
> > > > > > > the future), then this problem would not exist (it might be beyond
> > > > > > > the scope of discussion for this FLIP).
> > > > > > >
> > > > > > > I personally lean towards keeping the FLIP concise and focused by
> > > > > > > choosing the most straightforward approach. I would also like to
> > > > > > > hear others' opinions.
> > > > > >
> > > > > > Best,
> > > > > > Jane
> > > > > >
> > > > > > > On Sat, Sep 16, 2023 at 10:21 AM Yun Tang <myas...@live.com> wrote:
> > > > > >
> > > > > > > Hi Zhanghao,
> > > > > > >
> > > > > > > > Certainly, I think we should keep this FLIP focused on setting
> > > > > > > > the source parallelism via the DDL's properties. I just want to
> > > > > > > > clarify that, in my experience, setting parallelism for
> > > > > > > > individual operators is also valuable, which is downplayed in
> > > > > > > > your FLIP.
> > > > > > > >
> > > > > > > > @ConradJam BTW, compared with a SQL hint, I think using
> > > > > > > > `scan.parallelism` is better, as it aligns with the current
> > > > > > > > `sink.parallelism`. And once we introduce such an option, we can
> > > > > > > > also use the SQL hint of dynamic table options[1] to configure
> > > > > > > > the source parallelism.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options
> > > > > > >
> > > > > > >
> > > > > > > Best
> > > > > > > Yun Tang
> > > > > > > ________________________________
> > > > > > > From: ConradJam <jam.gz...@gmail.com>
> > > > > > > Sent: Friday, September 15, 2023 22:52
> > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting Parallelism for
> > > > > > > > Table/SQL Sources
> > > > > > >
> > > > > > > > +1. Thanks for the FLIP and the discussion. I would like to ask
> > > > > > > > whether we should use SQL hint syntax to set this parallelism.
> > > > > > >
> > > > > > > > On Fri, Sep 15, 2023 at 8:52 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > > Thanks for the FLIP and the discussion. I find it exciting.
> > > > > > > > > Thanks for pushing for this.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Martijn
> > > > > > > >
> > > > > > > > > On Fri, Sep 15, 2023 at 2:25 PM Chen Zhanghao <zhanghao.c...@outlook.com>
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jane,
> > > > > > > > >
> > > > > > > > > Thanks for the valuable suggestions.
> > > > > > > > >
> > > > > > > > > > For Q1, it's indeed an issue. Some possible ideas include
> > > > > > > > > > introducing a fake transformation after the source that
> > > > > > > > > > takes the global default parallelism, or simply making exec
> > > > > > > > > > nodes take the global default parallelism, but both ways
> > > > > > > > > > prevent potential chaining opportunities and I'm not sure
> > > > > > > > > > if that's good to go. We'll need to give it deeper thought
> > > > > > > > > > and polish our proposal. We're also more than glad to hear
> > > > > > > > > > your inputs on it.
> > > > > > > > > >
> > > > > > > > > > For Q2, scan.parallelism will take higher precedence, as
> > > > > > > > > > the more specific config should take higher precedence.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Zhanghao Chen
> > > > > > > > > ________________________________
> > > > > > > > > > From: Jane Chan <qingyue....@gmail.com>
> > > > > > > > > > Sent: September 15, 2023 11:56
> > > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > > Cc: dewe...@outlook.com <dewe...@outlook.com>
> > > > > > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting
> > > > > > > > > > Parallelism for Table/SQL Sources
> > > > > > > > >
> > > > > > > > > Hi, Zhanghao, Dewei,
> > > > > > > > >
> > > > > > > > > > Thanks for initiating this discussion. This feature is
> > > > > > > > > > valuable in providing more flexibility for performance
> > > > > > > > > > tuning for SQL pipelines.
> > > > > > > > >
> > > > > > > > > Here are my two cents,
> > > > > > > > >
> > > > > > > > > > 1. In the FLIP, you mentioned concerns about the
> > > > > > > > > > parallelism of the calc node and concluded to "leave the
> > > > > > > > > > behavior unchanged for now." This means that the calc node
> > > > > > > > > > will use the parallelism of the source operator,
> > > > > > > > > > regardless of whether the source parallelism is configured
> > > > > > > > > > or not. If I understand correctly, currently, except for
> > > > > > > > > > the sink exec node (which has the ability to configure its
> > > > > > > > > > own parallelism), the rest of the exec nodes accept their
> > > > > > > > > > input parallelism. From the design, I didn't see the
> > > > > > > > > > details about coping with input and default parallelism
> > > > > > > > > > for the rest of the exec nodes. Can you elaborate more
> > > > > > > > > > about the details?
> > > > > > > > > >
> > > > > > > > > > 2. Does the configuration
> > > > > > > > > > `table.exec.resource.default-parallelism` take precedence
> > > > > > > > > > over `scan.parallelism`?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jane
> > > > > > > > >
> > > > > > > > > > On Fri, Sep 15, 2023 at 10:43 AM Yun Tang <myas...@live.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for creating this FLIP,
> > > > > > > > > >
> > > > > > > > > > > Many users want to configure the source parallelism just
> > > > > > > > > > > as they configure the sink parallelism via DDL. Looking
> > > > > > > > > > > forward to this feature.
> > > > > > > > > > >
> > > > > > > > > > > BTW, I think setting parallelism for each operator
> > > > > > > > > > > should also be valuable. And this should work with the
> > > > > > > > > > > compiled plan [1] instead of SQL's DDL.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-292%3A+Enhance+COMPILED+PLAN+to+support+operator-level+state+TTL+configuration
> > > > > > > > > >
> > > > > > > > > > Best
> > > > > > > > > > Yun Tang
> > > > > > > > > > ________________________________
> > > > > > > > > > From: Benchao Li <libenc...@apache.org>
> > > > > > > > > > Sent: Thursday, September 14, 2023 19:53
> > > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > > Cc: dewe...@outlook.com <dewe...@outlook.com>
> > > > > > > > > > > Subject: Re: [DISCUSS] FLIP-367: Support Setting
> > > > > > > > > > > Parallelism for Table/SQL Sources
> > > > > > > > > >
> > > > > > > > > > Thanks Zhanghao, Dewei for preparing the FLIP,
> > > > > > > > > >
> > > > > > > > > > > I think this is a long-awaited feature, and I appreciate
> > > > > > > > > > > your effort, especially the "Other concerns" part you
> > > > > > > > > > > listed.
> > > > > > > > > > >
> > > > > > > > > > > Regarding the parallelism of transformations following
> > > > > > > > > > > the source transformation, it's indeed a problem that we
> > > > > > > > > > > initially wanted to solve when we introduced this
> > > > > > > > > > > feature internally. I'd like to hear more opinions on
> > > > > > > > > > > this. Personally I'm ok to leave it out of this FLIP for
> > > > > > > > > > > the time being.
> > > > > > > > > >
> > > > > > > > > > > On Thu, Sep 14, 2023 at 2:46 PM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Devs,
> > > > > > > > > > >
> > > > > > > > > > > > Dewei (cc'ed) and I would like to start a discussion
> > > > > > > > > > > > on FLIP-367: Support Setting Parallelism for Table/SQL
> > > > > > > > > > > > Sources [1].
> > > > > > > > > > > >
> > > > > > > > > > > > Currently, Flink Table/SQL jobs do not expose
> > > > > > > > > > > > fine-grained control of operator parallelism to users.
> > > > > > > > > > > > FLIP-146 [2] brings us support for setting parallelism
> > > > > > > > > > > > for sinks, but except for that, one can only set a
> > > > > > > > > > > > default global parallelism and all other operators
> > > > > > > > > > > > share the same parallelism. However, in many cases,
> > > > > > > > > > > > setting parallelism for sources individually is
> > > > > > > > > > > > preferable:
> > > > > > > > > > > >
> > > > > > > > > > > > - Many connectors have an upper bound on the
> > > > > > > > > > > > parallelism with which they can efficiently ingest
> > > > > > > > > > > > data. For example, the parallelism of a Kafka source
> > > > > > > > > > > > is bound by the number of partitions; any extra tasks
> > > > > > > > > > > > would be idle.
> > > > > > > > > > > > - Other operators may involve intensive computation
> > > > > > > > > > > > and need a larger parallelism.
> > > > > > > > > > > >
> > > > > > > > > > > > We propose to improve the current situation by
> > > > > > > > > > > > extending the current table source API to support
> > > > > > > > > > > > setting parallelism for Table/SQL sources via
> > > > > > > > > > > > connector options.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > >
> > > > > > > > > > > > [1] FLIP-367: Support Setting Parallelism for Table/SQL
> > > > > > > > > > > > Sources - Apache Flink - Apache Software Foundation
> > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150
> > > > > > > > > > > >
> > > > > > > > > > > > [2] FLIP-146: Improve new TableSource and TableSink
> > > > > > > > > > > > interfaces - Apache Flink - Apache Software Foundation
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Zhanghao Chen
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Benchao Li
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Best
> > > > > > >
> > > > > > > ConradJam
> > > > > > >
> > > > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
>
