Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources
Hi Becket,

The MySQL connector currently lives in the flink-connector-jdbc repository and is not a standalone connector. Given that, is "mysql" too specific to use as the configuration option prefix?

I would also like to ask about the difference in behavior between AUTO and ALWAYS. It seems we cannot guarantee that all filters are pushed down to the external system under ALWAYS mode, because not all filters in Flink SQL are supported by the external system. Should we throw an error when we encounter a filter that cannot be pushed down in ALWAYS mode?

Thanks,
Jiabao

> On Dec 18, 2023, at 15:34, Becket Qin wrote:
>
> Hi Jiabao,
>
> Thanks for updating the FLIP. Maybe I did not explain it clearly enough. My point is that given there are various good flavors of behaviors handling filters pushed down, we should not have a common config of "ignore.filter.pushdown", because the behavior is not *common*.
>
> It looks like the original motivation of this FLIP is just for MySql. Let's focus on what is the best solution for MySql connector here first. After that, if people think the best behavior for MySql happens to be a common one, we can then discuss whether that is worth being added to the base implementation of source. For MySQL, if we are going to introduce a config to MySql, why not have something like "mysql.filter.handling.policy" with value of AUTO / NEVER / ALWAYS? Isn't that better than "ignore.filter.pushdown"?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Dec 17, 2023 at 11:30 PM Jiabao Sun wrote:
>
>> Hi Becket,
>>
>> The FLIP document has been updated as well.
>> Please take a look when you have time.
>>
>> Thanks,
>> Jiabao
>>
>>> On Dec 17, 2023, at 22:54, Jiabao Sun wrote:
>>>
>>> Thanks Becket,
>>>
>>> I apologize for not being able to continue with this proposal due to being too busy during this period.
>>>
>>> The viewpoints you shared about the design of Flink Source make sense to me.
>>> The native configuration ‘ignore.filter.pushdown’ is good to me.
>>> Having a unified name or naming style can indeed prevent confusion for users regarding the inconsistent naming of this configuration across different connectors.
>>>
>>> Currently, there are not many external connectors that support filter pushdown.
>>> I propose that we first introduce it in flink-connector-jdbc and flink-connector-mongodb.
>>> Do you think this is feasible?
>>>
>>> Best,
>>> Jiabao
>>>
>>> On Nov 16, 2023, at 17:45, Becket Qin wrote:
>>>>
>>>> Hi Jiabao,
>>>>
>>>> Arguments like "because Spark has it so Flink should also have it" does not make sense. Different projects have different API flavors and styles. What is really important is the rationale and the design principle behind the API. They should conform to the convention of the project.
>>>>
>>>> First of all, Spark Source API itself has a few issues and they ended up introduce DataSource V2 in Spark 3.0, which added the decorative interfaces like SupportsPushdownXXX. Some of the configurations predating DataSource V2 may still be there.
>>>>
>>>> For the Spark configurations you mentioned, they are all the configurations for FileScanBuilder, which is equivalent to FileSource in Flink. Currently, regardless of the format (ORC, Parquet, Avro, etc), the FileSource pushes back all the filters to ensure correctness. The actual filters that got applied to the specific format might still be different. This implementation is the same in FileScanBuilder.pushFilters() for Spark. I don't know why Spark got separate configurations for each format. Maybe it is because the filters are actually implemented differently for different format.
>>>>
>>>> At least for the current implementation in FileScanBuilder, these configurations can be merged to one configuration like `apply.filters.to.format.enabled`. Note that this config, as well as the separate configs you mentioned, are just visible and used by the FileScanBuilder. It determines whether the filters should be passed down to the format of the FileScanBuilder instance. Regardless of the value of these configs, FileScanBuilder.pushFilters() will always be called, and FileScanBuilder (as well as FileSource in Flink) will always push back all the filters to the framework.
>>>>
>>>> A MySql source can have a very different way to handle this. For example, a config in this case might be "my.apply.filters" with three different values:
>>>> - AUTO: The Source will issue a DESC Table request to understand whether a filter can be applied efficiently. And decide which filters can be applied and which cannot based on that.
>>>> - NEVER: Never apply filtering. It will always do a full table read and let Flink do the filtering.
>>>> - ALWAYS: Always apply the filtering to the MySql server.
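(For illustration only: a connector-scoped option of the kind discussed above could be declared roughly as below. The key name, enum, and default here are assumptions made for this thread, not part of the FLIP; only the ConfigOptions builder API is the real Flink API.)

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical sketch of the per-connector option discussed in this thread.
public class MySqlConnectorOptions {

    /** How the source treats filters that the planner pushes down. */
    public enum FilterHandlingPolicy {
        AUTO,    // probe the table (e.g. DESC TABLE) and apply only filters that are cheap to evaluate remotely
        NEVER,   // always read the full table and let Flink evaluate every filter
        ALWAYS   // forward every supported filter to the MySQL server
    }

    public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
            ConfigOptions.key("mysql.filter.handling.policy")
                    .enumType(FilterHandlingPolicy.class)
                    .defaultValue(FilterHandlingPolicy.AUTO)
                    .withDescription("Strategy for applying pushed-down filters to MySQL.");
}
```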
Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources
Hi Jiabao,

Thanks for updating the FLIP. Maybe I did not explain it clearly enough. My point is that given there are various good flavors of behaviors handling filters pushed down, we should not have a common config of "ignore.filter.pushdown", because the behavior is not *common*.

It looks like the original motivation of this FLIP is just for MySql. Let's focus on what is the best solution for MySql connector here first. After that, if people think the best behavior for MySql happens to be a common one, we can then discuss whether that is worth being added to the base implementation of source. For MySQL, if we are going to introduce a config to MySql, why not have something like "mysql.filter.handling.policy" with value of AUTO / NEVER / ALWAYS? Isn't that better than "ignore.filter.pushdown"?

Thanks,

Jiangjie (Becket) Qin

On Sun, Dec 17, 2023 at 11:30 PM Jiabao Sun wrote:

> Hi Becket,
>
> The FLIP document has been updated as well.
> Please take a look when you have time.
>
> Thanks,
> Jiabao
>
> > On Dec 17, 2023, at 22:54, Jiabao Sun wrote:
> >
> > Thanks Becket,
> >
> > I apologize for not being able to continue with this proposal due to being too busy during this period.
> >
> > The viewpoints you shared about the design of Flink Source make sense to me.
> > The native configuration ‘ignore.filter.pushdown’ is good to me.
> > Having a unified name or naming style can indeed prevent confusion for users regarding the inconsistent naming of this configuration across different connectors.
> >
> > Currently, there are not many external connectors that support filter pushdown.
> > I propose that we first introduce it in flink-connector-jdbc and flink-connector-mongodb.
> > Do you think this is feasible?
> >
> > Best,
> > Jiabao
> >
> >
> >> On Nov 16, 2023, at 17:45, Becket Qin wrote:
> >>
> >> Hi Jiabao,
> >>
> >> Arguments like "because Spark has it so Flink should also have it" does not make sense. Different projects have different API flavors and styles. What is really important is the rationale and the design principle behind the API. They should conform to the convention of the project.
> >>
> >> First of all, Spark Source API itself has a few issues and they ended up introduce DataSource V2 in Spark 3.0, which added the decorative interfaces like SupportsPushdownXXX. Some of the configurations predating DataSource V2 may still be there.
> >>
> >> For the Spark configurations you mentioned, they are all the configurations for FileScanBuilder, which is equivalent to FileSource in Flink. Currently, regardless of the format (ORC, Parquet, Avro, etc), the FileSource pushes back all the filters to ensure correctness. The actual filters that got applied to the specific format might still be different. This implementation is the same in FileScanBuilder.pushFilters() for Spark. I don't know why Spark got separate configurations for each format. Maybe it is because the filters are actually implemented differently for different format.
> >>
> >> At least for the current implementation in FileScanBuilder, these configurations can be merged to one configuration like `apply.filters.to.format.enabled`. Note that this config, as well as the separate configs you mentioned, are just visible and used by the FileScanBuilder. It determines whether the filters should be passed down to the format of the FileScanBuilder instance. Regardless of the value of these configs, FileScanBuilder.pushFilters() will always be called, and FileScanBuilder (as well as FileSource in Flink) will always push back all the filters to the framework.
> >>
> >> A MySql source can have a very different way to handle this. For example, a config in this case might be "my.apply.filters" with three different values:
> >> - AUTO: The Source will issue a DESC Table request to understand whether a filter can be applied efficiently. And decide which filters can be applied and which cannot based on that.
> >> - NEVER: Never apply filtering. It will always do a full table read and let Flink do the filtering.
> >> - ALWAYS: Always apply the filtering to the MySql server.
> >>
> >> In the above examples of FileSource and MySql Source, I don't think it is a good idea to shoehorn the behaviors into a naive config of `ignore.filter.pushdown`. That is why I don't think this is a common config.
> >>
> >> To recap, like I said, I do agree that in some cases, we may want to behave differently when filters are pushed down to the sources, even if a source implements SupportsFilterPushDown, but I don't think there is a suitable common config for this. The behavior is very likely source specific.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Thu, Nov 16, 2023 at 3:41 PM Jiabao Sun wrote:
> >>
> >>> Thanks
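(A rough sketch of how such a policy could interact with the existing pushdown contract: `SupportsFilterPushDown` and `Result.of` are the real Flink interfaces, while the policy enum and class below are illustrative only. Note that `applyFilters` is still always called; the source simply decides what it accepts.)

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// Illustrative only: a source that consults a policy before accepting pushed-down filters.
class PolicyAwareSource implements SupportsFilterPushDown {

    enum FilterHandlingPolicy { AUTO, NEVER, ALWAYS } // assumed enum from the discussion above

    private final FilterHandlingPolicy policy;

    PolicyAwareSource(FilterHandlingPolicy policy) {
        this.policy = policy;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        if (policy == FilterHandlingPolicy.NEVER) {
            // Accept nothing: Flink keeps all predicates and evaluates them itself.
            return Result.of(Collections.emptyList(), filters);
        }
        // AUTO / ALWAYS: pick the filters the external system can evaluate (connector-specific),
        // and conservatively report all of them as remaining so Flink re-applies them too.
        List<ResolvedExpression> accepted = selectRemotelyEvaluableFilters(filters);
        return Result.of(accepted, filters);
    }

    private List<ResolvedExpression> selectRemotelyEvaluableFilters(List<ResolvedExpression> filters) {
        return filters; // placeholder for metadata-based selection (e.g. DESC TABLE in AUTO mode)
    }
}
```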
Re: [VOTE] FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs
+1 (binding) Best, Xintong On Mon, Dec 18, 2023 at 3:05 PM Lijie Wang wrote: > +1 (binding) > > Best, > Lijie > > Yuxin Tan 于2023年12月15日周五 17:14写道: > > > +1 (non-binding) > > > > Best, > > Yuxin > > > > > > weijie guo 于2023年12月15日周五 10:05写道: > > > > > +1(binding) > > > > > > Best regards, > > > > > > Weijie > > > > > > > > > Wencong Liu 于2023年12月15日周五 09:13写道: > > > > > > > Hi dev, > > > > > > > > I'd like to start a vote on FLIP-382. > > > > > > > > Discussion thread: > > > > https://lists.apache.org/thread/3mgsc31odtpmzzl32s4oqbhlhxd0mn6b > > > > FLIP: > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs > > > > > > > > Best regards, > > > > Wencong Liu > > > > > >
Re: [VOTE] FLIP-380: Support Full Partition Processing On Non-keyed DataStream
+1 (binding) Best, Xintong On Fri, Dec 15, 2023 at 5:15 PM weijie guo wrote: > +1 (binding) > > Best regards, > > Weijie > > > Wencong Liu 于2023年12月15日周五 09:15写道: > > > Hi dev, > > > > I'd like to start a vote on FLIP-380. > > > > Discussion thread: > > https://lists.apache.org/thread/nn7myj7vsvytbkdrnbvj5h0homsjrn1h > > FLIP: > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream > > > > Best regards, > > Wencong Liu >
Re: [VOTE] FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs
+1 (binding) Best, Lijie Yuxin Tan 于2023年12月15日周五 17:14写道: > +1 (non-binding) > > Best, > Yuxin > > > weijie guo 于2023年12月15日周五 10:05写道: > > > +1(binding) > > > > Best regards, > > > > Weijie > > > > > > Wencong Liu 于2023年12月15日周五 09:13写道: > > > > > Hi dev, > > > > > > I'd like to start a vote on FLIP-382. > > > > > > Discussion thread: > > > https://lists.apache.org/thread/3mgsc31odtpmzzl32s4oqbhlhxd0mn6b > > > FLIP: > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs > > > > > > Best regards, > > > Wencong Liu > > >
[jira] [Created] (FLINK-33870) Split the HighAvailabilityServices into LeaderServices and PersistentServices
Yangze Guo created FLINK-33870:
--
Summary: Split the HighAvailabilityServices into LeaderServices and PersistentServices
Key: FLINK-33870
URL: https://issues.apache.org/jira/browse/FLINK-33870
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Reporter: Yangze Guo
Assignee: Yangze Guo

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re:Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support
Hi, Alan and Timo. Thanks for your reply. >Would it make a difference if it were exposed by the explain >method (the operator having "syncMode" vs not)? @Alan: I think this is a good way to tell the user what mode these async udx are currently in. >A regular SQL user doesn't care whether the function is sync or async. @Timo: I agree that the planner should throw as few exceptions as possible rather than confusing users. So I think it is a good way to expose syncMode through explain syntax. > If the input to the operator is append-only, it seems fine, > because this implies that each row is effectively independent and ordering is > unimportant. > For example, if the query is > an append-only `SELECT FUNC(c) FROM t`, > I don't see a reason why the > operator is not allowed to produce unordered > results. @Timo, @Alan: IIUC, there seems to be something wrong here. Take kafka as source and mysql as sink as an example. Although kafka is an append-only source, one of its fields is used as pk when writing to mysql. If async udx is executed in an unordered mode, there may be problems with the data in mysql in the end. In this case, we need to ensure that the sink-based pk is in order actually. -- Best! Xuyang At 2023-12-16 03:33:55, "Alan Sheinberg" wrote: >Thanks for the replies everyone. My responses are inline: > >About the configs, what do you think using hints as mentioned in [1]. > >@Aitozi: I think hints could be a good way to do this, similar to lookup >joins or the proposal in FLIP-313. One benefit of hints is that they allow >for the highest granularity of configuration because you can decide at >each and every call site just what parameters to use. The downside of >hints is that there's more syntax to learn and more verbosity. I'm >somewhat partial to a configuration like this with a class definition level >of granularity (similar to how metrics reporters are defined [1]): > >table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction >table.exec.async-scalar.myfunc.buffer-capacity: 10 >... > >As Timo mentioned, the downside to this is that there's not a nice static >way to do this at the moment, unless you extend ConfigOption. It would be >good ultimately if Lookup joins, async scalar functions, and other future >configurable UDFs shared the same methodology, but maybe a unified approach >is a followup discussion. > >I’m just curious why you don’t use conf(global) and query hint(individual >> async udx) to mark the output >> mode 'order' or 'unorder' like async look join [1] and async udtf[2], but >> chose to introduce a new enum >> in AsyncScalarFunction. > > >@Xuyang: I'm open to adding hints. I think the important part is that we >have some method for the user to have a class definition level way to >define whether ORDERED or ALLOW_UNORDERED is most appropriate. I don't >have a strong sense yet for what would be most appropriately exposed as a >FunctionRequirement vs a simple configuration/hint. > >What about throwing an exception to make it clear to users that using async >> scalar functions in this situation >> is problematic instead of executing silently in sync mode? Because users >> may be confused about >> the final actual job graph. > > >@Xuyang: Would it make a difference if it were exposed by the explain >method (the operator having "syncMode" vs not)? I'd be fine to do it >either way -- certainly throwing an error is a bit simpler. > >You are right. Actually it should be the planner that fully decides >> whether ORDERED or UNORDERED is safe to do. 
For example, if the query is >> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the >> operator is not allowed to produce unordered results. By global >> configuration, we can set ORDERED such that users don't get confused >> about the unordered output. > > >@Timo: Is there an easy way to determine if the output of an async function >would be problematic or not? If the input to the operator is append-only, >it seems fine, because this implies that each row is effectively >independent and ordering is unimportant. For upsert mode with +U rows, you >wouldn't want to swap order with other +U rows for the same key because the >last one should win. For -D or -U rows, you wouldn't want to swap with >other rows for the same key for similar reasons. Is it as simple as >looking for the changlelog mode to determine whether it's safe to run async >functions UNORDERED? I had considered analyzing various query forms (join >vs aggregation vs whatever), but it seems like changelog mode could be >sufficient to understand what works and what would be an issue. Any code >pointers and explanation for similar analysis would be great to understand >this more. > >The mode UNORDERED however should only have >> effect for these simply use cases and throw an exception if UNORDERED >> would mess up a changelog or other subsequent operators. > >@Timo: Should we throw errors or run in sync
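(To ground the ordering discussion, here is a minimal sketch of the kind of UDF being configured, assuming the eval-with-CompletableFuture shape proposed in FLIP-400; the lookup logic is made up for illustration.)

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.functions.AsyncScalarFunction;

// Minimal sketch of an async scalar UDF per the API shape proposed in FLIP-400:
// the first eval() argument is a CompletableFuture that the function completes asynchronously.
public class RemoteLookupFunction extends AsyncScalarFunction {

    public void eval(CompletableFuture<String> result, String key) {
        CompletableFuture.supplyAsync(() -> lookup(key))
                .whenComplete((value, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
    }

    private String lookup(String key) {
        return "value-for-" + key; // stand-in for a real remote call
    }
}
```

Whether calls like this run ORDERED or ALLOW_UNORDERED is exactly what the hints/configuration discussed above would control.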
Re: [DISCUSS] FLIP-398: Improve Serialization Configuration And Usage In Flink
Hi Ken, I think the main purpose of this FLIP is to change how users interact with the knobs for customizing the serialization behaviors, from requiring code changes to working with pure configurations. Redesigning the knobs (i.e., names, semantics, etc.), on the other hand, is not the purpose of this FLIP. Preserving the existing names and semantics should also help minimize the migration cost for existing users. Therefore, I'm in favor of not changing them. Concerning decoupling from Kryo, and introducing other serialization frameworks like Fury, I think that's a bigger topic that is worth further discussion. At the moment, I'm not aware of any community consensus on doing so. And even if in the future we decide to do so, the changes needed should be the same w/ or w/o this FLIP. So I'd suggest not to block this FLIP on these issues. WDYT? Best, Xintong On Fri, Dec 15, 2023 at 1:40 AM Ken Krugler wrote: > Hi Yong, > > Looks good, thanks for creating this. > > One comment - related to my recent email about Fury, I would love to see > the v2 serialization decoupled from Kryo. > > As part of that, instead of using xxxKryo in methods, call them xxxGeneric. > > A more extreme change would be to totally rely on Fury (so no more POJO > serializer). Fury is faster than the POJO serializer in my tests, but this > would be a much bigger change. > > Though it could dramatically simplify the Flink serialization support. > > — Ken > > PS - a separate issue is how to migrate state from Kryo to something like > Fury, which supports schema evolution. I think this might be possible, by > having a smarter deserializer that identifies state as being created by > Kryo, and using (shaded) Kryo to deserialize, while still writing as Fury. > > > On Dec 6, 2023, at 6:35 PM, Yong Fang wrote: > > > > Hi devs, > > > > I'd like to start a discussion about FLIP-398: Improve Serialization > > Configuration And Usage In Flink [1]. > > > > Currently, users can register custom data types and serializers in Flink > > jobs through various methods, including registration in code, > > configuration, and annotations. These lead to difficulties in upgrading > > Flink jobs and priority issues. > > > > In flink-2.0 we would like to manage job data types and serializers > through > > configurations. This FLIP will introduce a unified option for data type > and > > serializer and users can configure all custom data types and > > pojo/kryo/custom serializers. In addition, this FLIP will add more > built-in > > serializers for complex data types such as List and Map, and optimize the > > management of Avro Serializers. > > > > Looking forward to hearing from you, thanks! > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-398%3A+Improve+Serialization+Configuration+And+Usage+In+Flink > > > > Best, > > Fang Yong > > -- > Ken Krugler > http://www.scaleunlimited.com > Custom big data solutions > Flink & Pinot > > > >
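(As a concrete reference point, this is the kind of in-code registration the FLIP wants to make expressible as pure configuration. The registration APIs below exist today; the configuration shape in the trailing comment is only a placeholder until the FLIP settles it.)

```java
import com.esotericsoftware.kryo.serializers.JavaSerializer;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Today, serializer choices are usually baked into job code like this, which is what
// makes upgrades awkward; FLIP-398 aims to express the same intent via configuration.
public class SerializerRegistrationToday {

    public static class MyEvent {}
    public static class MyPojo {}

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Existing code-based registration APIs:
        env.getConfig().registerTypeWithKryoSerializer(MyEvent.class, JavaSerializer.class);
        env.getConfig().registerPojoType(MyPojo.class);

        // FLIP-398 direction (placeholder syntax; the final option name/format is defined by the FLIP):
        //   serialization config: { org.example.MyEvent: {type: kryo, serializer: ...} }
    }
}
```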
[jira] [Created] (FLINK-33869) Add checkpoint metrics: the latency to close the file
Jufang He created FLINK-33869:
--
Summary: Add checkpoint metrics: the latency to close the file
Key: FLINK-33869
URL: https://issues.apache.org/jira/browse/FLINK-33869
Project: Flink
Issue Type: Sub-task
Reporter: Jufang He

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33868) Add checkpoint metrics: the latency to write the file
Jufang He created FLINK-33868:
--
Summary: Add checkpoint metrics: the latency to write the file
Key: FLINK-33868
URL: https://issues.apache.org/jira/browse/FLINK-33868
Project: Flink
Issue Type: Sub-task
Reporter: Jufang He

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33867) Add checkpoint metrics: the rate of file write
Jufang He created FLINK-33867:
--
Summary: Add checkpoint metrics: the rate of file write
Key: FLINK-33867
URL: https://issues.apache.org/jira/browse/FLINK-33867
Project: Flink
Issue Type: Sub-task
Reporter: Jufang He

--
This message was sent by Atlassian Jira (v8.20.10#820010)
FLIP-403: High Availability Services for OLAP Scenarios
Hi there,

We would like to start a discussion thread on "FLIP-403: High Availability Services for OLAP Scenarios" [1].

Currently, Flink's high availability service consists of two mechanisms: leader election/retrieval services for JobManager and persistent services for job metadata. However, these mechanisms are set up in an "all or nothing" manner. In OLAP scenarios, we typically only require leader election/retrieval services for JobManager components, since jobs usually do not have a restart strategy. Additionally, the persistence of job state can negatively impact the cluster's throughput, especially for short query jobs.

To address these issues, this FLIP proposes splitting the HighAvailabilityServices into LeaderServices and PersistentServices, enabling users to independently configure the high availability strategies specifically related to jobs. Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios

Best,
Yangze Guo
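(A rough shape of the proposed split, purely to make the discussion concrete; the method sets below are illustrative and the authoritative interfaces are the ones in the FLIP wiki page.)

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

// Illustrative sketch only; names and method sets are simplified.
interface LeaderServices {
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobId);
}

// Job-metadata persistence, which OLAP-style deployments may want to switch off
// independently of leader election/retrieval.
interface PersistentServices {
    JobGraphStore getJobGraphStore();
    JobResultStore getJobResultStore();
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    BlobStore createBlobStore();
}
```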
Re: Question on lookup joins
Hi, David. The FLIP-377[1] is about this part. You could take a look at it. Best, Hang [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768 Hang Ruan 于2023年12月17日周日 20:56写道: > Hi, David. > > I think you are right that the value with NULL should not be returned if > the filter push down is closed. > > Maybe you should explain this sql to make sure this filter not be pushed > down to the lookup source. > > I see the configuration > 'table.optimizer.source.predicate-pushdown-enabled' relies on the class > FilterableTableSource, which is deprecated. > I am not sure whether this configuration is still useful for jdbc > connector, which is using the SupportsFilterPushDown. > > Maybe the jdbc connector should read this configuration and return an > empty 'acceptedFilters' in the method 'applyFilters'. > > Best, > Hang > > David Radley 于2023年12月16日周六 01:47写道: > >> Hi , >> I am working on FLINK-33365 which related to JDBC predicate pushdown. I >> want to ensure that the same results occur with predicate pushdown as >> without. So I am asking this question outside the pr / issue. >> >> I notice the following behaviour for lookup joins without predicate >> pushdown. I was not expecting all the s , when there is not a >> matching join key. ’a’ is a table in paimon and ‘db’ is a relational >> database. >> >> >> >> Flink SQL> select * from a; >> >> +++-+ >> >> | op | ip |proctime | >> >> +++-+ >> >> | +I |10.10.10.10 | 2023-12-15 17:36:10.028 | >> >> | +I |20.20.20.20 | 2023-12-15 17:36:10.030 | >> >> | +I |30.30.30.30 | 2023-12-15 17:36:10.031 | >> >> ^CQuery terminated, received a total of 3 rows >> >> >> >> Flink SQL> select * from db_catalog.menagerie.e; >> >> >> +++-+-+-+-+ >> >> | op | ip |type | age | >> height | weight | >> >> >> +++-+-+-+-+ >> >> | +I |10.10.10.10 | 1 | 30 | >>100 | 100 | >> >> | +I |10.10.10.10 | 2 | 40 | >> 90 | 110 | >> >> | +I |10.10.10.10 | 2 | 50 | >> 80 | 120 | >> >> | +I |10.10.10.10 | 3 | 50 | >> 70 | 40 | >> >> | +I |20.20.20.20 | 3 | 30 | >> 80 | 90 | >> >> >> +++-+-+-+-+ >> >> Received a total of 5 rows >> >> >> >> Flink SQL> set table.optimizer.source.predicate-pushdown-enabled=false; >> >> [INFO] Execute statement succeed. >> >> >> >> Flink SQL> SELECT * FROM a left join mariadb_catalog.menagerie.e FOR >> SYSTEM_TIME AS OF a.proctime on e.type = 2 and a.ip = e.ip; >> >> >> +++-++-+-+-+-+ >> >> | op | ip |proctime | >> ip0 |type | age | height | >> weight | >> >> >> +++-++-+-+-+-+ >> >> | +I |10.10.10.10 | 2023-12-15 17:38:05.169 | >> 10.10.10.10 | 2 | 40 | 90 | >> 110 | >> >> | +I |10.10.10.10 | 2023-12-15 17:38:05.169 | >> 10.10.10.10 | 2 | 50 | 80 | >> 120 | >> >> | +I |20.20.20.20 | 2023-12-15 17:38:05.170 | >> | | | | >> | >> >> | +I |30.30.30.30 | 2023-12-15 17:38:05.172 | >> | | | | >> | >> >> Unless otherwise stated above: >> >> IBM United Kingdom Limited >> Registered in England and Wales with number 741598 >> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU >> >
[jira] [Created] (FLINK-33866) KafkaSinkBuilder in flink-connector-kafka references DeliveryGuarantee in flink-connector-base
Kurt Ostfeld created FLINK-33866:
--
Summary: KafkaSinkBuilder in flink-connector-kafka references DeliveryGuarantee in flink-connector-base
Key: FLINK-33866
URL: https://issues.apache.org/jira/browse/FLINK-33866
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: kafka-3.0.2
Reporter: Kurt Ostfeld

I have a Flink project that has code like:

```
KafkaSink.builder().setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
```

This worked with flink-connector-kafka 3.0.1 as well as past versions of Flink. This fails to compile with flink-connector-kafka 3.0.2 because that release changed flink-connector-base to a provided dependency, so the reference to the DeliveryGuarantee class becomes a compiler error.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources
Hi Becket, The FLIP document has been updated as well. Please take a look when you have time. Thanks, Jiabao > 2023年12月17日 22:54,Jiabao Sun 写道: > > Thanks Becket, > > I apologize for not being able to continue with this proposal due to being > too busy during this period. > > The viewpoints you shared about the design of Flink Source make sense to me > The native configuration ‘ignore.filter.pushdown’ is good to me. > Having a unified name or naming style can indeed prevent confusion for users > regarding > the inconsistent naming of this configuration across different connectors. > > Currently, there are not many external connectors that support filter > pushdown. > I propose that we first introduce it in flink-connector-jdbc and > flink-connector-mongodb. > Do you think this is feasible? > > Best, > Jiabao > > >> 2023年11月16日 17:45,Becket Qin 写道: >> >> Hi Jiabao, >> >> Arguments like "because Spark has it so Flink should also have it" does not >> make sense. Different projects have different API flavors and styles. What >> is really important is the rationale and the design principle behind the >> API. They should conform to the convention of the project. >> >> First of all, Spark Source API itself has a few issues and they ended up >> introduce DataSource V2 in Spark 3.0, which added the decorative interfaces >> like SupportsPushdownXXX. Some of the configurations predating DataSource >> V2 may still be there. >> >> For the Spark configurations you mentioned, they are all the configurations >> for FileScanBuilder, which is equivalent to FileSource in Flink. Currently, >> regardless of the format (ORC, Parquet, Avro, etc), the FileSource pushes >> back all the filters to ensure correctness. The actual filters that got >> applied to the specific format might still be different. This >> implementation is the same in FileScanBuilder.pushFilters() for Spark. I >> don't know why Spark got separate configurations for each format. Maybe it >> is because the filters are actually implemented differently for different >> format. >> >> At least for the current implementation in FileScanBuilder, these >> configurations can be merged to one configuration like >> `apply.filters.to.format.enabled`. Note that this config, as well as the >> separate configs you mentioned, are just visible and used by the >> FileScanBuilder. It determines whether the filters should be passed down to >> the format of the FileScanBuilder instance. Regardless of the value of >> these configs, FileScanBuilder.pushFilters() will always be called, and >> FileScanBuilder (as well as FileSource in Flink) will always push back all >> the filters to the framework. >> >> A MySql source can have a very different way to handle this. For example, A >> MySql source A config in this case might be "my.apply.filters" with three >> different values: >> - AUTO: The Source will issue a DESC Table request to understand whether a >> filter can be applied efficiently. And decide which filters can be applied >> and which cannot based on that. >> - NEVER: Never apply filtering. It will always do a full table read and >> let Flink do the filtering. >> - ALWAYS: Always apply the filtering to the MySql server. >> >> In the above examples of FileSource and MySql Source, I don't think it is a >> good idea to shoehorn the behaviors into a naive config of >> `ignore.filter.pushdown`. That is why I don't think this is a common config. 
>> >> To recap, like I said, I do agree that in some cases, we may want to behave >> differently when filters are pushed down to the sources, even if a source >> implements SupportsFilterPushDown, but I don't think there is a suitable >> common config for this. The behavior is very likely source specific. >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> >> >> On Thu, Nov 16, 2023 at 3:41 PM Jiabao Sun >> wrote: >> >>> Thanks Becket, >>> >>> I still believe that adding a configuration at the source level to disable >>> filter pushdown is needed. This demand exists in spark as well[1]. >>> >>> In Spark, most sources that support filter pushdown provide their own >>> corresponding configuration options to enable or disable filter pushdown. >>> For PRs[2-4] that support filter pushdown capability, they also provide >>> configuration options to disable this capability. >>> >>> I believe this configuration is applicable to most scenarios, and there is >>> no need to dwell on why this configuration option was not introduced >>> earlier than the SupportsFilterPushDown interface. >>> >>> spark.sql.parquet.filterPushdown >>> spark.sql.orc.filterPushdown >>> spark.sql.csv.filterPushdown.enabled >>> spark.sql.json.filterPushdown.enabled >>> spark.sql.avro.filterPushdown.enabled >>> JDBC Option: pushDownPredicate >>> >>> We can see that the lack of consistency is caused by each connector >>> introducing different configuration options for the same behavior. >>> This is one of the motivations for advocating the
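(For illustration, this is how a user could opt out of pushdown per table once a connector exposes such an option; 'ignore.filter.pushdown' is the name under discussion in this thread, not a released option, while the other table options are the standard JDBC connector options.)

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch of the user-facing side of the proposal: disabling filter pushdown for one table.
public class DisablePushdownExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/shop',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'ignore.filter.pushdown' = 'true'\n" // proposed option, per this thread
                        + ")");
    }
}
```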
Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources
Thanks Becket, I apologize for not being able to continue with this proposal due to being too busy during this period. The viewpoints you shared about the design of Flink Source make sense to me The native configuration ‘ignore.filter.pushdown’ is good to me. Having a unified name or naming style can indeed prevent confusion for users regarding the inconsistent naming of this configuration across different connectors. Currently, there are not many external connectors that support filter pushdown. I propose that we first introduce it in flink-connector-jdbc and flink-connector-mongodb. Do you think this is feasible? Best, Jiabao > 2023年11月16日 17:45,Becket Qin 写道: > > Hi Jiabao, > > Arguments like "because Spark has it so Flink should also have it" does not > make sense. Different projects have different API flavors and styles. What > is really important is the rationale and the design principle behind the > API. They should conform to the convention of the project. > > First of all, Spark Source API itself has a few issues and they ended up > introduce DataSource V2 in Spark 3.0, which added the decorative interfaces > like SupportsPushdownXXX. Some of the configurations predating DataSource > V2 may still be there. > > For the Spark configurations you mentioned, they are all the configurations > for FileScanBuilder, which is equivalent to FileSource in Flink. Currently, > regardless of the format (ORC, Parquet, Avro, etc), the FileSource pushes > back all the filters to ensure correctness. The actual filters that got > applied to the specific format might still be different. This > implementation is the same in FileScanBuilder.pushFilters() for Spark. I > don't know why Spark got separate configurations for each format. Maybe it > is because the filters are actually implemented differently for different > format. > > At least for the current implementation in FileScanBuilder, these > configurations can be merged to one configuration like > `apply.filters.to.format.enabled`. Note that this config, as well as the > separate configs you mentioned, are just visible and used by the > FileScanBuilder. It determines whether the filters should be passed down to > the format of the FileScanBuilder instance. Regardless of the value of > these configs, FileScanBuilder.pushFilters() will always be called, and > FileScanBuilder (as well as FileSource in Flink) will always push back all > the filters to the framework. > > A MySql source can have a very different way to handle this. For example, A > MySql source A config in this case might be "my.apply.filters" with three > different values: > - AUTO: The Source will issue a DESC Table request to understand whether a > filter can be applied efficiently. And decide which filters can be applied > and which cannot based on that. > - NEVER: Never apply filtering. It will always do a full table read and > let Flink do the filtering. > - ALWAYS: Always apply the filtering to the MySql server. > > In the above examples of FileSource and MySql Source, I don't think it is a > good idea to shoehorn the behaviors into a naive config of > `ignore.filter.pushdown`. That is why I don't think this is a common config. > > To recap, like I said, I do agree that in some cases, we may want to behave > differently when filters are pushed down to the sources, even if a source > implements SupportsFilterPushDown, but I don't think there is a suitable > common config for this. The behavior is very likely source specific. 
> > Thanks, > > Jiangjie (Becket) Qin > > > > On Thu, Nov 16, 2023 at 3:41 PM Jiabao Sun > wrote: > >> Thanks Becket, >> >> I still believe that adding a configuration at the source level to disable >> filter pushdown is needed. This demand exists in spark as well[1]. >> >> In Spark, most sources that support filter pushdown provide their own >> corresponding configuration options to enable or disable filter pushdown. >> For PRs[2-4] that support filter pushdown capability, they also provide >> configuration options to disable this capability. >> >> I believe this configuration is applicable to most scenarios, and there is >> no need to dwell on why this configuration option was not introduced >> earlier than the SupportsFilterPushDown interface. >> >> spark.sql.parquet.filterPushdown >> spark.sql.orc.filterPushdown >> spark.sql.csv.filterPushdown.enabled >> spark.sql.json.filterPushdown.enabled >> spark.sql.avro.filterPushdown.enabled >> JDBC Option: pushDownPredicate >> >> We can see that the lack of consistency is caused by each connector >> introducing different configuration options for the same behavior. >> This is one of the motivations for advocating the introduction of a >> unified configuration name. >> >> [1] https://issues.apache.org/jira/browse/SPARK-24288 >> [2] https://github.com/apache/spark/pull/27366 >> [3]https://github.com/apache/spark/pull/26973 >> [4] https://github.com/apache/spark/pull/29145 >> >> Best, >> Jiabao >>
Re: Question on lookup joins
Hi, David. I think you are right that the value with NULL should not be returned if the filter push down is closed. Maybe you should explain this sql to make sure this filter not be pushed down to the lookup source. I see the configuration 'table.optimizer.source.predicate-pushdown-enabled' relies on the class FilterableTableSource, which is deprecated. I am not sure whether this configuration is still useful for jdbc connector, which is using the SupportsFilterPushDown. Maybe the jdbc connector should read this configuration and return an empty 'acceptedFilters' in the method 'applyFilters'. Best, Hang David Radley 于2023年12月16日周六 01:47写道: > Hi , > I am working on FLINK-33365 which related to JDBC predicate pushdown. I > want to ensure that the same results occur with predicate pushdown as > without. So I am asking this question outside the pr / issue. > > I notice the following behaviour for lookup joins without predicate > pushdown. I was not expecting all the s , when there is not a > matching join key. ’a’ is a table in paimon and ‘db’ is a relational > database. > > > > Flink SQL> select * from a; > > +++-+ > > | op | ip |proctime | > > +++-+ > > | +I |10.10.10.10 | 2023-12-15 17:36:10.028 | > > | +I |20.20.20.20 | 2023-12-15 17:36:10.030 | > > | +I |30.30.30.30 | 2023-12-15 17:36:10.031 | > > ^CQuery terminated, received a total of 3 rows > > > > Flink SQL> select * from db_catalog.menagerie.e; > > > +++-+-+-+-+ > > | op | ip |type | age | > height | weight | > > > +++-+-+-+-+ > > | +I |10.10.10.10 | 1 | 30 | > 100 | 100 | > > | +I |10.10.10.10 | 2 | 40 | > 90 | 110 | > > | +I |10.10.10.10 | 2 | 50 | > 80 | 120 | > > | +I |10.10.10.10 | 3 | 50 | > 70 | 40 | > > | +I |20.20.20.20 | 3 | 30 | > 80 | 90 | > > > +++-+-+-+-+ > > Received a total of 5 rows > > > > Flink SQL> set table.optimizer.source.predicate-pushdown-enabled=false; > > [INFO] Execute statement succeed. > > > > Flink SQL> SELECT * FROM a left join mariadb_catalog.menagerie.e FOR > SYSTEM_TIME AS OF a.proctime on e.type = 2 and a.ip = e.ip; > > > +++-++-+-+-+-+ > > | op | ip |proctime | > ip0 |type | age | height | > weight | > > > +++-++-+-+-+-+ > > | +I |10.10.10.10 | 2023-12-15 17:38:05.169 | > 10.10.10.10 | 2 | 40 | 90 | > 110 | > > | +I |10.10.10.10 | 2023-12-15 17:38:05.169 | > 10.10.10.10 | 2 | 50 | 80 | > 120 | > > | +I |20.20.20.20 | 2023-12-15 17:38:05.170 | > | | | | > | > > | +I |30.30.30.30 | 2023-12-15 17:38:05.172 | > | | | | > | > > Unless otherwise stated above: > > IBM United Kingdom Limited > Registered in England and Wales with number 741598 > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU >
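(Hang's suggestion, sketched against the actual SupportsFilterPushDown contract: if pushdown is switched off, accept no filters so Flink evaluates every predicate itself and the lookup-join results match the non-pushdown plan. How the switch is read, global optimizer option versus table option, is exactly the open question in this thread.)

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// Illustrative sketch of a source with a pushdown on/off switch; the flag's origin is left open.
class SwitchablePushdownSource implements SupportsFilterPushDown {

    private final boolean pushdownEnabled;

    SwitchablePushdownSource(boolean pushdownEnabled) {
        this.pushdownEnabled = pushdownEnabled;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        return pushdownEnabled
                ? Result.of(filters, filters)                  // accept, but let Flink re-apply them too
                : Result.of(Collections.emptyList(), filters); // accept nothing; Flink filters everything
    }
}
```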
[jira] [Created] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration
Rui Fan created FLINK-33865:
--
Summary: exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration
Key: FLINK-33865
URL: https://issues.apache.org/jira/browse/FLINK-33865
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Reporter: Rui Fan
Assignee: Rui Fan
Attachments: image-2023-12-17-17-56-59-138.png

exponential-delay.attempts-before-reset-backoff doesn't work when it's set in the job Configuration.

Reason: when exponential-delay.attempts-before-reset-backoff is set in the job Configuration instead of the cluster configuration, ExecutionConfig#configure calls RestartStrategies#parseConfiguration to create the RestartStrategyConfiguration, and RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory then creates the ExponentialDelayRestartBackoffTimeStrategyFactory from that RestartStrategyConfiguration. Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, so this path does not support exponential-delay.attempts-before-reset-backoff.

I had a misunderstanding during FLINK-32895: I thought RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the clusterConfiguration.

!image-2023-12-17-17-56-59-138.png!

--
This message was sent by Atlassian Jira (v8.20.10#820010)
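(The failing path, sketched. The option keys follow the documented restart-strategy namespace as I understand it, so treat the exact key strings as an assumption to verify against the docs.)

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch of the scenario described in this ticket: the options are set on the job's own
// Configuration (not the cluster config) and handed to the environment, which routes them
// through ExecutionConfig#configure.
public class JobLevelRestartStrategyConfig {
    public static void main(String[] args) {
        Configuration jobConfig = new Configuration();
        jobConfig.setString("restart-strategy.type", "exponential-delay");
        jobConfig.setString("restart-strategy.exponential-delay.attempts-before-reset-backoff", "5");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(jobConfig);
        // ... define and execute the job; per the ticket, the attempts-before-reset-backoff
        // value set this way is silently ignored in 1.19.
    }
}
```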