Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-12-17 Thread Jiabao Sun
Hi Becket,

The MySQL connector currently lives in the flink-connector-jdbc repository and is
not a standalone connector.
Would it be too specific to use "mysql" as the configuration option prefix?

Also, I would like to ask about the difference in behavior between AUTO and ALWAYS.
It seems we cannot guarantee that all filters are pushed down to the external system
in ALWAYS mode, because not all filters in Flink SQL are supported by the external system.
Should we throw an error when we encounter a filter that cannot be pushed down
in ALWAYS mode?

Thanks,
Jiabao

> On Dec 18, 2023, at 15:34, Becket Qin wrote:
> 
> Hi Jiabao,
> 
> Thanks for updating the FLIP. Maybe I did not explain it clearly enough. My
> point is that given there are various good flavors of behavior for handling
> pushed-down filters, we should not have a common config of
> "ignore.filter.pushdown", because the behavior is not *common*.
> 
> It looks like the original motivation of this FLIP is just for MySql. Let's
> focus on what is the best solution for MySql connector here first. After
> that, if people think the best behavior for MySql happens to be a common
> one, we can then discuss whether that is worth being added to the base
> implementation of source. For MySQL, if we are going to introduce a config
> to MySql, why not have something like "mysql.filter.handling.policy" with
> value of AUTO / NEVER / ALWAYS? Isn't that better than
> "ignore.filter.pushdown"?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> On Sun, Dec 17, 2023 at 11:30 PM Jiabao Sun 
> wrote:
> 
>> Hi Becket,
>> 
>> The FLIP document has been updated as well.
>> Please take a look when you have time.
>> 
>> Thanks,
>> Jiabao
>> 
>> 
>>> On Dec 17, 2023, at 22:54, Jiabao Sun wrote:
>>> 
>>> Thanks Becket,
>>> 
>>> I apologize for not being able to continue with this proposal due to
>> being too busy during this period.
>>> 
>>> The viewpoints you shared about the design of Flink Source make sense to me.
>>> The native configuration ‘ignore.filter.pushdown’ sounds good to me.
>>> Having a unified name or naming style can indeed prevent confusion for
>> users regarding
>>> the inconsistent naming of this configuration across different
>> connectors.
>>> 
>>> Currently, there are not many external connectors that support filter
>> pushdown.
>>> I propose that we first introduce it in flink-connector-jdbc and
>> flink-connector-mongodb.
>>> Do you think this is feasible?
>>> 
>>> Best,
>>> Jiabao
>>> 
>>> 
 On Nov 16, 2023, at 17:45, Becket Qin wrote:
 
 Hi Jiabao,
 
 Arguments like "because Spark has it so Flink should also have it" does
>> not
 make sense. Different projects have different API flavors and styles.
>> What
 is really important is the rationale and the design principle behind the
 API. They should conform to the convention of the project.
 
 First of all, Spark Source API itself has a few issues and they ended up
 introducing DataSource V2 in Spark 3.0, which added the decorative
>> interfaces
 like SupportsPushdownXXX. Some of the configurations predating
>> DataSource
 V2 may still be there.
 
 For the Spark configurations you mentioned, they are all the
>> configurations
 for FileScanBuilder, which is equivalent to FileSource in Flink.
>> Currently,
 regardless of the format (ORC, Parquet, Avro, etc), the FileSource
>> pushes
 back all the filters to ensure correctness. The actual filters that got
 applied to the specific format might still be different. This
 implementation is the same in FileScanBuilder.pushFilters() for Spark. I
 don't know why Spark got separate configurations for each format. Maybe
>> it
 is because the filters are actually implemented differently for
>> different
 formats.
 
 At least for the current implementation in FileScanBuilder, these
 configurations can be merged to one configuration like
 `apply.filters.to.format.enabled`. Note that this config, as well as the
 separate configs you mentioned, are just visible and used by the
 FileScanBuilder. It determines whether the filters should be passed
>> down to
 the format of the FileScanBuilder instance. Regardless of the value of
 these configs, FileScanBuilder.pushFilters() will always be called, and
 FileScanBuilder (as well as FileSource in Flink) will always push back
>> all
 the filters to the framework.
 
 A MySql source can have a very different way to handle this. For example, a
 config in this case might be "my.apply.filters" with three different values:
 - AUTO: The Source will issue a DESC Table request to understand whether a
 filter can be applied efficiently, and decide which filters can be applied
 and which cannot based on that.
 - NEVER: Never apply filtering. It will always do a full table read and
 let Flink do the filtering.
 - ALWAYS: Always apply the filtering to the MySql server.
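
For illustration, a minimal sketch (in Java) of how a connector-scoped policy option along these lines could be declared. The key "mysql.filter.handling.policy", the enum, and the default value are assumptions drawn from the discussion above, not a settled design:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class MySqlConnectorOptions {

    /** Hypothetical policy enum mirroring the AUTO / NEVER / ALWAYS values discussed above. */
    public enum FilterHandlingPolicy {
        AUTO,   // probe the external system (e.g. DESC TABLE) and push only filters it handles well
        NEVER,  // never push filters: always do a full read and let Flink filter
        ALWAYS  // push every filter the external system supports
    }

    /** Assumed option key taken from the discussion; not an existing Flink or JDBC connector option. */
    public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
            ConfigOptions.key("mysql.filter.handling.policy")
                    .enumType(FilterHandlingPolicy.class)
                    .defaultValue(FilterHandlingPolicy.AUTO)
                    .withDescription("How pushed-down filters are applied to the MySQL server.");

    private MySqlConnectorOptions() {}
}
```

An enum-typed option would also give a natural home for the ALWAYS question raised above: the connector could validate the pushed filters against the policy and fail fast when one cannot be pushed.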

Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-12-17 Thread Becket Qin
Hi Jiabao,

Thanks for updating the FLIP. Maybe I did not explain it clearly enough. My
point is that given there are various good flavors of behavior for handling
pushed-down filters, we should not have a common config of
"ignore.filter.pushdown", because the behavior is not *common*.

It looks like the original motivation of this FLIP is just for MySql. Let's
focus on what is the best solution for MySql connector here first. After
that, if people think the best behavior for MySql happens to be a common
one, we can then discuss whether that is worth being added to the base
implementation of source. For MySQL, if we are going to introduce a config
to MySql, why not have something like "mysql.filter.handling.policy" with
value of AUTO / NEVER / ALWAYS? Isn't that better than
"ignore.filter.pushdown"?

Thanks,

Jiangjie (Becket) Qin



On Sun, Dec 17, 2023 at 11:30 PM Jiabao Sun 
wrote:

> Hi Becket,
>
> The FLIP document has been updated as well.
> Please take a look when you have time.
>
> Thanks,
> Jiabao
>
>
> > On Dec 17, 2023, at 22:54, Jiabao Sun wrote:
> >
> > Thanks Becket,
> >
> > I apologize for not being able to continue with this proposal due to
> being too busy during this period.
> >
> > The viewpoints you shared about the design of Flink Source make sense to me.
> > The native configuration ‘ignore.filter.pushdown’ sounds good to me.
> > Having a unified name or naming style can indeed prevent confusion for
> users regarding
> > the inconsistent naming of this configuration across different
> connectors.
> >
> > Currently, there are not many external connectors that support filter
> pushdown.
> > I propose that we first introduce it in flink-connector-jdbc and
> flink-connector-mongodb.
> > Do you think this is feasible?
> >
> > Best,
> > Jiabao
> >
> >
> >> On Nov 16, 2023, at 17:45, Becket Qin wrote:
> >>
> >> Hi Jiabao,
> >>
> >> Arguments like "because Spark has it so Flink should also have it" does
> not
> >> make sense. Different projects have different API flavors and styles.
> What
> >> is really important is the rationale and the design principle behind the
> >> API. They should conform to the convention of the project.
> >>
> >> First of all, Spark Source API itself has a few issues and they ended up
> >> introducing DataSource V2 in Spark 3.0, which added the decorative
> interfaces
> >> like SupportsPushdownXXX. Some of the configurations predating
> DataSource
> >> V2 may still be there.
> >>
> >> For the Spark configurations you mentioned, they are all the
> configurations
> >> for FileScanBuilder, which is equivalent to FileSource in Flink.
> Currently,
> >> regardless of the format (ORC, Parquet, Avro, etc), the FileSource
> pushes
> >> back all the filters to ensure correctness. The actual filters that got
> >> applied to the specific format might still be different. This
> >> implementation is the same in FileScanBuilder.pushFilters() for Spark. I
> >> don't know why Spark got separate configurations for each format. Maybe
> it
> >> is because the filters are actually implemented differently for
> different
> >> formats.
> >>
> >> At least for the current implementation in FileScanBuilder, these
> >> configurations can be merged to one configuration like
> >> `apply.filters.to.format.enabled`. Note that this config, as well as the
> >> separate configs you mentioned, are just visible and used by the
> >> FileScanBuilder. It determines whether the filters should be passed
> down to
> >> the format of the FileScanBuilder instance. Regardless of the value of
> >> these configs, FileScanBuilder.pushFilters() will always be called, and
> >> FileScanBuilder (as well as FileSource in Flink) will always push back
> all
> >> the filters to the framework.
> >>
> >> A MySql source can have a very different way to handle this. For example, a
> >> config in this case might be "my.apply.filters" with three different values:
> >> - AUTO: The Source will issue a DESC Table request to understand whether a
> >> filter can be applied efficiently, and decide which filters can be applied
> >> and which cannot based on that.
> >> - NEVER: Never apply filtering. It will always do a full table read and
> >> let Flink do the filtering.
> >> - ALWAYS: Always apply the filtering to the MySql server.
> >>
> >> In the above examples of FileSource and MySql Source, I don't think it
> is a
> >> good idea to shoehorn the behaviors into a naive config of
> >> `ignore.filter.pushdown`. That is why I don't think this is a common
> config.
> >>
> >> To recap, like I said, I do agree that in some cases, we may want to
> behave
> >> differently when filters are pushed down to the sources, even if a
> source
> >> implements SupportsFilterPushDown, but I don't think there is a suitable
> >> common config for this. The behavior is very likely source specific.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Thu, Nov 16, 2023 at 3:41 PM Jiabao Sun 
> >> wrote:
> >>
> >>> Thanks 

Re: [VOTE] FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs

2023-12-17 Thread Xintong Song
+1 (binding)

Best,

Xintong



On Mon, Dec 18, 2023 at 3:05 PM Lijie Wang  wrote:

> +1 (binding)
>
> Best,
> Lijie
>
> Yuxin Tan wrote on Fri, Dec 15, 2023 at 17:14:
>
> > +1 (non-binding)
> >
> > Best,
> > Yuxin
> >
> >
> > weijie guo wrote on Fri, Dec 15, 2023 at 10:05:
> >
> > > +1(binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Wencong Liu wrote on Fri, Dec 15, 2023 at 09:13:
> > >
> > > > Hi dev,
> > > >
> > > > I'd like to start a vote on FLIP-382.
> > > >
> > > > Discussion thread:
> > > > https://lists.apache.org/thread/3mgsc31odtpmzzl32s4oqbhlhxd0mn6b
> > > > FLIP:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
> > > >
> > > > Best regards,
> > > > Wencong Liu
> > >
> >
>


Re: [VOTE] FLIP-380: Support Full Partition Processing On Non-keyed DataStream

2023-12-17 Thread Xintong Song
+1 (binding)


Best,

Xintong



On Fri, Dec 15, 2023 at 5:15 PM weijie guo 
wrote:

> +1 (binding)
>
> Best regards,
>
> Weijie
>
>
> Wencong Liu wrote on Fri, Dec 15, 2023 at 09:15:
>
> > Hi dev,
> >
> > I'd like to start a vote on FLIP-380.
> >
> > Discussion thread:
> > https://lists.apache.org/thread/nn7myj7vsvytbkdrnbvj5h0homsjrn1h
> > FLIP:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream
> >
> > Best regards,
> > Wencong Liu
>


Re: [VOTE] FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs

2023-12-17 Thread Lijie Wang
+1 (binding)

Best,
Lijie

Yuxin Tan wrote on Fri, Dec 15, 2023 at 17:14:

> +1 (non-binding)
>
> Best,
> Yuxin
>
>
> weijie guo wrote on Fri, Dec 15, 2023 at 10:05:
>
> > +1(binding)
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Wencong Liu wrote on Fri, Dec 15, 2023 at 09:13:
> >
> > > Hi dev,
> > >
> > > I'd like to start a vote on FLIP-382.
> > >
> > > Discussion thread:
> > > https://lists.apache.org/thread/3mgsc31odtpmzzl32s4oqbhlhxd0mn6b
> > > FLIP:
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
> > >
> > > Best regards,
> > > Wencong Liu
> >
>


[jira] [Created] (FLINK-33870) Split the HighAvailabilityServices into LeaderServices and PersistentServices

2023-12-17 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-33870:
--

 Summary: Split the HighAvailabilityServices into LeaderServices 
and PersistentServices
 Key: FLINK-33870
 URL: https://issues.apache.org/jira/browse/FLINK-33870
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Yangze Guo
Assignee: Yangze Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re:Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-17 Thread Xuyang
Hi, Alan and Timo. Thanks for your reply.
>Would it make a difference if it were exposed by the explain
>method (the operator having "syncMode" vs not)?
@Alan: I think this is a good way to tell the user what mode these async udx 
are currently in.
>A regular SQL user doesn't care whether the function is sync or async. 
@Timo: I agree that the planner should throw as few exceptions as possible 
rather than confusing users. So I think 
it is a good way to expose syncMode through explain syntax.
> If the input to the operator is append-only, it seems fine, 
> because this implies that each row is effectively independent and ordering is 
> unimportant.


> For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
> I don't see a reason why the operator is not allowed to produce unordered
> results.


@Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as the
source and MySQL as the sink as an example. Although Kafka is an append-only
source, one of its fields is used as the primary key when writing to MySQL.
If the async udx is executed in unordered mode, the data that ends up in MySQL
may be wrong. In this case, we actually need to preserve ordering per sink
primary key.



--

Best!
Xuyang





At 2023-12-16 03:33:55, "Alan Sheinberg"  
wrote:
>Thanks for the replies everyone.  My responses are inline:
>
>About the configs, what do you think using hints as mentioned in [1].
>
>@Aitozi: I think hints could be a good way to do this, similar to lookup
>joins or the proposal in FLIP-313.  One benefit of hints is that they allow
>for the highest granularity of configuration because you can decide at
>each and every call site just what parameters to use.  The downside of
>hints is that there's more syntax to learn and more verbosity.  I'm
>somewhat partial to a configuration like this with a class definition level
>of granularity (similar to how metrics reporters are defined [1]):
>
>table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
>table.exec.async-scalar.myfunc.buffer-capacity: 10
>...
>
>As Timo mentioned, the downside to this is that there's not a nice static
>way to do this at the moment, unless you extend ConfigOption.  It would be
>good ultimately if Lookup joins, async scalar functions, and other future
>configurable UDFs shared the same methodology, but maybe a unified approach
>is a followup discussion.
>
>I’m just curious why you don’t use conf(global) and query hint(individual
>> async udx) to mark the output
>> mode 'order' or 'unorder' like async lookup join [1] and async udtf[2], but
>> chose to introduce a new enum
>> in AsyncScalarFunction.
>
>
>@Xuyang: I'm open to adding hints. I think the important part is that we
>have some method for the user to have a class definition level way to
>define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
>have a strong sense yet for what would be most appropriately exposed as a
>FunctionRequirement vs a simple configuration/hint.
>
>What about throwing an exception to make it clear to users that using async
>> scalar functions in this situation
>> is problematic instead of executing silently in sync mode? Because users
>> may be confused about
>> the final actual job graph.
>
>
>@Xuyang: Would it make a difference if it were exposed by the explain
>method (the operator having "syncMode" vs not)?  I'd be fine to do it
>either way -- certainly throwing an error is a bit simpler.
>
>You are right. Actually it should be the planner that fully decides
>> whether ORDERED or UNORDERED is safe to do. For example, if the query is
>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
>> operator is not allowed to produce unordered results. By global
>> configuration, we can set ORDERED such that users don't get confused
>> about the unordered output.
>
>
>@Timo: Is there an easy way to determine if the output of an async function
>would be problematic or not?  If the input to the operator is append-only,
>it seems fine, because this implies that each row is effectively
>independent and ordering is unimportant. For upsert mode with +U rows, you
>wouldn't want to swap order with other +U rows for the same key because the
>last one should win.  For -D or -U rows, you wouldn't want to swap with
>other rows for the same key for similar reasons.  Is it as simple as
>looking for the changlelog mode to determine whether it's safe to run async
>functions UNORDERED?  I had considered analyzing various query forms (join
>vs aggregation vs whatever), but it seems like changelog mode could be
>sufficient to understand what works and what would be an issue.  Any code
>pointers and explanation for similar analysis would be great to understand
>this more.
>
>The mode UNORDERED however should only have
>> effect for these simple use cases and throw an exception if UNORDERED
>> would mess up a changelog or other subsequent operators.
>
>@Timo: Should we throw errors or run in sync 
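
The ordering concern raised above can be illustrated with a small, self-contained sketch (plain Java, no Flink APIs): if two upsert records for the same key complete asynchronously out of order and the sink applies them in completion order, the stale value wins.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnorderedUpsertDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<String, Integer> sink = new ConcurrentHashMap<>(); // stands in for the MySQL table keyed by pk

        // Two +U records for the same key arrive in this order: value 1, then value 2.
        // The "async call" for the first record happens to be slower than for the second.
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
            sleep(100);          // slow async call
            sink.put("pk-1", 1); // completes last
        }, pool);
        CompletableFuture<Void> second = CompletableFuture.runAsync(() -> {
            sleep(10);           // fast async call
            sink.put("pk-1", 2); // completes first
        }, pool);

        CompletableFuture.allOf(first, second).join();
        pool.shutdown();

        // With completion-order (unordered) emission the final value is 1, although the
        // latest upsert for pk-1 was 2: exactly the problem described above.
        System.out.println("final value for pk-1 = " + sink.get("pk-1"));
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```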

Re: [DISCUSS] FLIP-398: Improve Serialization Configuration And Usage In Flink

2023-12-17 Thread Xintong Song
Hi Ken,

I think the main purpose of this FLIP is to change how users interact with
the knobs for customizing the serialization behaviors, from requiring code
changes to working with pure configurations. Redesigning the knobs (i.e.,
names, semantics, etc.), on the other hand, is not the purpose of this
FLIP. Preserving the existing names and semantics should also help minimize
the migration cost for existing users. Therefore, I'm in favor of not
changing them.

Concerning decoupling from Kryo, and introducing other serialization
frameworks like Fury, I think that's a bigger topic that is worth further
discussion. At the moment, I'm not aware of any community consensus on
doing so. And even if in the future we decide to do so, the changes needed
should be the same w/ or w/o this FLIP. So I'd suggest not to block this
FLIP on these issues.

WDYT?

Best,

Xintong



On Fri, Dec 15, 2023 at 1:40 AM Ken Krugler 
wrote:

> Hi Yong,
>
> Looks good, thanks for creating this.
>
> One comment - related to my recent email about Fury, I would love to see
> the v2 serialization decoupled from Kryo.
>
> As part of that, instead of using xxxKryo in methods, call them xxxGeneric.
>
> A more extreme change would be to totally rely on Fury (so no more POJO
> serializer). Fury is faster than the POJO serializer in my tests, but this
> would be a much bigger change.
>
> Though it could dramatically simplify the Flink serialization support.
>
> — Ken
>
> PS - a separate issue is how to migrate state from Kryo to something like
> Fury, which supports schema evolution. I think this might be possible, by
> having a smarter deserializer that identifies state as being created by
> Kryo, and using (shaded) Kryo to deserialize, while still writing as Fury.
>
> > On Dec 6, 2023, at 6:35 PM, Yong Fang  wrote:
> >
> > Hi devs,
> >
> > I'd like to start a discussion about FLIP-398: Improve Serialization
> > Configuration And Usage In Flink [1].
> >
> > Currently, users can register custom data types and serializers in Flink
> > jobs through various methods, including registration in code,
> > configuration, and annotations. These lead to difficulties in upgrading
> > Flink jobs and priority issues.
> >
> > In flink-2.0 we would like to manage job data types and serializers
> through
> > configurations. This FLIP will introduce a unified option for data type
> and
> > serializer and users can configure all custom data types and
> > pojo/kryo/custom serializers. In addition, this FLIP will add more
> built-in
> > serializers for complex data types such as List and Map, and optimize the
> > management of Avro Serializers.
> >
> > Looking forward to hearing from you, thanks!
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-398%3A+Improve+Serialization+Configuration+And+Usage+In+Flink
> >
> > Best,
> > Fang Yong
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink & Pinot
>
>
>
>
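
For context, the sketch below shows the style of in-code registration that this FLIP aims to replace with pure configuration. It uses the existing ExecutionConfig API; the POJO type is made up for illustration:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CodeBasedRegistration {

    /** Hypothetical user type, for illustration only. */
    public static class MyEvent {
        public long id;
        public String payload;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Today: custom data types are registered in code, so changing how a type is
        // serialized (POJO vs. Kryo, custom serializers, ...) means recompiling the job.
        env.getConfig().registerPojoType(MyEvent.class);
        env.getConfig().registerKryoType(MyEvent.class);

        // FLIP-398 proposes expressing the same information through configuration
        // options instead, so it can be changed without touching job code.
    }
}
```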


[jira] [Created] (FLINK-33869) Add checkpoint metrics: the latency to close the file

2023-12-17 Thread Jufang He (Jira)
Jufang He created FLINK-33869:
-

 Summary: Add checkpoint metrics: the latency to close the file
 Key: FLINK-33869
 URL: https://issues.apache.org/jira/browse/FLINK-33869
 Project: Flink
  Issue Type: Sub-task
Reporter: Jufang He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33868) Add checkpoint metrics: the latency to write the file

2023-12-17 Thread Jufang He (Jira)
Jufang He created FLINK-33868:
-

 Summary: Add checkpoint metrics: the latency to write the file
 Key: FLINK-33868
 URL: https://issues.apache.org/jira/browse/FLINK-33868
 Project: Flink
  Issue Type: Sub-task
Reporter: Jufang He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33867) Add checkpoint metrics: the rate of file write

2023-12-17 Thread Jufang He (Jira)
Jufang He created FLINK-33867:
-

 Summary: Add checkpoint metrics: the rate of file write
 Key: FLINK-33867
 URL: https://issues.apache.org/jira/browse/FLINK-33867
 Project: Flink
  Issue Type: Sub-task
Reporter: Jufang He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


FLIP-403: High Availability Services for OLAP Scenarios

2023-12-17 Thread Yangze Guo
Hi, there,

We would like to start a discussion thread on "FLIP-403: High
Availability Services for OLAP Scenarios"[1].

Currently, Flink's high availability service consists of two
mechanisms: leader election/retrieval services for JobManager and
persistent services for job metadata. However, these mechanisms are
set up in an "all or nothing" manner. In OLAP scenarios, we typically
only require leader election/retrieval services for JobManager
components since jobs usually do not have a restart strategy.
Additionally, the persistence of job states can negatively impact the
cluster's throughput, especially for short query jobs.

To address these issues, this FLIP proposes splitting the
HighAvailabilityServices into LeaderServices and PersistentServices,
and enable users to independently configure the high availability
strategies specifically related to jobs.

Please find more details in the FLIP wiki document [1]. Looking
forward to your feedback.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios

Best,
Yangze Guo
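
As a rough illustration of the proposed split, a sketch is included below; all interface and method names in it are assumptions, and the FLIP wiki page above has the authoritative design:

```java
/**
 * Illustrative sketch only: one way the split could be shaped. All names and
 * signatures here are assumptions made for illustration; the FLIP wiki page
 * linked above contains the actual proposed interfaces.
 */
public class Flip403Sketch {

    /** Leader election/retrieval, still needed for JobManager components in OLAP setups. */
    interface LeaderServices extends AutoCloseable {
        void startResourceManagerLeaderElection();
        void startDispatcherLeaderElection();
    }

    /**
     * Persistence of job metadata (job graphs, checkpoint metadata, blobs). OLAP
     * deployments may want to disable this, since short queries usually have no
     * restart strategy and the extra writes hurt throughput.
     */
    interface PersistentServices extends AutoCloseable {
        void storeJobGraph(String jobId, byte[] serializedJobGraph);
        byte[] recoverJobGraph(String jobId);
    }

    /** The combined service composes the two, so each part can be configured independently. */
    interface HighAvailabilityServices extends AutoCloseable {
        LeaderServices leaderServices();
        PersistentServices persistentServices();
    }
}
```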


Re: Question on lookup joins

2023-12-17 Thread Hang Ruan
Hi, David.

The FLIP-377[1] is about this part. You could take a look at it.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768


Hang Ruan wrote on Sun, Dec 17, 2023 at 20:56:

> Hi, David.
>
> I think you are right that the value with NULL should not be returned if
> the filter push down is disabled.
>
> Maybe you should explain this SQL to make sure this filter is not pushed
> down to the lookup source.
>
> I see the configuration
> 'table.optimizer.source.predicate-pushdown-enabled' relies on the class
> FilterableTableSource, which is deprecated.
> I am not sure whether this configuration is still useful for jdbc
> connector, which is using the SupportsFilterPushDown.
>
> Maybe the jdbc connector should read this configuration and return an
> empty 'acceptedFilters' in the method 'applyFilters'.
>
> Best,
> Hang
>
> David Radley wrote on Sat, Dec 16, 2023 at 01:47:
>
>> Hi ,
>> I am working on FLINK-33365, which relates to JDBC predicate pushdown. I
>> want to ensure that the same results occur with predicate pushdown as
>> without. So I am asking this question outside the pr / issue.
>>
>> I notice the following behaviour for lookup joins without predicate
>> pushdown. I was not expecting all the <NULL>s when there is not a
>> matching join key. ’a’ is a table in Paimon and ‘db’ is a relational
>> database.
>>
>>
>>
>> Flink SQL> select * from a;
>>
>> +----+-------------+-------------------------+
>> | op |          ip |                proctime |
>> +----+-------------+-------------------------+
>> | +I | 10.10.10.10 | 2023-12-15 17:36:10.028 |
>> | +I | 20.20.20.20 | 2023-12-15 17:36:10.030 |
>> | +I | 30.30.30.30 | 2023-12-15 17:36:10.031 |
>>
>> ^CQuery terminated, received a total of 3 rows
>>
>>
>> Flink SQL> select * from db_catalog.menagerie.e;
>>
>> +----+-------------+------+-----+--------+--------+
>> | op |          ip | type | age | height | weight |
>> +----+-------------+------+-----+--------+--------+
>> | +I | 10.10.10.10 |    1 |  30 |    100 |    100 |
>> | +I | 10.10.10.10 |    2 |  40 |     90 |    110 |
>> | +I | 10.10.10.10 |    2 |  50 |     80 |    120 |
>> | +I | 10.10.10.10 |    3 |  50 |     70 |     40 |
>> | +I | 20.20.20.20 |    3 |  30 |     80 |     90 |
>>
>> Received a total of 5 rows
>>
>>
>> Flink SQL> set table.optimizer.source.predicate-pushdown-enabled=false;
>>
>> [INFO] Execute statement succeed.
>>
>>
>> Flink SQL> SELECT * FROM a left join mariadb_catalog.menagerie.e FOR
>> SYSTEM_TIME AS OF a.proctime on e.type = 2 and a.ip = e.ip;
>>
>> +----+-------------+-------------------------+-------------+--------+--------+--------+--------+
>> | op |          ip |                proctime |         ip0 |   type |    age | height | weight |
>> +----+-------------+-------------------------+-------------+--------+--------+--------+--------+
>> | +I | 10.10.10.10 | 2023-12-15 17:38:05.169 | 10.10.10.10 |      2 |     40 |     90 |    110 |
>> | +I | 10.10.10.10 | 2023-12-15 17:38:05.169 | 10.10.10.10 |      2 |     50 |     80 |    120 |
>> | +I | 20.20.20.20 | 2023-12-15 17:38:05.170 |      <NULL> | <NULL> | <NULL> | <NULL> | <NULL> |
>> | +I | 30.30.30.30 | 2023-12-15 17:38:05.172 |      <NULL> | <NULL> | <NULL> | <NULL> | <NULL> |
>>
>> Unless otherwise stated above:
>>
>> IBM United Kingdom Limited
>> Registered in England and Wales with number 741598
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>>
>
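
Hang Ruan's suggestion above, that the JDBC connector could honor the flag by accepting no filters, would look roughly like the following sketch against the existing SupportsFilterPushDown interface; the class name and the way the flag is obtained are illustrative, not the actual connector code:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

/** Sketch of a scan source ability implementation that can opt out of filter push-down. */
public class SketchJdbcTableSourceAbility implements SupportsFilterPushDown {

    // Illustrative flag; in a real connector this would be derived from a table option
    // or from the 'table.optimizer.source.predicate-pushdown-enabled' setting discussed above.
    private final boolean filterPushDownEnabled;

    public SketchJdbcTableSourceAbility(boolean filterPushDownEnabled) {
        this.filterPushDownEnabled = filterPushDownEnabled;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        if (!filterPushDownEnabled) {
            // Accept nothing: every filter stays in Flink, so the plan (and the NULL-padded
            // rows of the left join) matches the behaviour without push-down.
            return Result.of(Collections.emptyList(), filters);
        }
        // Sketch of the safe default when push-down is enabled: hand all filters to the
        // external system, but also keep them as remaining filters so Flink re-applies them.
        return Result.of(filters, filters);
    }
}
```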


[jira] [Created] (FLINK-33866) KafkaSinkBuilder in flink-connector-kafka references DeliveryGuarantee in flink-connector-base

2023-12-17 Thread Kurt Ostfeld (Jira)
Kurt Ostfeld created FLINK-33866:


 Summary: KafkaSinkBuilder in flink-connector-kafka references 
DeliveryGuarantee in flink-connector-base
 Key: FLINK-33866
 URL: https://issues.apache.org/jira/browse/FLINK-33866
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-3.0.2
Reporter: Kurt Ostfeld


I have a Flink project that has code like:

```
KafkaSink.builder().setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
```
 
This worked with flink-connector-kafka 3.0.1 as well as past versions of Flink.
 
This fails to compile with flink-connector-kafka 3.0.2 because that release 
changed flink-connector-base to a provided dependency so the reference to the 
DeliveryGuarantee class becomes a compiler error.
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-12-17 Thread Jiabao Sun
Hi Becket,

The FLIP document has been updated as well.
Please take a look when you have time.

Thanks,
Jiabao


> On Dec 17, 2023, at 22:54, Jiabao Sun wrote:
> 
> Thanks Becket,
> 
> I apologize for not being able to continue with this proposal due to being 
> too busy during this period.
> 
> The viewpoints you shared about the design of Flink Source make sense to me.
> The native configuration ‘ignore.filter.pushdown’ sounds good to me.
> Having a unified name or naming style can indeed prevent confusion for users 
> regarding
> the inconsistent naming of this configuration across different connectors.
> 
> Currently, there are not many external connectors that support filter 
> pushdown.
> I propose that we first introduce it in flink-connector-jdbc and 
> flink-connector-mongodb. 
> Do you think this is feasible?
> 
> Best,
> Jiabao
> 
> 
>> On Nov 16, 2023, at 17:45, Becket Qin wrote:
>> 
>> Hi Jiabao,
>> 
>> Arguments like "because Spark has it so Flink should also have it" does not
>> make sense. Different projects have different API flavors and styles. What
>> is really important is the rationale and the design principle behind the
>> API. They should conform to the convention of the project.
>> 
>> First of all, Spark Source API itself has a few issues and they ended up
>> introducing DataSource V2 in Spark 3.0, which added the decorative interfaces
>> like SupportsPushdownXXX. Some of the configurations predating DataSource
>> V2 may still be there.
>> 
>> For the Spark configurations you mentioned, they are all the configurations
>> for FileScanBuilder, which is equivalent to FileSource in Flink. Currently,
>> regardless of the format (ORC, Parquet, Avro, etc), the FileSource pushes
>> back all the filters to ensure correctness. The actual filters that got
>> applied to the specific format might still be different. This
>> implementation is the same in FileScanBuilder.pushFilters() for Spark. I
>> don't know why Spark got separate configurations for each format. Maybe it
>> is because the filters are actually implemented differently for different
>> formats.
>> 
>> At least for the current implementation in FileScanBuilder, these
>> configurations can be merged to one configuration like
>> `apply.filters.to.format.enabled`. Note that this config, as well as the
>> separate configs you mentioned, are just visible and used by the
>> FileScanBuilder. It determines whether the filters should be passed down to
>> the format of the FileScanBuilder instance. Regardless of the value of
>> these configs, FileScanBuilder.pushFilters() will always be called, and
>> FileScanBuilder (as well as FileSource in Flink) will always push back all
>> the filters to the framework.
>> 
>> A MySql source can have a very different way to handle this. For example, a
>> config in this case might be "my.apply.filters" with three
>> different values:
>> - AUTO: The Source will issue a DESC Table request to understand whether a
>> filter can be applied efficiently, and decide which filters can be applied
>> and which cannot based on that.
>> - NEVER: Never apply filtering. It will always do a full table read and
>> let Flink do the filtering.
>> - ALWAYS: Always apply the filtering to the MySql server.
>> 
>> In the above examples of FileSource and MySql Source, I don't think it is a
>> good idea to shoehorn the behaviors into a naive config of
>> `ignore.filter.pushdown`. That is why I don't think this is a common config.
>> 
>> To recap, like I said, I do agree that in some cases, we may want to behave
>> differently when filters are pushed down to the sources, even if a source
>> implements SupportsFilterPushDown, but I don't think there is a suitable
>> common config for this. The behavior is very likely source specific.
>> 
>> Thanks,
>> 
>> Jiangjie (Becket) Qin
>> 
>> 
>> 
>> On Thu, Nov 16, 2023 at 3:41 PM Jiabao Sun 
>> wrote:
>> 
>>> Thanks Becket,
>>> 
>>> I still believe that adding a configuration at the source level to disable
>>> filter pushdown is needed. This demand exists in spark as well[1].
>>> 
>>> In Spark, most sources that support filter pushdown provide their own
>>> corresponding configuration options to enable or disable filter pushdown.
>>> For PRs[2-4] that support filter pushdown capability, they also provide
>>> configuration options to disable this capability.
>>> 
>>> I believe this configuration is applicable to most scenarios, and there is
>>> no need to dwell on why this configuration option was not introduced
>>> earlier than the SupportsFilterPushDown interface.
>>> 
>>> spark.sql.parquet.filterPushdown
>>> spark.sql.orc.filterPushdown
>>> spark.sql.csv.filterPushdown.enabled
>>> spark.sql.json.filterPushdown.enabled
>>> spark.sql.avro.filterPushdown.enabled
>>> JDBC Option: pushDownPredicate
>>> 
>>> We can see that the lack of consistency is caused by each connector
>>> introducing different configuration options for the same behavior.
>>> This is one of the motivations for advocating the introduction of a unified configuration name.

Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-12-17 Thread Jiabao Sun
Thanks Becket,

I apologize for not being able to continue with this proposal due to being too 
busy during this period.

The viewpoints you shared about the design of Flink Source make sense to me.
The native configuration ‘ignore.filter.pushdown’ sounds good to me.
Having a unified name or naming style can indeed prevent confusion for users 
regarding
the inconsistent naming of this configuration across different connectors.

Currently, there are not many external connectors that support filter pushdown.
I propose that we first introduce it in flink-connector-jdbc and 
flink-connector-mongodb. 
Do you think this is feasible?

Best,
Jiabao


> On Nov 16, 2023, at 17:45, Becket Qin wrote:
> 
> Hi Jiabao,
> 
> Arguments like "because Spark has it so Flink should also have it" does not
> make sense. Different projects have different API flavors and styles. What
> is really important is the rationale and the design principle behind the
> API. They should conform to the convention of the project.
> 
> First of all, Spark Source API itself has a few issues and they ended up
> introducing DataSource V2 in Spark 3.0, which added the decorative interfaces
> like SupportsPushdownXXX. Some of the configurations predating DataSource
> V2 may still be there.
> 
> For the Spark configurations you mentioned, they are all the configurations
> for FileScanBuilder, which is equivalent to FileSource in Flink. Currently,
> regardless of the format (ORC, Parquet, Avro, etc), the FileSource pushes
> back all the filters to ensure correctness. The actual filters that got
> applied to the specific format might still be different. This
> implementation is the same in FileScanBuilder.pushFilters() for Spark. I
> don't know why Spark got separate configurations for each format. Maybe it
> is because the filters are actually implemented differently for different
> formats.
> 
> At least for the current implementation in FileScanBuilder, these
> configurations can be merged to one configuration like
> `apply.filters.to.format.enabled`. Note that this config, as well as the
> separate configs you mentioned, are just visible and used by the
> FileScanBuilder. It determines whether the filters should be passed down to
> the format of the FileScanBuilder instance. Regardless of the value of
> these configs, FileScanBuilder.pushFilters() will always be called, and
> FileScanBuilder (as well as FileSource in Flink) will always push back all
> the filters to the framework.
> 
> A MySql source can have a very different way to handle this. For example, a
> config in this case might be "my.apply.filters" with three
> different values:
> - AUTO: The Source will issue a DESC Table request to understand whether a
> filter can be applied efficiently, and decide which filters can be applied
> and which cannot based on that.
> - NEVER: Never apply filtering. It will always do a full table read and
> let Flink do the filtering.
> - ALWAYS: Always apply the filtering to the MySql server.
> 
> In the above examples of FileSource and MySql Source, I don't think it is a
> good idea to shoehorn the behaviors into a naive config of
> `ignore.filter.pushdown`. That is why I don't think this is a common config.
> 
> To recap, like I said, I do agree that in some cases, we may want to behave
> differently when filters are pushed down to the sources, even if a source
> implements SupportsFilterPushDown, but I don't think there is a suitable
> common config for this. The behavior is very likely source specific.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> On Thu, Nov 16, 2023 at 3:41 PM Jiabao Sun 
> wrote:
> 
>> Thanks Becket,
>> 
>> I still believe that adding a configuration at the source level to disable
>> filter pushdown is needed. This demand exists in spark as well[1].
>> 
>> In Spark, most sources that support filter pushdown provide their own
>> corresponding configuration options to enable or disable filter pushdown.
>> For PRs[2-4] that support filter pushdown capability, they also provide
>> configuration options to disable this capability.
>> 
>> I believe this configuration is applicable to most scenarios, and there is
>> no need to dwell on why this configuration option was not introduced
>> earlier than the SupportsFilterPushDown interface.
>> 
>> spark.sql.parquet.filterPushdown
>> spark.sql.orc.filterPushdown
>> spark.sql.csv.filterPushdown.enabled
>> spark.sql.json.filterPushdown.enabled
>> spark.sql.avro.filterPushdown.enabled
>> JDBC Option: pushDownPredicate
>> 
>> We can see that the lack of consistency is caused by each connector
>> introducing different configuration options for the same behavior.
>> This is one of the motivations for advocating the introduction of a
>> unified configuration name.
>> 
>> [1] https://issues.apache.org/jira/browse/SPARK-24288
>> [2] https://github.com/apache/spark/pull/27366
>> [3] https://github.com/apache/spark/pull/26973
>> [4] https://github.com/apache/spark/pull/29145
>> 
>> Best,
>> Jiabao
>> 

Re: Question on lookup joins

2023-12-17 Thread Hang Ruan
Hi, David.

I think you are right that the value with NULL should not be returned if
the filter push down is disabled.

Maybe you should explain this SQL to make sure this filter is not pushed
down to the lookup source.

I see the configuration 'table.optimizer.source.predicate-pushdown-enabled'
relies on the class FilterableTableSource, which is deprecated.
I am not sure whether this configuration is still useful for jdbc
connector, which is using the SupportsFilterPushDown.

Maybe the jdbc connector should read this configuration and return an
empty 'acceptedFilters' in the method 'applyFilters'.

Best,
Hang

David Radley wrote on Sat, Dec 16, 2023 at 01:47:

> Hi ,
> I am working on FLINK-33365, which relates to JDBC predicate pushdown. I
> want to ensure that the same results occur with predicate pushdown as
> without. So I am asking this question outside the pr / issue.
>
> I notice the following behaviour for lookup joins without predicate
> pushdown. I was not expecting all the <NULL>s when there is not a
> matching join key. ’a’ is a table in Paimon and ‘db’ is a relational
> database.
>
>
>
> Flink SQL> select * from a;
>
> +----+-------------+-------------------------+
> | op |          ip |                proctime |
> +----+-------------+-------------------------+
> | +I | 10.10.10.10 | 2023-12-15 17:36:10.028 |
> | +I | 20.20.20.20 | 2023-12-15 17:36:10.030 |
> | +I | 30.30.30.30 | 2023-12-15 17:36:10.031 |
>
> ^CQuery terminated, received a total of 3 rows
>
>
> Flink SQL> select * from db_catalog.menagerie.e;
>
> +----+-------------+------+-----+--------+--------+
> | op |          ip | type | age | height | weight |
> +----+-------------+------+-----+--------+--------+
> | +I | 10.10.10.10 |    1 |  30 |    100 |    100 |
> | +I | 10.10.10.10 |    2 |  40 |     90 |    110 |
> | +I | 10.10.10.10 |    2 |  50 |     80 |    120 |
> | +I | 10.10.10.10 |    3 |  50 |     70 |     40 |
> | +I | 20.20.20.20 |    3 |  30 |     80 |     90 |
>
> Received a total of 5 rows
>
>
> Flink SQL> set table.optimizer.source.predicate-pushdown-enabled=false;
>
> [INFO] Execute statement succeed.
>
>
> Flink SQL> SELECT * FROM a left join mariadb_catalog.menagerie.e FOR
> SYSTEM_TIME AS OF a.proctime on e.type = 2 and a.ip = e.ip;
>
> +----+-------------+-------------------------+-------------+--------+--------+--------+--------+
> | op |          ip |                proctime |         ip0 |   type |    age | height | weight |
> +----+-------------+-------------------------+-------------+--------+--------+--------+--------+
> | +I | 10.10.10.10 | 2023-12-15 17:38:05.169 | 10.10.10.10 |      2 |     40 |     90 |    110 |
> | +I | 10.10.10.10 | 2023-12-15 17:38:05.169 | 10.10.10.10 |      2 |     50 |     80 |    120 |
> | +I | 20.20.20.20 | 2023-12-15 17:38:05.170 |      <NULL> | <NULL> | <NULL> | <NULL> | <NULL> |
> | +I | 30.30.30.30 | 2023-12-15 17:38:05.172 |      <NULL> | <NULL> | <NULL> | <NULL> | <NULL> |
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


[jira] [Created] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration

2023-12-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33865:
---

 Summary: exponential-delay.attempts-before-reset-backoff doesn't 
work when it's set in Job Configuration
 Key: FLINK-33865
 URL: https://issues.apache.org/jira/browse/FLINK-33865
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-12-17-17-56-59-138.png

exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
Job Configuration.

Reason: this happens when exponential-delay.attempts-before-reset-backoff is set via the
job Configuration instead of the cluster configuration.

ExecutionConfig#configure will call RestartStrategies#parseConfiguration to
create the RestartStrategyConfiguration, and then
RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will
create the ExponentialDelayRestartBackoffTimeStrategyFactory from that
RestartStrategyConfiguration.

Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated,
so they don't support exponential-delay.attempts-before-reset-backoff.

I had a misunderstanding during FLINK-32895: I thought
RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory
would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the
clusterConfiguration.


 !image-2023-12-17-17-56-59-138.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)