Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-21 Thread Dong Lin
Hi Piotr,

Thanks again for proposing the isProcessingBacklog concept.

After discussing with Becket Qin and thinking about this more, I agree it
is a better idea to add a top-level concept to all source operators to
address the target use-case.

The main reason that changed my mind is that isProcessingBacklog can be
described as an inherent/natural attribute of every source instance, and its
semantics do not need to depend on any specific checkpointing policy.
Also, we can hardcode the isProcessingBacklog behavior for the sources we
have considered so far (e.g. HybridSource and MySQL CDC source) without
asking users to explicitly configure the per-source behavior, which indeed
provides better user experience.
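
For illustration, the hardcoded behavior could take roughly the following
shape inside a source's split enumerator, following the API sketched in the
FLIP (the setIsProcessingBacklog method follows the FLIP draft and may still
change; the phase hooks below are hypothetical):

import org.apache.flink.api.connector.source.SplitEnumeratorContext;

public class BacklogPhaseHooks {
    // E.g. a CDC-style source: backlog while reading the snapshot,
    // no longer backlog once it switches to the change log.
    static void onSnapshotPhaseStarted(SplitEnumeratorContext<?> context) {
        context.setIsProcessingBacklog(true);
    }

    static void onChangelogPhaseStarted(SplitEnumeratorContext<?> context) {
        context.setIsProcessingBacklog(false);
    }
}

Paired with a job-level option along the lines of
execution.checkpointing.interval-during-backlog, the checkpoint coordinator
could then switch between the short and long intervals without any per-source
user configuration.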

I have updated the FLIP based on the latest suggestions. The latest FLIP no
longer introduces a per-source config that can be used by end-users. While I
agree with you that CheckpointTrigger can be a useful feature to address
additional use-cases, I am not sure it is necessary for the use-case
targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
in another FLIP?

Can you help take another look at the updated FLIP?

Best,
Dong



On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
wrote:

> Hi Dong,
>
> > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
> any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
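
As a sanity check on the arithmetic quoted above (it assumes each of the 1000
subtasks is backpressured independently with probability 1% and the metric is
sampled once per second; the class is just a scratch calculation):

public class BackpressureEstimate {
    public static void main(String[] args) {
        // Chance that no subtask is backpressured at a given 1 s sample:
        double pAllClear = Math.pow(1 - 0.01, 1000);  // ~4.3e-5
        // Expected samples (seconds) until that happens (geometric):
        double expectedSeconds = 1.0 / pAllClear;     // ~23,163 s ~ 6.4 h
        System.out.printf("p=%.2e, expected wait=%.0f s (%.1f h)%n",
                pAllClear, expectedSeconds, expectedSeconds / 3600);
    }
}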
>
> Sorry, I don't understand where you are getting those numbers from.
> Instead of trying to find loophole after loophole, could you try to think
> about how a given loophole could be improved or solved?
>
> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
>
> Please propose something. I don't think it's needed.
>
> > - For the use-case mentioned in FLIP-309 motivation section, would the
> APIs
> > of this alternative approach be more or less usable?
>
> Everything that you originally wanted to achieve in FLIP-309, you could do
> as well in my proposal.
> Vide my many mentions of the "hacky solution".
>
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
>
> I don't see why not.
>
> > - Can these APIs be decoupled from the APIs currently proposed in
> FLIP-309?
>
> Yes
>
> > For example, if the APIs of this alternative approach can be decoupled
> from
> > the APIs currently proposed in FLIP-309, then it might be reasonable to
> > work on this extra use-case with a more advanced/complicated design
> > separately in a followup work.
>
> As I voiced my concerns previously, the current design of FLIP-309 would
> clog the public API and in the long run confuse the users. IMO it's
> addressing the problem in the wrong place.
>
> > Hmm.. do you mean we can do the following:
> > - Have all source operators emit a metric named "processingBacklog".
> > - Add a job-level config that specifies "the checkpointing interval to be
> > used when any source is processing backlog".
> > - The JM collects the "processingBacklog" periodically from all source
> > operators and uses the newly added config value as appropriate.
>
> Yes.
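
In configuration terms, the idea would presumably look like the following
two-interval setup (the second key is illustrative of the proposal rather
than a finalized option name):

    execution.checkpointing.interval: 30s
    execution.checkpointing.interval-during-backlog: 10min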
>
> > The challenge with this approach is that we need to define the semantics
> of
> > this "processingBacklog" metric and have all source operators
> > implement this metric. I am not sure we are able to do this yet without
> > having users explicitly provide this information on a per-source basis.
> >
> > Suppose the job reads from a bounded Kafka source: should it emit
> > "processingBacklog=true"? If yes, then the job might use the long
> > checkpointing interval even if the job is asked to process data starting
> > from now to the next 1 hour. If no, then the job might use the short
> > checkpointing interval even if the job is asked to re-process data
> > starting from 7 days ago.
>
> Yes, and the same can be said of your proposal. It has the very same
> issues: every source would have to implement it differently, most sources
> would have no idea how to properly calculate the new requested checkpoint
> interval, and for those that do know how, the user would have to configure
> every source individually. Yet again we would end up with a system that
> works only partially, in some special use cases (HybridSource), which
> confuses the users even more.
>
> That's why I think the more generic solution, working primarily on the same
> metrics that are used by various auto scaling solutions (like the Flink K8s
> operator's autoscaler

[jira] [Created] (FLINK-32411) SourceCoordinator thread leaks when job recovers from checkpoint

2023-06-21 Thread Rui Fan (Jira)
Rui Fan created FLINK-32411:
---

 Summary: SourceCoordinator thread leaks when job recovers from 
checkpoint
 Key: FLINK-32411
 URL: https://issues.apache.org/jira/browse/FLINK-32411
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.17.1, 1.16.2
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-06-22-11-00-25-446.png, 
image-2023-06-22-11-12-35-747.png

The SourceCoordinator thread leaks when a job recovers from a checkpoint. From 
the following figure, we can see:
 * 2 SourceCoordinator threads for SlowNumberSequenceSource
 * 2 SourceCoordinator threads for FastNumberSequenceSource

!image-2023-06-22-11-12-35-747.png|width=889,height=225!
h1. Root cause:
 # When initializing the ExecutionJobVertex of a source, 
RecreateOnResetOperatorCoordinator creates the SourceCoordinator. [code 
link|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L60]
 # When the job recovers from a checkpoint, 
[RecreateOnResetOperatorCoordinator#resetToCheckpoint|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L120]
 closes the old coordinator and creates a new one.
 # 
[SourceCoordinator#close|https://github.com/apache/flink/blob/50952050057b1655e6a81e844cefa377db66d277/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L271]
 only closes the SourceCoordinatorContext if the coordinator has been started, 
so the SourceCoordinatorContext of the old (never-started) coordinator won't be 
closed.
 # The SourceCoordinatorContext creates some threads in its constructor, so it 
should be closed even if the SourceCoordinator isn't started.
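
A minimal sketch of the direction a fix could take (closing the context 
unconditionally); the started flag and closeInternal() below are illustrative 
stand-ins for the existing shutdown logic, not the final patch:
{code:java}
@Override
public void close() throws Exception {
    try {
        if (started) {
            closeInternal(); // existing shutdown path of a started coordinator
        }
    } finally {
        // Always close the context: its constructor already created threads,
        // so they must be shut down even if start() was never called.
        context.close();
    }
}
{code}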

 

The call stack for creating the SourceCoordinator:
{code:java}
// Create the first SourceCoordinator
"jobmanager-io-thread-1@6168" daemon prio=5 tid=0x44 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:142)
      at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:339)
      - locked <0x1f02> (a 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:60)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:43)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:202)
      at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:196)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:534)
      at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:497)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
      at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:912)
      at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:218)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:894)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:850)
      at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:207)
      at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:163)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:366)
      at 
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:210)
      at 
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:140)
      at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(De

Re: [DISCUSS] Deprecate SourceFunction APIs

2023-06-21 Thread Alexander Fedulov
I'd like to revive the efforts to deprecate the SourceFunction API.

It would be great to get a review for this PR:
https://github.com/apache/flink/pull/21774

It immediately unblocks marking the actual SourceFunction as deprecated.
https://github.com/apache/flink/pull/20049

There is also this work thread related
to the StreamExecutionEnvironment#fromCollection() methods.
The discussion seems to have stalled:
https://github.com/apache/flink/pull/21028

Thanks,
Alex

On 2022/06/15 19:30:31 Alexander Fedulov wrote:
> Thank you all for your valuable input and participation in the discussion
>
> The vote is open now [1]
>
> [1] https://lists.apache.org/thread/kv9rj3w2rmkb8jtss5bqffhw57or7v8v
>
> Best,
> Alexander Fedulov


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-21 Thread Lincoln Lee
Hi Aitozi,

Thanks for your updates!

By the design of hints, hints after the select clause belong to the query
hints category, and this new hint is also a kind of join hint[1].
Joining a table function is one of the join types defined by Flink SQL
joins[2], and all existing join hints[1] omit the 'join' keyword,
so I would prefer 'ASYNC_TABLE_FUNC' (which is effectively short for
'ASYNC_TABLE_FUNC_JOIN').
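
For illustration, the hint would presumably be attached to the query like the
sketch below (the 'timeout' option appears in the FLIP; the final hint name
and the asyncRpcCall function are placeholders pending the outcome of this
discussion, and tableEnv is a TableEnvironment):

tableEnv.executeSql(
    "SELECT /*+ ASYNC_TABLE_FUNC('timeout'='180s') */ t.id, r.msg "
        + "FROM t, LATERAL TABLE(asyncRpcCall(t.id)) AS r(msg)");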

Since a short Chinese holiday is coming, I suggest waiting for other
people's responses before continuing to the vote (next Monday?).

Btw, I discussed pyflink support with @fudian offline; there should
be no known issues, so you can create a subtask for pyflink support after
the vote passes.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#join-hints
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/

Best,
Lincoln Lee


Aitozi wrote on Sun, Jun 18, 2023 at 21:18:

> Hi all,
> Sorry for the late reply. I had a discussion with Lincoln offline,
> mainly about the naming of the hint option. Thanks Lincoln for the valuable
> suggestions.
>
> Let me answer the last email inline.
>
> >For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call as
> an example?
>
> Sure, I will give an example when adding the docs for the async udtf and
> will update the FLIP simultaneously.
>
> >For the name of this query hint, 'LATERAL' (include its internal options)
> don't show any relevance to async, but I haven't thought of a suitable name
> at the moment,
>
> After some discussion with Lincoln, we prefer to choose one of
> `ASYNC_TABLE_FUNC` and `ASYNC_LATERAL`.
> Besides, in my opinion the keyword `lateral` has a wider use scenario than
> the table function join, but in this case we only want to configure
> the async table function, so I lean a bit more toward `ASYNC_TABLE_FUNC`.
> Looking forward to your input if you have
> a better suggestion on the naming.
>
> For the usage of the hints config option, I have updated the section
> of ConfigOption, you can refer to the FLIP
> for more details.
>
> >Also, the terms 'correlate join' and 'lateral join' are not the same as in
> the current joins page[1], so maybe it would be better if we unified them
> into  'join table function'
>
> Yes, we should unify on 'join table function'; updated.
>
> Best,
> Aitozi
>
> Lincoln Lee wrote on Thu, Jun 15, 2023 at 09:15:
>
> > Hi Aitozi,
> >
> > Thanks for your reply! Giving SQL users more flexibility to get
> > asynchronous processing capabilities via the lateral join table function:
> > +1 for this.
> >
> > For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call
> as
> > an example?
> >
> > For the name of this query hint, 'LATERAL' (including its internal
> > options) doesn't show any relevance to async, but I haven't thought of a
> > suitable name at the moment; maybe we need to highlight the async keyword
> > directly. We can also see if others have better candidates.
> >
> > The hint option "timeout = '180s'" should be "'timeout' = '180s'"; it
> > seems to be a typo in the FLIP. Also, please use upper case for all
> > keywords in the SQL examples.
> > Also, the terms 'correlate join' and 'lateral join' are not the same as
> in
> > the current joins page[1], so maybe it would be better if we unified them
> > into 'join table function'.
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#table-function
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Aitozi wrote on Wed, Jun 14, 2023 at 16:11:
> >
> > > Hi Lincoln
> > >
> > > Many thanks for your valuable questions. I will try to answer your
> > > questions inline.
> > >
> > > >Does the async udtf bring any additional benefits besides a
> > > lighter implementation?
> > >
> > > IMO, the async udtf is more than a lighter implementation. It can act
> > > as a general way for SQL users to use the async operator: they don't
> > > have to bind the async function to a table (a LookupTable), they are
> > > not forced to join on an equality join condition, and they can use it
> > > to do more than enrich data.
> > >
> > > The async lookup join is more like a subset/specific usage of the
> > > async udtf. That the specific version has more opportunities to be
> > > optimized (like push-down) is acceptable. The async table function
> > > should be categorized as a user-defined function.
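
A rough sketch of what such a user-defined async table function could look
like, modeled on the existing AsyncTableFunction base class (MyRpcClient and
its lookup method are hypothetical; error handling is minimal):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;

public class AsyncRpcCall extends AsyncTableFunction<String> {
    private transient MyRpcClient client; // hypothetical non-blocking client

    public void eval(CompletableFuture<Collection<String>> future, String key) {
        // Complete the future with the emitted row(s) for this key, or fail it.
        client.lookup(key).whenComplete((result, err) -> {
            if (err != null) {
                future.completeExceptionally(err);
            } else {
                future.complete(Collections.singleton(result));
            }
        });
    }
}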
> > >
> > > >Should users migrate to the lookup source when they encounter similar
> > > requirements or problems, or should we develop an additional set of
> > > similar mechanisms?
> > >
> > > As I clarified above, the lookup join is a specific usage of the async
> > > udtf, so it deserves more refined optimizations like caching and
> > > retries. But these may not all be suitable for the async udtf: as a
> > > function, it can be deterministic or non-deterministic, so caching is
> > > not always suitable, and we also do not have a common cache for UDFs
> > > now. So I thi

[jira] [Created] (FLINK-32410) Allocate hash-based collections with sufficient capacity for expected size

2023-06-21 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-32410:
--

 Summary: Allocate hash-based collections with sufficient capacity 
for expected size
 Key: FLINK-32410
 URL: https://issues.apache.org/jira/browse/FLINK-32410
 Project: Flink
  Issue Type: Improvement
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.19.0


The JDK API to create hash-based collections for a certain capacity is arguably 
misleading, because it doesn't size the collections to hold the specified number 
of items, as you'd expect. Instead, it sizes them to hold load-factor% of the 
specified number.

For the common pattern of allocating a hash-based collection sized to the 
number of expected elements in order to avoid rehashes, this means that a 
rehash is essentially guaranteed.

We should introduce helper methods (similar to Guava's 
`Maps.newHashMapWithExpectedSize(int)`) for allocations for an expected size and 
replace the direct constructor calls with those.
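
A sketch of what such a helper could look like, assuming the JDK's default 
load factor of 0.75 (Guava's actual implementation differs slightly):
{code:java}
import java.util.HashMap;

// Size the backing table so that expectedSize entries fit without exceeding
// the load-factor threshold, i.e. without triggering a rehash.
public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
    return new HashMap<>((int) Math.ceil(expectedSize / 0.75d));
}
{code}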





[jira] [Created] (FLINK-32409) Remove MultipleComponentLeaderElectionDriverAdapter

2023-06-21 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32409:
-

 Summary: Remove MultipleComponentLeaderElectionDriverAdapter
 Key: FLINK-32409
 URL: https://issues.apache.org/jira/browse/FLINK-32409
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl








[jira] [Created] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-32408:
---

 Summary: JobManager HA configuration update needed in Flink k8s 
Operator 
 Key: FLINK-32408
 URL: https://issues.apache.org/jira/browse/FLINK-32408
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: dongwoo.kim
 Fix For: kubernetes-operator-1.6.0


In the Flink 1.17 documentation it says that, to configure JobManager HA, we 
have to configure the *high-availability.type* key, not the *high-availability* 
key (this seems to have changed in 1.17).

And currently kubernetes-operator-1.5.0 says it supports Flink 1.17. So I 
expected that configuring JobManager HA with *high-availability.type* should 
work, but it didn't.
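
For reference, the Flink 1.17-style configuration in question would look 
roughly like this in flink-conf.yaml (the storage directory value is 
illustrative):

    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-ha/recovery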


ref: 
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 





[RESULT][VOTE] FLIP-287: Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-21 Thread Joao Boto
Hi all,

I am happy to announce that FLIP-287: Extend Sink#InitContext to expose
TypeSerializer, ObjectReuse and JobID [1] has been accepted. There are 8
approving votes, 6 of which are binding:

- Lijie Wang (binding)
- Jing Ge (binding)
- Tzu-Li (Gordon) Tai (binding)
- Zhu Zhu (binding)
- Yuepeng Pan
- Martijn Visser (binding)
- Leonard Xu (binding)
- John Roesler

There are no disapproving votes.

Thanks everyone for participating!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

Best Regards,
João Boto


[DISCUSS] Releasing Flink ML 2.3.0

2023-06-21 Thread Xin Jiang
Hi devs,

Dong and I would like to start a discussion regarding the release of Flink ML 
2.3.0. 

The release mainly provides the ability to run on multiple Flink versions. With 
2.3.0, users can build and run Flink ML libraries with Flink 1.15, 1.16, and 
1.17.
Besides, some algorithm performance optimizations, e.g. for Swing, are also included.

With the above improvements, we believe it is time to release Flink ML 2.3.0.

Dong would be the release manager and I will assist him. Please feel free to 
provide your feedback.


Best Regards,
Xin

Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-06-21 Thread Leonard Xu
Thanks Samrat for driving this FLIP.

Since the community has already built a set of basic components for the 
connector, I only have three comments.

1 Reusing the capabilities of JDBC and Filesystem in the Redshift connector 
generally makes sense to me. However, since all of them are managed in 
different repositories and each depends on Flink, could you explain how 
you would establish the versioning, release, and dependency management process?

2 Some configuration option names can be improved to match the naming style of 
existing configuration options, for example:
table -> table-name
query -> scan.query
aws-iam-role -> aws.iam-role
read.mode -> scan.read.mode: similar to scan.startup.mode, and maybe we will 
have lookup.read.mode
write.mode -> sink.write.mode

3 The source of the Redshift connector supports JDBC queries; IIUC, we could 
support LookupTableSource as well?
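
For illustration, with the suggested names a table definition might look like 
the sketch below (the connector identifier and option values are placeholders, 
since the FLIP has not finalized them; tableEnv is a TableEnvironment):

tableEnv.executeSql(
    "CREATE TABLE redshift_orders (order_id BIGINT, amount DECIMAL(10, 2)) "
        + "WITH ("
        + "  'connector' = 'redshift',"
        + "  'table-name' = 'orders',"
        + "  'aws.iam-role' = 'arn:aws:iam::123456789012:role/redshift-role',"
        + "  'scan.read.mode' = 'jdbc',"
        + "  'sink.write.mode' = 'jdbc'"
        + ")");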

Best,
Leonard

> On Jun 21, 2023, at 4:57 PM, Samrat Deb  wrote:
> 
> Hi Martijn,
> 
> Thank you for sharing your thoughts on the matter.
> I understand that you don't have a strong opinion on whether to support
> exactly-once processing from the beginning or at a later stage.
> For the initial implementation I will go ahead with at-least-once semantics.
> 
>> The only consideration that I could think of is that
> if you start with at-least-once, you could consider using the ASync API,
> but I don't think the ASync API yet supports exactly-once.
> 
> Noted. It's a valid consideration to start with the Async API for
> compatibility.
> 
> Bests,
> Samrat
> 
> 
> On Mon, Jun 19, 2023 at 5:28 PM Martijn Visser 
> wrote:
> 
>> Hi Samrat,
>> 
>> I have no strong opinion on whether to support exactly-once from the start
>> or potentially later. The only consideration that I could think of is that
>> if you start with at-least-once, you could consider using the ASync API,
>> but I don't think the ASync API yet supports exactly-once.
>> 
>> Thanks,
>> 
>> Martijn
>> 
>> On Fri, Jun 9, 2023 at 7:22 PM Jing Ge  wrote:
>> 
>>> Hi Samrat,
>>> 
>>> The FLIP looks good, thanks!
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> 
>>> On Tue, Jun 6, 2023 at 8:16 PM Samrat Deb  wrote:
>>> 
 Hi Jing,
 
> I would suggest adding that information into the
 FLIP.
 
 Updated now, please review the new version of the FLIP whenever you have time.
 
> +1 Looking forward to your PR :-)
 I will request your review once I'm ready with the PR :-)
 
 Bests,
 Samrat
 
 On Tue, Jun 6, 2023 at 11:43 PM Samrat Deb 
>>> wrote:
 
> Hi Martijn,
> 
>> If I understand this correctly, the Redshift sink
> would not be able to support exactly-once, is that correct?
> 
> As I delve deeper into the study of Redshift's capabilities, I have
> discovered that it does support "merge into" operations [1] and some
> merge into examples [2].
> This opens up the possibility of implementing exactly-once semantics
>>> with
> the connector.
> However, I believe it would be prudent to start with a more focused
>>> scope
> for the initial phase of implementation and defer the exactly-once
>>> support
> for subsequent iterations.
> 
> Before finalizing the approach, I would greatly appreciate your
>>> thoughts
> and suggestions on this matter.
> Should we prioritize the initial implementation without exactly-once
> support, or would you advise incorporating it right from the start?
> Your insights and experiences would be immensely valuable in making
>>> this
> decision.
> 
> 
> [1]
> 
 
>>> 
>> https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html
> [2]
>> https://docs.aws.amazon.com/redshift/latest/dg/merge-examples.html
> 
> Bests,
> Samrat
> 
> On Mon, Jun 5, 2023 at 7:09 PM Jing Ge 
 wrote:
> 
>> Hi Samrat,
>> 
>> Thanks for the feedback. I would suggest adding that information
>> into
 the
>> FLIP.
>> 
>> +1 Looking forward to your PR :-)
>> 
>> Best regards,
>> Jing
>> 
>> On Sat, Jun 3, 2023 at 9:19 PM Samrat Deb 
 wrote:
>> 
>>> Hi Jing Ge,
>>> 
>> Do you already have any prototype? I'd like to join the
>> reviews.
>>> The prototype is in progress. I will raise the dedicated PR for
>>> review
>> soon
>>> and notify this thread as well.
>>> 
>> Will the Redshift connector provide additional features
>>> beyond the mediator/wrapper of the jdbc connector?
>>> 
>>> Here are the additional features that the Flink connector for AWS
>> Redshift
>>> can provide on top of using JDBC:
>>> 
>>> 1. Integration with AWS Redshift Workload Management (WLM): AWS
 Redshift
>>> allows you to configure WLM[1] to manage query prioritization and
>> resource
>>> allocation. The Flink connector for Redshift will be agnostic to
>> the
>>> configured WLM and ut

Re: [VOTE] FLIP-246: Dynamic Kafka Source (originally Multi Cluster Kafka Source)

2023-06-21 Thread Thomas Weise
+1 (binding)


On Mon, Jun 19, 2023 at 8:09 AM Ryan van Huuksloot
 wrote:

> +1 (non-binding)
>
> +1 for DynamicKafkaSource
>
> Ryan van Huuksloot
> Sr. Production Engineer | Streaming Platform
>
>
> On Mon, Jun 19, 2023 at 8:15 AM Martijn Visser 
> wrote:
>
> > +1 (binding)
> >
> > +1 for DynamicKafkaSource
> >
> >
> > On Sat, Jun 17, 2023 at 5:31 AM Tzu-Li (Gordon) Tai  >
> > wrote:
> >
> > > +1 (binding)
> > >
> > > +1 for either DynamicKafkaSource or DiscoveringKafkaSource
> > >
> > > Cheers,
> > > Gordon
> > >
> > > On Thu, Jun 15, 2023, 10:56 Mason Chen  wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thank you to everyone for the feedback on FLIP-246 [1]. Based on the
> > > > discussion thread [2], we have come to a consensus on the design and
> > are
> > > > ready to take a vote to contribute this to Flink.
> > > >
> > > > This voting thread will be open for at least 72 hours (excluding
> > > weekends,
> > > > until June 20th 10:00AM PST) unless there is an objection or an
> > > > insufficient number of votes.
> > > >
> > > > (Optional) If you have an opinion on the naming of the connector,
> > please
> > > > include it in your vote:
> > > > 1. DynamicKafkaSource
> > > > 2. MultiClusterKafkaSource
> > > > 3. DiscoveringKafkaSource
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
> > > > [2] https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z
> > > >
> > > > Best,
> > > > Mason
> > > >
> > >
> >
>


[jira] [Created] (FLINK-32407) Notify catalog listener for table events

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32407:
-

 Summary: Notify catalog listener for table events
 Key: FLINK-32407
 URL: https://issues.apache.org/jira/browse/FLINK-32407
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32406) Notify catalog listener for database events

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32406:
-

 Summary: Notify catalog listener for database events
 Key: FLINK-32406
 URL: https://issues.apache.org/jira/browse/FLINK-32406
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32405) Initialize catalog listener for CatalogManager

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32405:
-

 Summary: Initialize catalog listener for CatalogManager
 Key: FLINK-32405
 URL: https://issues.apache.org/jira/browse/FLINK-32405
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32404) Introduce catalog modification listener and factory interfaces

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32404:
-

 Summary: Introduce catalog modification listener and factory 
interfaces
 Key: FLINK-32404
 URL: https://issues.apache.org/jira/browse/FLINK-32404
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong








[jira] [Created] (FLINK-32403) Add database related operations in catalog manager

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32403:
-

 Summary: Add database related operations in catalog manager
 Key: FLINK-32403
 URL: https://issues.apache.org/jira/browse/FLINK-32403
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Fang Yong


Add database operations in the catalog manager for different SQL operations





[jira] [Created] (FLINK-32402) FLIP-294: Support Customized Catalog Modification Listener

2023-06-21 Thread Fang Yong (Jira)
Fang Yong created FLINK-32402:
-

 Summary: FLIP-294: Support Customized Catalog Modification Listener
 Key: FLINK-32402
 URL: https://issues.apache.org/jira/browse/FLINK-32402
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Affects Versions: 1.18.0
Reporter: Fang Yong


Issue for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener





Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-21 Thread Jing Ge
Hi Ron,

Thanks for sharing your thoughts! It makes sense. It would be helpful if
these references to Hive, PolarDB, etc. could be added to the FLIP.

Best regards,
Jing

On Tue, Jun 20, 2023 at 5:41 PM liu ron  wrote:

> Hi, Jing
>
> The default value for this ratio references other systems, such as Hive. As
> long as the runtime filter can filter out more than half of the data, we can
> benefit from it. Normally, as long as we can get the statistics, ndv is
> present and rowCount is rarely needed, so I think the formula is valid in
> most cases. The formula is also borrowed from other systems, such as
> AliCloud's PolarDB. Your concern is valuable for this FLIP, but currently we
> do not know how to adjust the ratio reasonably; an overly complex rule may
> also be hard for users to understand, so I think we should follow the simple
> way first and optimize gradually later. As a first step, we can verify the
> reasonableness of the current formula with the TPC-DS cases.
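
To make the condition concrete, here is a hedged sketch of the kind of
injection check being discussed (the thresholds, names, and exact ratio
formula are illustrative, not FLIP-324's final spec):

static boolean shouldInjectRuntimeFilter(
        long buildDataSize, long probeDataSize,
        double buildNdv, double probeNdv,
        double buildRowCount, double probeRowCount) {
    long maxBuildDataSize = 10L << 20;   // e.g. 10 MB build-side limit
    long minProbeDataSize = 10L << 30;   // e.g. 10 GB probe-side minimum
    double minFilterRatio = 0.5;         // the default ratio discussed above
    if (buildDataSize > maxBuildDataSize || probeDataSize < minProbeDataSize) {
        return false;
    }
    // Prefer ndv statistics; fall back to row counts only when ndv is absent.
    double ratio = (buildNdv > 0 && probeNdv > 0)
            ? 1 - buildNdv / probeNdv
            : 1 - buildRowCount / probeRowCount;
    return ratio >= minFilterRatio;
}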
>
> Best,
> Ron
>
> Jing Ge wrote on Tue, Jun 20, 2023 at 19:46:
>
> > Hi Ron,
> >
> > Thanks for the clarification. That answered my questions.
> >
> > Regarding the ratio, my gut feeling is that any value less than 0.8 or
> > 0.9 won't help too much (I might be wrong). I was thinking of adapting
> > the formula to somehow map the current 0.9-1 range to 0-1, i.e. if the
> > user configures 0.5, it will be mapped to e.g. 0.95 (or e.g. 0.85; the
> > real number needs more calculation) for the current formula described in
> > the FLIP. But I am not sure it is a feasible solution. It deserves more
> > discussion. Maybe some real performance tests could give us some hints.
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jun 20, 2023 at 5:19 AM liu ron  wrote:
> >
> > > Hi, Jing
> > >
> > > Thanks for your feedback.
> > >
> > > Afaiu, the runtime filter will only be injected when the gap between
> > the
> > > build data size and probe data size is big enough. Let's make an extreme
> > > example. If the small table(build side) has one row and the large
> > > table(probe side) contains tens of billions of rows. This will be the
> > ideal
> > > use case for the runtime filter and the improvement will be
> significant.
> > Is
> > > this correct?
> > >
> > > Yes, you are right.
> > >
> > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP,
> > > will
> > > the value of max-build-data-size and min-prob-data-size depend on the
> > > parallelism config? I.e. with the same data-size setting, is it
> possible
> > to
> > > inject or don't inject runtime filters by adjusting the parallelism?
> > >
> > > First, let me clarify two points. The first is that the runtime filter
> > > decides whether to inject or not in the optimization phase, but we
> > > currently do not consider operator parallelism (which is set at the
> > > ExecNode level) in the SQL optimization phase. The second is that in
> > > batch mode the AdaptiveBatchScheduler[1] is now used by default, which
> > > derives the parallelism of a downstream operator from the amount of
> > > data produced by the upstream operator; that is, the parallelism is
> > > determined adaptively at runtime. So in the above case, we cannot
> > > decide in the optimization stage whether to inject the BloomFilter
> > > based on parallelism.
> > > A more important point is that the purpose of the runtime filter is to
> > > reduce the amount of shuffled data, and thus the amount of data
> > > processed by the downstream join operator. Therefore, regardless of the
> > > parallelism of the probe side, the amount of shuffled data is reduced
> > > after injecting the runtime filter, which is beneficial to the join
> > > operator, so whether to inject the runtime filter does not depend on
> > > the parallelism.
> > >
> > > > Does it make sense to reconsider the formula of ratio
> > > calculation to help users easily control the filter injection?
> > >
> > > Only when ndv does not exist will row count be considered. When size
> > > uses the default value and ndv cannot be obtained, it is true that this
> > > condition may always hold, but this does not seem to affect anything,
> > > and the user is also likely to change the value of the size. One
> > > question: how do you think we should make it easier for users to
> > > control the filter injection?
> > >
> > >
> > > [1]:
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > >
> > > Best,
> > > Ron
> > >
> > > Jing Ge wrote on Tue, Jun 20, 2023 at 07:11:
> > >
> > > > Hi Lijie,
> > > >
> > > > Thanks for your proposal. It is a really nice feature. I'd like to
> ask
> > a
> > > > few questions to understand your thoughts.
> > > >
> > > > Afaiu, the runtime filter will only be injected when the gap between
> > the
> > > > build data size and probe data size is big enough. Let's make an
> extreme
> > > > example. If the smal

Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-06-21 Thread Samrat Deb
Hi Martijn,

Thank you for sharing your thoughts on the matter.
I understand that you don't have a strong opinion on whether to support
exactly-once processing from the beginning or at a later stage.
For the initial implementation I will go ahead with at-least-once semantics.

>  The only consideration that I could think of is that
if you start with at-least-once, you could consider using the ASync API,
but I don't think the ASync API yet supports exactly-once.

Noted. It's a valid consideration to start with the Async API for
compatibility.

Bests,
Samrat


On Mon, Jun 19, 2023 at 5:28 PM Martijn Visser 
wrote:

> Hi Samrat,
>
> I have no strong opinion on whether to support exactly-once from the start
> or potentially later. The only consideration that I could think of is that
> if you start with at-least-once, you could consider using the ASync API,
> but I don't think the ASync API yet supports exactly-once.
>
> Thanks,
>
> Martijn
>
> On Fri, Jun 9, 2023 at 7:22 PM Jing Ge  wrote:
>
> > Hi Samrat,
> >
> > The FLIP looks good, thanks!
> >
> > Best regards,
> > Jing
> >
> >
> > On Tue, Jun 6, 2023 at 8:16 PM Samrat Deb  wrote:
> >
> > > Hi Jing,
> > >
> > > >  I would suggest adding that information into the
> > > FLIP.
> > >
> > > Updated now, please review the new version of the FLIP whenever you have time.
> > >
> > > > +1 Looking forward to your PR :-)
> > > I will request your review once I'm ready with the PR :-)
> > >
> > > Bests,
> > > Samrat
> > >
> > > On Tue, Jun 6, 2023 at 11:43 PM Samrat Deb 
> > wrote:
> > >
> > > > Hi Martijn,
> > > >
> > > > > If I understand this correctly, the Redshift sink
> > > > would not be able to support exactly-once, is that correct?
> > > >
> > > > As I delve deeper into the study of Redshift's capabilities, I have
> > > > discovered that it does support "merge into" operations [1] and some
> > > > merge into examples [2].
> > > > This opens up the possibility of implementing exactly-once semantics
> > with
> > > > the connector.
> > > > However, I believe it would be prudent to start with a more focused
> > scope
> > > > for the initial phase of implementation and defer the exactly-once
> > support
> > > > for subsequent iterations.
> > > >
> > > > Before finalizing the approach, I would greatly appreciate your
> > thoughts
> > > > and suggestions on this matter.
> > > > Should we prioritize the initial implementation without exactly-once
> > > > support, or would you advise incorporating it right from the start?
> > > > Your insights and experiences would be immensely valuable in making
> > this
> > > > decision.
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html
> > > > [2]
> https://docs.aws.amazon.com/redshift/latest/dg/merge-examples.html
> > > >
> > > > Bests,
> > > > Samrat
> > > >
> > > > On Mon, Jun 5, 2023 at 7:09 PM Jing Ge 
> > > wrote:
> > > >
> > > >> Hi Samrat,
> > > >>
> > > >> Thanks for the feedback. I would suggest adding that information
> into
> > > the
> > > >> FLIP.
> > > >>
> > > >> +1 Looking forward to your PR :-)
> > > >>
> > > >> Best regards,
> > > >> Jing
> > > >>
> > > >> On Sat, Jun 3, 2023 at 9:19 PM Samrat Deb 
> > > wrote:
> > > >>
> > > >> > Hi Jing Ge,
> > > >> >
> > > >> > >>> Do you already have any prototype? I'd like to join the
> reviews.
> > > >> > The prototype is in progress. I will raise the dedicated PR for
> > review
> > > >> soon
> > > >> > and notify this thread as well.
> > > >> >
> > > >> > >>> Will the Redshift connector provide additional features
> > > >> > beyond the mediator/wrapper of the jdbc connector?
> > > >> >
> > > >> > Here are the additional features that the Flink connector for AWS
> > > >> Redshift
> > > >> > can provide on top of using JDBC:
> > > >> >
> > > >> > 1. Integration with AWS Redshift Workload Management (WLM): AWS
> > > Redshift
> > > >> > allows you to configure WLM[1] to manage query prioritization and
> > > >> resource
> > > >> > allocation. The Flink connector for Redshift will be agnostic to
> the
> > > >> > configured WLM and utilize it for scaling in and out for the sink.
> > > This
> > > >> > means that the connector can leverage the WLM capabilities of
> > Redshift
> > > >> to
> > > >> > optimize the execution of queries and allocate resources
> efficiently
> > > >> based
> > > >> > on your defined workload priorities.
> > > >> >
> > > >> > 2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift
> > imposes
> > > >> > certain quotas and limits[2] on various aspects such as the number
> > of
> > > >> > clusters, concurrent connections, queries per second, etc. The
> Flink
> > > >> > connector for Redshift will provide an abstraction layer for
> users,
> > > >> > allowing them to work with Redshift without having to worry about
> > > these
> > > >> > specific limits. The connector will handle the management of
> > > connections
> > > >> > and queries within the defined quotas and limits, abstract