[jira] [Created] (FLINK-33367) Invalid Check in DefaultFileFilter

2023-10-25 Thread Chirag Dewan (Jira)
Chirag Dewan created FLINK-33367:


 Summary: Invalid Check in DefaultFileFilter
 Key: FLINK-33367
 URL: https://issues.apache.org/jira/browse/FLINK-33367
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.16.2
Reporter: Chirag Dewan


There is a null check in DefaultFileFilter:

 

if (fileName == null || fileName.length() == 0) {
  return true;
}

 

So 2 questions here -

1) Can a file name ever be null?

2) What is the behavior when this returns true? Should it return false instead?
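
To make question 2 concrete, here is a minimal sketch of how such a guard behaves inside a predicate-style filter. It is not the Flink source: the class name `SketchFileFilter`, the use of `Predicate<String>`, and the hidden-file rule after the guard are assumptions for illustration, as is the reading that `true` means "accept the file".

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for a file-name filter; this is not the Flink class. */
final class SketchFileFilter implements Predicate<String> {

    @Override
    public boolean test(String fileName) {
        // Same guard as quoted in the report: a null or empty name short-circuits.
        if (fileName == null || fileName.length() == 0) {
            return true;
        }
        // Assumed rule for illustration: reject names starting with '.' or '_'.
        // Without the guard above, charAt(0) would throw on an empty name.
        char first = fileName.charAt(0);
        return first != '.' && first != '_';
    }

    public static void main(String[] args) {
        SketchFileFilter filter = new SketchFileFilter();
        System.out.println("part-0.csv accepted: " + filter.test("part-0.csv"));
        System.out.println(".hidden accepted:    " + filter.test(".hidden"));
        System.out.println("empty name accepted: " + filter.test(""));
    }
}
```

Under the assumed "true means accept" reading, the guard lets a nameless path through, while returning false would drop it; which of the two is intended is what the question asks.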



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33366) can not accept statement "EXECUTE STATEMENT SET BEGIN"

2023-10-25 Thread macdoor615 (Jira)
macdoor615 created FLINK-33366:
--

 Summary: can not accept statement "EXECUTE STATEMENT SET  BEGIN"
 Key: FLINK-33366
 URL: https://issues.apache.org/jira/browse/FLINK-33366
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / JDBC
Affects Versions: 1.18.0
 Environment: Flink 1.18.0 

Apache Hive beeline 3.2.3
Reporter: macdoor615


put flink-sql-jdbc-driver-bundle-1.18.0.jar in $HIVE_HOME/lib

start beeline -u 

create table

 
{code:java}
CREATE TABLE table_a (
  a int
) WITH (
  'connector' = 'print'
); {code}
 

output 

 
{code:java}
0: jdbc:flink://:8085> CREATE TABLE table_a (
. . . . . . . . . . . . . . . . . . . >   a int
. . . . . . . . . . . . . . . . . . . > ) WITH (
. . . . . . . . . . . . . . . . . . . >   'connector' = 'print'
. . . . . . . . . . . . . . . . . . . > );
No rows affected (1.119 seconds) {code}
execute statement "EXECUTE STATEMENT SET  BEGIN" and get failure output

 
{code:java}
0: jdbc:flink://xxx:8085> 
0: jdbc:flink://xxx:8085> EXECUTE STATEMENT SET
. . . . . . . . . . . . . . . . . . . >  BEGIN
. . . . . . . . . . . . . . . . . . . > insert into table_a values (1);
The SQL statement is incomplete.
0: jdbc:flink://hb3-dev-euler-001:8085> end;
{code}
execute statement "BEGIN STATEMENT SET;" and get success output

 
{code:java}
0: jdbc:flink://:8085> BEGIN STATEMENT SET;
No rows affected (0.118 seconds)
0: jdbc:flink://:8085> insert into table_a values (1);
No rows affected (0.114 seconds)
0: jdbc:flink://hb3-dev-euler-001:8085> end;
+---+
|              job id               |
+---+
| 1f34ba20cee6a35f2f8b69636ea55d29  |
+---+ {code}
 





[jira] [Created] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-10-25 Thread macdoor615 (Jira)
macdoor615 created FLINK-33365:
--

 Summary: Missing filter condition in execution plan containing 
lookup join with mysql jdbc connector
 Key: FLINK-33365
 URL: https://issues.apache.org/jira/browse/FLINK-33365
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.17.1, 1.18.0
 Environment: Flink 1.17.1 & Flink 1.18.0 with 
flink-connector-jdbc-3.1.1-1.17.jar
Reporter: macdoor615


 

create table in flink with sql-client.sh

 
{code:java}
CREATE TABLE default_catalog.default_database.a (
  ip string, 
  proctime as proctime()
) 
WITH (
  'connector' = 'datagen'
);{code}
 

create table in mysql

 
{code:java}
create table b (
  ip varchar(20), 
  type int
);  {code}
 

Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*

execute in sql-client.sh

 
{code:java}
explain SELECT * FROM default_catalog.default_database.a left join 
bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
a.ip = b.ip; {code}
get the execution plan

 
{code:java}
...
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]){code}
 
execute the same SQL in sql-client with Flink 1.17.1 / 1.18.0 and
*flink-connector-jdbc-3.0.0-1.16.jar*

get the execution plan

 
{code:java}
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS 
INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]) {code}
 

with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 

*lookup=[ip=ip]*

with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 

*lookup=[type=0, ip=ip], where=[(type = 0)]*

In our real-world production environment, this results in incorrect output data.
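
The effect of losing the `type = 0` condition can be illustrated with a toy model of a left outer lookup join. This is only a sketch in plain Java, not Flink or connector code; the class `LookupJoinSketch`, the `DimRow` type, and the sample rows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a left outer lookup join; this is not Flink code. */
final class LookupJoinSketch {

    /** One row of a hypothetical dimension table b(ip, type). */
    static final class DimRow {
        final String ip;
        final int type;

        DimRow(String ip, int type) {
            this.ip = ip;
            this.type = type;
        }
    }

    /**
     * Joins each probe ip against the dimension rows.
     * With applyTypeFilter = true only rows with type = 0 may match,
     * mirroring "lookup=[type=0, ip=ip]"; with false only ip is compared,
     * mirroring "lookup=[ip=ip]".
     */
    static List<String> join(List<String> probeIps, List<DimRow> dim, boolean applyTypeFilter) {
        List<String> out = new ArrayList<>();
        for (String ip : probeIps) {
            boolean matched = false;
            for (DimRow row : dim) {
                if (row.ip.equals(ip) && (!applyTypeFilter || row.type == 0)) {
                    out.add(ip + " -> type " + row.type);
                    matched = true;
                }
            }
            if (!matched) {
                out.add(ip + " -> null"); // a left outer join keeps the probe row
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<DimRow> dim = Arrays.asList(new DimRow("10.0.0.1", 0), new DimRow("10.0.0.2", 1));
        List<String> probe = Arrays.asList("10.0.0.1", "10.0.0.2");
        // prints [10.0.0.1 -> type 0, 10.0.0.2 -> null]
        System.out.println(join(probe, dim, true));
        // prints [10.0.0.1 -> type 0, 10.0.0.2 -> type 1]
        System.out.println(join(probe, dim, false));
    }
}
```

In this model, dropping the filter lets a row with type = 1 match where the query asked for a null-padded result, which is the kind of discrepancy the two plans suggest.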

 

 





Re: [VOTE] FLIP-370: Support Balanced Tasks Scheduling

2023-10-25 Thread Zhu Zhu
+1 (binding)

Thanks,
Zhu

On Wed, Oct 25, 2023 at 11:32, Yuepeng Pan wrote:

> +1 (non-binding)
>
> Regards,
> Yuepeng Pan
>
> On 2023/10/23 08:25:30 xiangyu feng wrote:
> > Thanks for driving that.
> > +1 (non-binding)
> >
> > Regards,
> > Xiangyu
> >
> > On Mon, Oct 23, 2023 at 15:19, Yu Chen wrote:
> >
> > > +1 (non-binding)
> > >
> > > We deeply need this capability to balance Tasks at the Taskmanager level in
> > > production, which helps to make a more sufficient usage of Taskmanager
> > > resources. Thanks for driving that.
> > >
> > > Best,
> > > Yu Chen
> > >
> > > On Mon, Oct 23, 2023 at 15:08, Yangze Guo wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Mon, Oct 23, 2023 at 12:00 PM Rui Fan <1996fan...@gmail.com> wrote:
> > > > >
> > > > > +1(binding)
> > > > >
> > > > > Thanks to Yuepeng and to everyone who participated in the discussion!
> > > > >
> > > > > Best,
> > > > > Rui
> > > > >
> > > > > On Mon, Oct 23, 2023 at 11:55 AM Roc Marshal wrote:
> > > > >>
> > > > >> Hi all,
> > > > >>
> > > > >> Thanks for all the feedback on FLIP-370[1][2].
> > > > >> I'd like to start a vote for FLIP-370. The vote will last for at least
> > > > >> 72 hours (Oct. 26th at 10:00 A.M. GMT) unless there is an objection or
> > > > >> insufficient votes.
> > > > >>
> > > > >> Thanks,
> > > > >> Yuepeng Pan
> > > > >>
> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
> > > > >> [2] https://lists.apache.org/thread/mx3ot0fmk6zr02ccdby0s8oqxqm2pn1x


[jira] [Created] (FLINK-33364) Introduce standard YAML for flink configuration

2023-10-25 Thread Junrui Li (Jira)
Junrui Li created FLINK-33364:
-

 Summary: Introduce standard YAML for flink configuration
 Key: FLINK-33364
 URL: https://issues.apache.org/jira/browse/FLINK-33364
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Junrui Li








Re: [VOTE] Apache Flink Kafka connector version 3.0.1, RC1

2023-10-25 Thread Qingsheng Ren
+1 (binding)

- Verified signature and checksum
- Verified that no binary exists in the source archive
- Built from source with Java 8 using -Dflink.version=1.18
- Started a local Flink 1.18 cluster, submitted jobs with SQL client
reading from and writing (with exactly-once) to Kafka 3.2.3 cluster
- Nothing suspicious in LICENSE and NOTICE file
- Reviewed web PR
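
For readers who want to script the checksum part of such a verification, a minimal sketch follows. It is not necessarily how the check above was performed. The class `ChecksumSketch` and its methods are hypothetical, and it assumes that the published checksum is a SHA-512 hex digest given as the first whitespace-separated token of the checksum file.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper for comparing an artifact against a published SHA-512 line. */
final class ChecksumSketch {

    /** Returns the lowercase hex SHA-512 digest of the given bytes. */
    static String sha512Hex(byte[] data) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-512");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(data)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-512 is a standard algorithm name, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    /**
     * Compares the digest of data with the first token of a checksum line.
     * Assumption: the line looks like "<hex digest>  <file name>".
     */
    static boolean matches(byte[] data, String publishedLine) {
        String expected = publishedLine.trim().split("\\s+")[0];
        return sha512Hex(data).equalsIgnoreCase(expected);
    }

    public static void main(String[] args) {
        // In-memory sample; for a real artifact, read the bytes with
        // java.nio.file.Files.readAllBytes(path) instead.
        byte[] sample = "example artifact".getBytes(StandardCharsets.UTF_8);
        String line = sha512Hex(sample) + "  example-artifact.tgz";
        System.out.println("checksum matches: " + matches(sample, line));
    }
}
```

This covers only the checksum; verifying the signature against the KEYS file is a separate step that this sketch does not attempt.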

Thanks for the effort, Gordon!

Best,
Qingsheng

On Thu, Oct 26, 2023 at 5:13 AM Tzu-Li (Gordon) Tai wrote:

> Hi everyone,
>
> Please review and vote on release candidate #1 for version 3.0.1 of the
> Apache Flink Kafka Connector, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> This release contains important changes for the following:
> - Supports Flink 1.18.x series
> - [FLINK-28303] EOS violation when using LATEST_OFFSETS startup mode
> - [FLINK-33231] Memory leak causing OOM when there are no offsets to commit
> back to Kafka
> - [FLINK-28758] FlinkKafkaConsumer fails to stop with savepoint
>
> The release candidate contains the source release as well as JAR artifacts
> to be released to Maven, built against Flink 1.17.1 and 1.18.0.
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2],
> which are signed with the key with fingerprint
> 1C1E2394D3194E1944613488F320986D35C33D6A [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag v3.0.1-rc1 [5],
> * website pull request listing the new release [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Gordon
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352910
> [2]
>
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1664
> [5] https://github.com/apache/flink-connector-kafka/commits/v3.0.1-rc1
> [6] https://github.com/apache/flink-web/pull/692
>


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Becket Qin
Thanks for the proposal, Jiabao. My two cents below:

1. If I understand correctly, the motivation of the FLIP is mainly to make
predicate pushdown optional on SOME of the Sources. If so, intuitively the
configuration should be Source specific instead of general. Otherwise, we
will end up with general configurations that may not take effect for some
of the Source implementations. This violates the basic rule of a
configuration - it does what it says, regardless of the implementation.
While configuration standardization is usually a good thing, it should not
break the basic rules.
If we really want to have this general configuration, the sources to which
it does not apply should throw an exception to make it clear that the
configuration is not supported. However, that seems ugly.

2. I think the actual motivation of this FLIP is about "how a source should
implement predicate pushdown efficiently", not "whether predicate pushdown
should be applied to the source." For example, if a source wants to avoid
additional computing load in the external system, it can always read the
entire record and apply the predicates by itself. However, from the Flink
perspective, the predicate pushdown is applied, it is just implemented
differently by the source. So the design principle here is that Flink only
cares about whether a source supports predicate pushdown or not, it does
not care about the implementation efficiency / side effect of the
predicates pushdown. It is the Source implementation's responsibility to
ensure the predicates pushdown is implemented efficiently and does not
impose excessive pressure on the external system. And it is OK to have
additional configurations to achieve this goal. Obviously, such
configurations will be source specific in this case.
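
To illustrate point 2, here is a rough sketch of a source that accepts pushed-down predicates but evaluates them in-process instead of forwarding them to the external system. Everything in it is hypothetical: `LocalFilteringSourceSketch`, `applyFilters`, and `read` are illustrative names and not Flink interfaces, and a list of strings stands in for the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical source that accepts predicates but applies them locally. */
final class LocalFilteringSourceSketch {

    private final List<String> externalRecords;
    private final List<Predicate<String>> acceptedFilters = new ArrayList<>();

    LocalFilteringSourceSketch(List<String> externalRecords) {
        this.externalRecords = externalRecords;
    }

    /** Planner-facing call: the source accepts every predicate it is given. */
    void applyFilters(List<Predicate<String>> filters) {
        acceptedFilters.addAll(filters);
    }

    /** Reads all records from the "external system" and filters them in-process. */
    List<String> read() {
        List<String> out = new ArrayList<>();
        for (String record : externalRecords) {
            boolean keep = true;
            for (Predicate<String> filter : acceptedFilters) {
                if (!filter.test(record)) {
                    keep = false;
                    break;
                }
            }
            if (keep) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        stored.add("a1");
        stored.add("b2");
        stored.add("a3");
        LocalFilteringSourceSketch source = new LocalFilteringSourceSketch(stored);
        List<Predicate<String>> filters = new ArrayList<>();
        filters.add(s -> s.startsWith("a"));
        source.applyFilters(filters);
        System.out.println(source.read()); // prints [a1, a3]
    }
}
```

From the planner's side the predicates were pushed down either way; only the source knows that the external system still served the full data set.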

3. Regarding the existing configuration
*table.optimizer.source.predicate-pushdown-enabled*:
I am not sure why we need it. Supposedly, if a source implements a
SupportsXXXPushDown interface, the optimizer should push the corresponding
predicates to the Source. I am not sure in which case this configuration
would be used. Any ideas @Jark Wu ?

Thanks,

Jiangjie (Becket) Qin


On Wed, Oct 25, 2023 at 11:55 PM Jiabao Sun wrote:

> Thanks Jane for the detailed explanation.
>
> I think that for users, we should respect conventions over configurations.
> Conventions can be default values explicitly specified in configurations,
> or they can be behaviors that follow previous versions.
> If the same code has different behaviors in different versions, it would
> be a very bad thing.
>
> I agree that for regular users, it is not necessary to understand all the
> configurations related to Flink.
> By following conventions, they can have a good experience.
>
> Let's get back to the practical situation and consider it.
>
> Case 1:
> The user is not familiar with the purpose of the
> table.optimizer.source.predicate-pushdown-enabled configuration but follows
> the convention of allowing predicate pushdown to the source by default.
> Just understanding the source.predicate-pushdown-enabled configuration and
> performing fine-grained toggle control will work well.
>
> Case 2:
> The user understands the meaning of the
> table.optimizer.source.predicate-pushdown-enabled configuration and has set
> its value to false.
> We have reason to believe that the user understands the meaning of the
> predicate pushdown configuration and the intention is to disable predicate
> pushdown (rather than whether or not to allow it).
> The previous choice of globally disabling it is likely because it couldn't
> be disabled on individual sources.
> From this perspective, if we provide more fine-grained configuration
> support and provide detailed explanations of the configuration behaviors in
> the documentation,
> users can clearly understand the differences between these two
> configurations and use them correctly.
>
> Also, I don't agree that table.optimizer.source.predicate-pushdown-enabled
> = true and source.predicate-pushdown-enabled = false means that the local
> configuration overrides the global configuration.
> On the contrary, both configurations are functioning correctly.
> The optimizer allows predicate pushdown to all sources, but some sources
> can reject the filters pushed down by the optimizer.
> This is natural, just like different components at different levels are
> responsible for different tasks.
>
> The more serious issue is that if "source.predicate-pushdown-enabled" does
> not respect "table.optimizer.source.predicate-pushdown-enabled",
> the "table.optimizer.source.predicate-pushdown-enabled" configuration will
> be invalidated.
> This means that regardless of whether
> "table.optimizer.source.predicate-pushdown-enabled" is set to true or
> false, it will have no effect.
>
> Best,
> Jiabao
>
>
> > On Oct 25, 2023, at 22:24, Jane Chan wrote:
> >
> > Hi Jiabao,
> >
> > Thanks for the in-depth clarification. Here are my cents
> >
> > However, 

[jira] [Created] (FLINK-33363) docker bases images can't run the java compiler

2023-10-25 Thread Henning Schmiedehausen (Jira)
Henning Schmiedehausen created FLINK-33363:
--

 Summary: docker bases images can't run the java compiler
 Key: FLINK-33363
 URL: https://issues.apache.org/jira/browse/FLINK-33363
 Project: Flink
  Issue Type: Bug
  Components: flink-docker
Affects Versions: 1.17.1
Reporter: Henning Schmiedehausen


I have set up a small cluster (Job Manager + 2 Task managers) using docker 
compose. When submitting a flink job that needs Calcite planning, it crashes 
with

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:163) ~[?:?]
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:141) ~[?:?]
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
    at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
    at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197) ~[?:?]
    at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118) ~[?:?]
    at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844) ~[?:?]
    at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307) ~[?:?]
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337) ~[?:?]
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:565) ~[?:?]
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:428) ~[?:?]
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251) ~[?:?]
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:130) ~[?:?]
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:208) ~[?:?]
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:195) ~[?:?]
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64) ~[?:?]
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78) ~[?:?]
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) ~[?:?]
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) ~[flink-scala_2.12-1.17.1.jar:1.17.1]
    at

[VOTE] Apache Flink Kafka connector version 3.0.1, RC1

2023-10-25 Thread Tzu-Li (Gordon) Tai
Hi everyone,

Please review and vote on release candidate #1 for version 3.0.1 of the
Apache Flink Kafka Connector, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

This release contains important changes for the following:
- Supports Flink 1.18.x series
- [FLINK-28303] EOS violation when using LATEST_OFFSETS startup mode
- [FLINK-33231] Memory leak causing OOM when there are no offsets to commit
back to Kafka
- [FLINK-28758] FlinkKafkaConsumer fails to stop with savepoint

The release candidate contains the source release as well as JAR artifacts
to be released to Maven, built against Flink 1.17.1 and 1.18.0.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which are signed with the key with fingerprint
1C1E2394D3194E1944613488F320986D35C33D6A [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag v3.0.1-rc1 [5],
* website pull request listing the new release [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Gordon

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352910
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.1-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1664
[5] https://github.com/apache/flink-connector-kafka/commits/v3.0.1-rc1
[6] https://github.com/apache/flink-web/pull/692


FW: Maven and java version variables

2023-10-25 Thread David Radley
Hi,
I notice that another PR,
https://github.com/apache/flink/pull/23594/files#diff-9c5fb3d1b7e3b0f54bc5c4182965c4fe1f9023d449017cece3005d3f90e8e4d8
will cause a conflict again unless this one is merged first.

Are we OK to merge https://github.com/apache/flink/pull/23469 so that I do not
need to resolve another conflict?
Kind regards,  David.


From: David Radley 
Date: Monday, 23 October 2023 at 12:25
To: dev@flink.apache.org 
Subject: [EXTERNAL] Maven and java version variables
Hi,

I have an open pr in the backlog that improves the pom.xml by introducing some 
Maven variables. The pr is https://github.com/apache/flink/pull/23469
It has been reviewed but not merged. In the meantime another pom change has 
been added that caused a conflict. I have amended the code in my pr to 
implement the new logic, introducing a new java upper bounds version variable.
I notice that the pom change that was added introduced this comment:





I am not sure what the CI setup means and where in the Flink Release wiki the 
java range is mentioned. It would be great if the comment could be extended to 
include links to this information. I am happy to do that as part of this pr , 
if needed, if I can be supplied the links.  I think this pr should be merged 
asap, so subsequent pom file changes use the Maven variables.

  WDYT

Kind regards, David.

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU



RE: Maven and java version variables

2023-10-25 Thread David Radley
Hi Matthias,
That sounds reasonable,
Kind regards, David

From: Matthias Pohl 
Date: Monday, 23 October 2023 at 16:41
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: Maven and java version variables
Hi David,
The conflict in your PR was caused by FLINK-33291
[1]. I was thinking about adding links to the comments to make the
navigation to the corresponding resources easier as you rightfully
mentioned. I didn't do it in the end because I was afraid that
documentation might be moved in the future and those links wouldn't be
valid anymore. That is why I tried to make the comments descriptive instead.

But I agree: We could definitely do better with the documentation.
...especially (but not only) for CI.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-33291

On Mon, Oct 23, 2023 at 2:53 PM Alexander Fedulov <alexander.fedu...@gmail.com> wrote:

> (under "Prepare for the release")
>
> As for CI:
>
> https://github.com/apache/flink/blob/78b5ddb11dfd2a3a00b58079fe9ee29a80555988/tools/ci/maven-utils.sh#L84
>
> https://github.com/apache/flink/blob/9b63099964b36ad9d78649bb6f5b39473e0031bd/tools/azure-pipelines/build-apache-repo.yml#L39
>
> https://github.com/apache/flink/blob/9b63099964b36ad9d78649bb6f5b39473e0031bd/azure-pipelines.yml#L39
>
> Best,
> Alexander Fedulov
>
>
> On Mon, 23 Oct 2023 at 14:44, Jing Ge  wrote:
>
> > Hi David,
> >
> > Please check [1] in the section Verify Java and Maven Version. Thanks!
> >
> > Best regards,
> > Jing
> >
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
> >
> > On Mon, Oct 23, 2023 at 1:25 PM David Radley 
> > wrote:
> >
> > > Hi,
> > >
> > > I have an open pr in the backlog that improves the pom.xml by introducing
> > > some Maven variables. The pr is https://github.com/apache/flink/pull/23469
> > > It has been reviewed but not merged. In the meantime another pom change
> > > has been added that caused a conflict. I have amended the code in my pr to
> > > implement the new logic, introducing a new java upper bounds version
> > > variable.
> > > I notice that the pom change that was added introduced this comment:
> > >
> > >  > -->
> > >
> > > 
> > >
> > > I am not sure what the CI setup means and where in the Flink Release wiki
> > > the java range is mentioned. It would be great if the comment could be
> > > extended to include links to this information. I am happy to do that as
> > > part of this pr, if needed, if I can be supplied the links.  I think this
> > > pr should be merged asap, so subsequent pom file changes use the Maven
> > > variables.
> > >
> > >   WDYT
> > >
> > > Kind regards, David.
> > >
> > > Unless otherwise stated above:
> > >
> > > IBM United Kingdom Limited
> > > Registered in England and Wales with number 741598
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> > >
> >
>



Re: [VOTE] Add JSON encoding to Avro serialization

2023-10-25 Thread David Radley
Looks good to me +1

From: Ryan Skraba 
Date: Wednesday, 25 October 2023 at 17:19
To: dev@flink.apache.org 
Subject: [EXTERNAL] [VOTE] Add JSON encoding to Avro serialization
Hello!

I'm reviewing a new feature of another contributor (Dale Lane) on
FLINK-33058 that adds JSON-encoding in addition to the binary Avro
serialization format.  He addressed my original objections that JSON
encoding isn't _generally_ a best practice for Avro messages.

The discussion is pretty well-captured in the JIRA and PR, but I
wanted to give it a bit of visibility and see if there were any strong
opinions on the subject! Given the minor nature of this feature, I
don't think it requires a FLIP.

*TL;DR*:  JSON-encoded Avro might not be ideal for production, but it
has a place for small systems and especially setting up and testing
before making the switch to binary-encoding.

All my best, Ryan

[Jira]: https://issues.apache.org/jira/browse/FLINK-33058
[PR]: https://github.com/apache/flink/pull/23395



Adding a new channel to Apache Flink slack workspace

2023-10-25 Thread Robin Moffatt
Hi,

I'd like to propose adding a PyFlink channel to the Apache Flink slack
workspace.

Creating a channel focused on this will help people find previous
discussions as well as target new discussions and questions to the correct
place. PyFlink is a sufficiently distinct component to make a dedicated
channel viable and useful IMHO.

There was a brief discussion of this on Slack already, the archive for
which can be found here:
https://www.linen.dev/s/apache-flink/t/16006099/re-surfacing-for-the-admins-https-apache-flink-slack-com-arc#1c7e-177a-4c37-8a34-a917883152ac

thanks,

Robin.


[VOTE] Add JSON encoding to Avro serialization

2023-10-25 Thread Ryan Skraba
Hello!

I'm reviewing a new feature of another contributor (Dale Lane) on
FLINK-33058 that adds JSON-encoding in addition to the binary Avro
serialization format.  He addressed my original objections that JSON
encoding isn't _generally_ a best practice for Avro messages.

The discussion is pretty well-captured in the JIRA and PR, but I
wanted to give it a bit of visibility and see if there were any strong
opinions on the subject! Given the minor nature of this feature, I
don't think it requires a FLIP.

*TL;DR*:  JSON-encoded Avro might not be ideal for production, but it
has a place for small systems and especially setting up and testing
before making the switch to binary-encoding.

All my best, Ryan

[Jira]: https://issues.apache.org/jira/browse/FLINK-33058
[PR]: https://github.com/apache/flink/pull/23395


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Jiabao Sun
Thanks Jane for the detailed explanation.

I think that for users, we should respect conventions over configurations. 
Conventions can be default values explicitly specified in configurations, or 
they can be behaviors that follow previous versions.
If the same code has different behaviors in different versions, it would be a 
very bad thing.

I agree that for regular users, it is not necessary to understand all the 
configurations related to Flink. 
By following conventions, they can have a good experience. 

Let's get back to the practical situation and consider it.

Case 1:
The user is not familiar with the purpose of the 
table.optimizer.source.predicate-pushdown-enabled configuration but follows the 
convention of allowing predicate pushdown to the source by default. 
Just understanding the source.predicate-pushdown-enabled configuration and 
performing fine-grained toggle control will work well.

Case 2:
The user understands the meaning of the 
table.optimizer.source.predicate-pushdown-enabled configuration and has set its 
value to false. 
We have reason to believe that the user understands the meaning of the 
predicate pushdown configuration and the intention is to disable predicate 
pushdown (rather than whether or not to allow it).
The previous choice of globally disabling it is likely because it couldn't be 
disabled on individual sources. 
From this perspective, if we provide more fine-grained configuration support 
and provide detailed explanations of the configuration behaviors in the 
documentation,
users can clearly understand the differences between these two configurations 
and use them correctly.

Also, I don't agree that table.optimizer.source.predicate-pushdown-enabled = 
true and source.predicate-pushdown-enabled = false means that the local 
configuration overrides the global configuration. 
On the contrary, both configurations are functioning correctly. 
The optimizer allows predicate pushdown to all sources, but some sources can 
reject the filters pushed down by the optimizer. 
This is natural, just like different components at different levels are 
responsible for different tasks.

The more serious issue is that if "source.predicate-pushdown-enabled" does not 
respect "table.optimizer.source.predicate-pushdown-enabled",
the "table.optimizer.source.predicate-pushdown-enabled" configuration will be 
invalidated. 
This means that regardless of whether 
"table.optimizer.source.predicate-pushdown-enabled" is set to true or false, it 
will have no effect.
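
The two readings can be compared in a small sketch. It only models the argument above and is not Flink code; the class `PushdownConfigSketch` and its method names are hypothetical, while the two flags stand for "table.optimizer.source.predicate-pushdown-enabled" and "source.predicate-pushdown-enabled".

```java
/** Toy model of how the optimizer-level and source-level flags could combine. */
final class PushdownConfigSketch {

    /**
     * Layered reading: the optimizer decides whether filters are offered at all,
     * and the source decides whether it accepts what it is offered.
     */
    static boolean layered(boolean optimizerEnabled, boolean sourceEnabled) {
        return optimizerEnabled && sourceEnabled;
    }

    /**
     * Alternative in which the source option does not respect the optimizer
     * option: the optimizer flag no longer influences the outcome.
     */
    static boolean sourceOnly(boolean optimizerEnabled, boolean sourceEnabled) {
        return sourceEnabled;
    }

    public static void main(String[] args) {
        boolean[] values = {true, false};
        for (boolean optimizer : values) {
            for (boolean source : values) {
                System.out.println("optimizer=" + optimizer + " source=" + source
                        + " layered=" + layered(optimizer, source)
                        + " sourceOnly=" + sourceOnly(optimizer, source));
            }
        }
    }
}
```

In the `sourceOnly` variant the result is the same for both values of the optimizer flag, which is the invalidation described above.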

Best,
Jiabao


> On Oct 25, 2023, at 22:24, Jane Chan wrote:
> 
> Hi Jiabao,
> 
> Thanks for the in-depth clarification. Here are my cents
> 
> However, "table.optimizer.source.predicate-pushdown-enabled" and
>> "scan.filter-push-down.enabled" are configurations for different
>> components(optimizer and source operator).
>> 
> 
> We cannot assume that every user would be interested in understanding the
> internal components of Flink, such as the optimizer or connectors, and the
> specific configurations associated with each component. Instead, users
> might be more concerned about knowing which configuration enables or
> disables the filter push-down feature for all source connectors, and which
> parameter provides the flexibility to override this behavior for a single
> source if needed.
> 
> So, from this perspective, I am inclined to divide these two parameters
> based on the scope of their impact from the user's perspective (i.e.
> global-level or operator-level), rather than categorizing them based on the
> component hierarchy from a developer's point of view. Therefore, based on
> this premise, it is intuitive and natural for users to
> understand fine-grained configuration options can override global
> configurations.
> 
>> Additionally, if "scan.filter-push-down.enabled" doesn't respect
>> "table.optimizer.source.predicate-pushdown-enabled" and the default value
>> of "scan.filter-push-down.enabled" is defined as true,
>> it means that just modifying
>> "table.optimizer.source.predicate-pushdown-enabled" as false will have no
>> effect, and filter pushdown will still be performed.
>> 
>> If we define the default value of "scan.filter-push-down.enabled" as
>> false, it would introduce a difference in behavior compared to the previous
>> version.
>> 
> 
> <1>If I understand correctly, "scan.filter-push-down.enabled" is a
> connector option, which means the only way to configure it is to explicitly
> specify it in DDL (no matter whether disable or enable), and the SET
> command is not applicable, so I think it's natural to still respect user's
> specification here. Otherwise, users might be more confused about why the
> DDL does not work as expected, and the reason is just because some other
> "optimizer" configuration is set to a different value.
> 
> <2> From the implementation side, I am inclined to keep the parameter's
> priority consistent for all conditions.
> 
> Let "global" denote 

Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Jane Chan
Hi Jiabao,

Thanks for the in-depth clarification. Here are my two cents.

> However, "table.optimizer.source.predicate-pushdown-enabled" and
> "scan.filter-push-down.enabled" are configurations for different
> components(optimizer and source operator).
>

We cannot assume that every user would be interested in understanding the
internal components of Flink, such as the optimizer or connectors, and the
specific configurations associated with each component. Instead, users
might be more concerned about knowing which configuration enables or
disables the filter push-down feature for all source connectors, and which
parameter provides the flexibility to override this behavior for a single
source if needed.

So, from this perspective, I am inclined to divide these two parameters
based on the scope of their impact from the user's perspective (i.e.
global-level or operator-level), rather than categorizing them based on the
component hierarchy from a developer's point of view. Therefore, based on
this premise, it is intuitive and natural for users to
understand that fine-grained configuration options can override global
configurations.

> Additionally, if "scan.filter-push-down.enabled" doesn't respect
> "table.optimizer.source.predicate-pushdown-enabled" and the default value
> of "scan.filter-push-down.enabled" is defined as true,
> it means that just modifying
> "table.optimizer.source.predicate-pushdown-enabled" as false will have no
> effect, and filter pushdown will still be performed.
>
> If we define the default value of "scan.filter-push-down.enabled" as
> false, it would introduce a difference in behavior compared to the previous
> version.
>

<1> If I understand correctly, "scan.filter-push-down.enabled" is a
connector option, which means the only way to configure it is to explicitly
specify it in DDL (whether to disable or enable it), and the SET
command is not applicable, so I think it's natural to still respect the user's
specification here. Otherwise, users might be confused about why the
DDL does not work as expected, when the reason is just that some other
"optimizer" configuration is set to a different value.

<2> From the implementation side, I am inclined to keep the parameter's
priority consistent for all conditions.

Let "global" denote "table.optimizer.source.predicate-pushdown-enabled",
and let "per-source" denote "scan.filter-push-down.enabled" for a specific
source T. The following truth table (based on the current design)
shows the inconsistent behavior of "per-source overrides global".

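+--------+------------+-----------------+----------------------------+
| global | per-source | push-down for T | per-source override global |
+--------+------------+-----------------+----------------------------+
| true   | false      | false           | Y                          |
+--------+------------+-----------------+----------------------------+
| false  | true       | false           | N                          |
+--------+------------+-----------------+----------------------------+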
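
To make the two readings in this thread concrete, here is a minimal Java sketch. It is only an illustration: the class and method names are hypothetical and do not exist in Flink. `resolveCurrentDesign` models the current design as described above (push-down happens only if both levels allow it), while `resolveWithOverride` is one assumed reading of "per-source overrides global", in which an option that is not set on the table falls back to the global value.

```java
import java.util.Optional;

// Hypothetical illustration; none of these names are part of Flink.
final class PushDownResolution {

    // Current design as described in the thread: the optimizer must allow
    // push-down AND the individual source must not reject it.
    static boolean resolveCurrentDesign(boolean global, boolean perSource) {
        return global && perSource;
    }

    // Assumed alternative: an explicitly set per-source value always wins;
    // if the table does not set the option, the global value applies.
    static boolean resolveWithOverride(boolean global, Optional<Boolean> perSource) {
        return perSource.orElse(global);
    }

    public static void main(String[] args) {
        // The two rows of the truth table above.
        boolean[][] rows = {{true, false}, {false, true}};
        for (boolean[] row : rows) {
            System.out.printf(
                    "global=%b per-source=%b current=%b override=%b%n",
                    row[0],
                    row[1],
                    resolveCurrentDesign(row[0], row[1]),
                    resolveWithOverride(row[0], Optional.of(row[1])));
        }
    }
}
```

Under the first method both rows resolve to no push-down, which is the inconsistency the table points at. Under the second, the explicit table option decides in both rows, and the global option still takes effect for every table that leaves the option unset.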

Best,
Jane

On Wed, Oct 25, 2023 at 6:22 PM Jiabao Sun 
wrote:

> Thanks Benchao for the feedback.
>
> I understand that the configuration of global parallelism and task
> parallelism is at different granularities but with the same configuration.
> However, "table.optimizer.source.predicate-pushdown-enabled" and
> "scan.filter-push-down.enabled" are configurations for different
> components(optimizer and source operator).
>
> From a user's perspective, there are two scenarios:
>
> 1. Disabling all filter pushdown
> In this case, setting "table.optimizer.source.predicate-pushdown-enabled"
> to false is sufficient to meet the requirement.
>
> 2. Disabling filter pushdown for specific sources
> In this scenario, there is no need to adjust the value of
> "table.optimizer.source.predicate-pushdown-enabled".
> Instead, the focus should be on the configuration of
> "scan.filter-push-down.enabled" to meet the requirement.
> In this case, users do not need to set
> "table.optimizer.source.predicate-pushdown-enabled" to false and manually
> enable filter pushdown for specific sources.
>
> Additionally, if "scan.filter-push-down.enabled" doesn't respect
> "table.optimizer.source.predicate-pushdown-enabled" and the default value
> of "scan.filter-push-down.enabled" is defined as true,
> it means that just modifying
> "table.optimizer.source.predicate-pushdown-enabled" as false will have no
> effect, and filter pushdown will still be performed.
>
> If we define the default value of "scan.filter-push-down.enabled" as
> false, it would introduce a difference in behavior compared to the previous
> version.
> The same SQL query that could successfully push down filters in the old
> version would no longer do so after the upgrade.
>
> Best,
> Jiabao
>
>
> > On Oct 25, 2023, at 17:10, Benchao Li wrote:
> >
> > Thanks Jiabao 

[jira] [Created] (FLINK-33362) Document Externalized Declarative Resource Management With Chinese

2023-10-25 Thread ConradJam (Jira)
ConradJam created FLINK-33362:
-

 Summary: Document Externalized Declarative Resource Management 
With Chinese
 Key: FLINK-33362
 URL: https://issues.apache.org/jira/browse/FLINK-33362
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.18.0, 1.18.1
Reporter: ConradJam
 Fix For: 1.18.1


Document Externalized Declarative Resource Management With Chinese





Re: [DISCUSS] Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable

2023-10-25 Thread Péter Váry
Hi Gordon,

Thanks for the review, here are my thoughts:

> In terms of the abstraction layering, I was wondering if you've also
considered this approach which I've quickly sketched in my local fork:
https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae

I think we have a few design issues here:
- How to handle the old interface where the transformation is not needed
in the pre-commit phase? - As you have proposed, default method
implementation is a nice solution here, as we do not really have to change
everything in the transformation process.
- How to handle the WithPostCommitTopology interface? - Currently the
parent interface for the sink with a post commit topology is strictly a
single interface (TwoPhaseCommittingSink) and we want to add this to both
types of sinks (new - with transformation / old - without transformation).
In this case we could get away with creating
OldTwoPhaseCommittingSinkWithPostCommitTopology and
NewTwoPhaseCommittingSinkWithPostCommitTopology,
but this is not a good approach for future extensibility. I tend to prefer
a real mixin approach to creating multiple interfaces for this.
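
As a rough illustration of the mixin idea in the last point, the following Java sketch uses simplified stand-in interfaces. All names are hypothetical, plain lists replace DataStream, and nothing here is the real Flink Sink API or the FLIP's actual proposal. It only shows how optional capabilities can be separate interfaces that a sink combines freely, so that no Old/New interface pairs are needed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration; these are NOT the real Flink interfaces.
interface SketchSink<CommT> {}

// Mixin: optionally transforms writer results into committables before the commit.
interface SketchPreCommit<WriterResultT, CommT> {
    List<CommT> preCommit(List<WriterResultT> writerResults);
}

// Mixin: optionally consumes the committables after the commit.
interface SketchPostCommit<CommT> {
    void postCommit(List<CommT> committed);
}

// "Old" style sink: no pre-commit transformation, but a post-commit step.
final class PostOnlySink implements SketchSink<String>, SketchPostCommit<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void postCommit(List<String> committed) {
        seen.addAll(committed);
    }
}

// "New" style sink: changes the committable type (Integer -> String)
// and also has a post-commit step.
final class PreAndPostSink
        implements SketchSink<String>, SketchPreCommit<Integer, String>, SketchPostCommit<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public List<String> preCommit(List<Integer> writerResults) {
        List<String> out = new ArrayList<>();
        for (Integer r : writerResults) {
            out.add("file-" + r);
        }
        return out;
    }

    @Override
    public void postCommit(List<String> committed) {
        seen.addAll(committed);
    }
}

// Hypothetical translator: checks which mixins a sink implements.
final class SketchTranslator {
    @SuppressWarnings("unchecked")
    static <W, C> List<C> run(SketchSink<C> sink, List<W> writerResults) {
        List<C> committables = sink instanceof SketchPreCommit
                ? ((SketchPreCommit<W, C>) sink).preCommit(writerResults)
                : (List<C>) writerResults; // no transformation: W and C are the same type
        if (sink instanceof SketchPostCommit) {
            ((SketchPostCommit<C>) sink).postCommit(committables);
        }
        return committables;
    }
}
```

The post-commit mixin is declared once and works for both kinds of sink, which is the extensibility argument made above. The cost in this sketch is the unchecked casts in the translator.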

> Quick thought: regarding the awkwardness you mention in the end with
sinks that have post commit topologies, but no pre-commit topologies -
Alternative to the mixin approach in the FLIP, it might make sense to
consider a builder approach for constructing 2PC sinks

TBH, after providing the possibility to transform in the pre-commit phase,
I have started to think about the possible different generalizations:
- Why not have the possibility to have a different return type of the
pre-write phase? - While we can transform the data in a
preceding map phase before the Sink, some Sinks might want to
encapsulate these transformations before the writes.
- Why not have the explicit possibility to change the return type of the
committer? - We might not want to emit the incoming Committable, we might
want to use the commit hash - or any other data generated by the committer
- in the post-commit topology. So in some cases it might make sense for the
committer to emit elements with different types than the input.
- Why not have everything as a mixin interface and define a Sink this way
(very similar to your builder approach)?

But I currently do not see explicit requirements for these features, and it
would result in another full rewrite of the Sink API which had a really
troubled history with several rewrites in the recent releases, so I decided
against these big changes and kept the changes minimal.

So, while I personally would love to see the Builder solution, I am afraid
that the Flink community needs some stability around the Sink API for now,
so the different Sinks could start to use this new feature.

What do you think?

Tzu-Li (Gordon) Tai wrote (on Wed, Oct 25, 2023, 2:01):

> Hi Peter,
>
> Thanks a lot for starting this FLIP!
>
> I agree that the current TwoPhaseCommittingSink interface is limiting in
> that it assumes 1) committers have the same parallelism as writers, and 2)
> writers immediately produce finalized committables. This FLIP captures the
> problem pretty well, and I do think there are use cases for a more general
> flexible interface outside of the Iceberg connector as well.
>
> In terms of the abstraction layering, I was wondering if you've also
> considered this approach which I've quickly sketched in my local fork:
>
> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>
> With this approach, the sink translator always expects that 2PC sink
> implementations extend `TwoPhaseCommittingSinkBase` and therefore
> assumes that a pre-commit topology always exists. For simple 2PC sinks that
> do not require transforming committables, we would ship (for convenience)
> an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit
> topology is a no-op passthrough. With that we avoid some of the
> boilerplate where 2PC sinks with a pre-commit topology require
> implementing two interfaces, as proposed in the FLIP.
>
> Quick thought: regarding the awkwardness you mention in the end with sinks
> that have post commit topologies, but no pre-commit topologies -
> Alternative to the mixin approach in the FLIP, it might make sense to
> consider a builder approach for constructing 2PC sinks, which should also
> give users type-safety at compile time while not having the awkwardness
> with all the types involved. Something along the lines of:
>
> ```
> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
> .withPreCommitTopology(writerResultsStream -> ...)   //
> Function, DataStream>
> .withPostCommitTopology(committablesStream -> ...) //
> Consumer>
> .withPreWriteTopology(inputStream -> ...)  //
> Function, DataStream>
> .build();
> ```
>
> We could probably do some validation in the build() method, e.g. if writer
> / committer have 

[jira] [Created] (FLINK-33361) Add Java 17 compatibility to Flink Kafka consumer

2023-10-25 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-33361:
--

 Summary: Add Java 17 compatibility to Flink Kafka consumer
 Key: FLINK-33361
 URL: https://issues.apache.org/jira/browse/FLINK-33361
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: kafka-3.0.1, kafka-3.1.0
Reporter: Martijn Visser


Currently, running {{mvn clean install -Dflink.version=1.18.0 
-Dscala-2.12 -Prun-end-to-end-tests 
-DdistDir=/Users/mvisser/Developer/flink-1.18.0 
-Dflink.convergence.phase=install 
-Dlog4j.configurationFile=tools/ci/log4j.properties}} fails with errors 
like:

{code:java}
[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Errors: 
[ERROR] FlinkKafkaConsumerBaseMigrationTest.testRestore
[ERROR]   Run 1: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 2: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 3: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 4: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 5: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 6: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 7: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 8: Exception while creating StreamOperatorStateContext.
[ERROR]   Run 9: Exception while creating StreamOperatorStateContext.
[INFO] 
[ERROR]   
FlinkKafkaConsumerBaseTest.testExplicitStateSerializerCompatibility:721 » 
Runtime
[ERROR]   FlinkKafkaConsumerBaseTest.testScaleDown:742->testRescaling:817 » 
Checkpoint C...
[ERROR]   FlinkKafkaConsumerBaseTest.testScaleUp:737->testRescaling:817 » 
Checkpoint Cou...
[ERROR]   UpsertKafkaDynamicTableFactoryTest.testBufferedTableSink:243 » 
UncheckedIO jav...
{code}

Example stacktrace:

{code:java}
Test 
testBufferedTableSink(org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactoryTest)
 failed with:
java.io.UncheckedIOException: java.io.IOException: Serializing the source 
elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make 
field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: 
module java.base does not "opens java.util" to unnamed module @45b4c3a9
at 
org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
at 
org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
at 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
at 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
at 
org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
at 
org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2289)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2280)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2266)
at 

[jira] [Created] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-33360:
---

 Summary: HybridSource fails to clear the previous round's state 
when switching sources, leading to data loss
 Key: FLINK-33360
 URL: https://issues.apache.org/jira/browse/FLINK-33360
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HybridSource
Affects Versions: 1.17.1, 1.16.2
Reporter: Feng Jiajie
 Fix For: 1.7.3


org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
            // track readers that have finished processing for current enumerator
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (currentSourceIndex + 1 < sources.size()) {
                    switchEnumerator();
                    // switch all readers prior to sending split assignments
                    for (int i = 0; i < context.currentParallelism(); i++) {
                        sendSwitchSourceEvent(i, currentSourceIndex);
                    }
                }
            } {code}
I think that *finishedReaders* is used to keep track of all the subTaskIds that 
have finished reading the current round of the source. Therefore, in the 
*switchEnumerator* function, *finishedReaders* should be cleared.

If it's not cleared, then in the next source reading, whenever any SourceReader 
reports a *SourceReaderFinishedEvent* (while other SourceReaders may not have 
finished processing in parallel), the condition *finishedReaders.size() == 
context.currentParallelism()* will be satisfied and it will trigger 
{*}sendSwitchSourceEvent{*}(i, currentSourceIndex), sending a 
*SwitchSourceEvent* to all SourceReaders.
If a SourceReader receives a SwitchSourceEvent before it finishes reading the 
previous source, it will execute {*}currentReader.close(){*}, and some data may 
not be fully read, resulting in a partial data loss in the source.
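
The behavior described above can be reproduced with a small, self-contained model. This is a hypothetical, heavily simplified sketch of the bookkeeping only, not the real HybridSourceSplitEnumerator: the class name, the constructor parameters, and the `clearOnSwitch` flag are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the enumerator's bookkeeping;
// it is not the real HybridSourceSplitEnumerator.
final class FinishedReadersSketch {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private final boolean clearOnSwitch;
    private int currentSourceIndex = 0;

    FinishedReadersSketch(int parallelism, int sourceCount, boolean clearOnSwitch) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
        this.clearOnSwitch = clearOnSwitch;
    }

    // Handles a "reader finished" event and returns true if it triggers
    // a switch to the next source.
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex + 1 < sourceCount) {
            currentSourceIndex++;
            if (clearOnSwitch) {
                // The fix suggested in this report: forget the previous round.
                finishedReaders.clear();
            }
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }
}
```

With a parallelism of 2 and three sources, the model without clearing switches from the second to the third source as soon as the first reader finishes, because the set still holds both subtask ids from the first round. With clearing, the switch only happens after both readers have reported again.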





Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Jiabao Sun
Thanks Hang and Lincoln for the good points.

'source.predicate-pushdown.enabled' sounds great to me. I have updated the proposal 
document.

Do we need to maintain consistency in hyphen-separated naming style between 
'source.predicate-pushdown-enabled' and 
'table.optimizer.source.predicate-pushdown-enabled'?

Best,
Jiabao


> On Oct 25, 2023, at 20:14, Hang Ruan wrote:
> 
> Hi, all,
> 
> Thanks for the lively discussion.
> 
> I agree with Jiabao. I think enabling "scan.filter-push-down.enabled"
> relies on enabling "table.optimizer.source.predicate-pushdown-enabled".
> It is a little strange that the planner still needs to push down the
> filters when we set "scan.filter-push-down.enabled=false" and
> "table.optimizer.source.predicate-pushdown-enabled=true".
> Maybe we need to add some checks to warn the users when setting
> "scan.filter-push-down.enabled=true" and
> "table.optimizer.source.predicate-pushdown-enabled=false".
> 
> Besides that, I am +1 for renaming 'scan.filter-push-down.enabled' to
> 'source.predicate-pushdown.enabled'.
> 
> Best,
> Hang
> 
> Jiabao Sun wrote on Wed, Oct 25, 2023 at 18:23:
> 
>> Thanks Benchao for the feedback.
>> 
>> I understand that the configuration of global parallelism and task
>> parallelism is at different granularities but with the same configuration.
>> However, "table.optimizer.source.predicate-pushdown-enabled" and
>> "scan.filter-push-down.enabled" are configurations for different
>> components(optimizer and source operator).
>> 
>> From a user's perspective, there are two scenarios:
>> 
>> 1. Disabling all filter pushdown
>> In this case, setting "table.optimizer.source.predicate-pushdown-enabled"
>> to false is sufficient to meet the requirement.
>> 
>> 2. Disabling filter pushdown for specific sources
>> In this scenario, there is no need to adjust the value of
>> "table.optimizer.source.predicate-pushdown-enabled".
>> Instead, the focus should be on the configuration of
>> "scan.filter-push-down.enabled" to meet the requirement.
>> In this case, users do not need to set
>> "table.optimizer.source.predicate-pushdown-enabled" to false and manually
>> enable filter pushdown for specific sources.
>> 
>> Additionally, if "scan.filter-push-down.enabled" doesn't respect
>> "table.optimizer.source.predicate-pushdown-enabled" and the default value
>> of "scan.filter-push-down.enabled" is defined as true,
>> it means that just modifying
>> "table.optimizer.source.predicate-pushdown-enabled" as false will have no
>> effect, and filter pushdown will still be performed.
>> 
>> If we define the default value of "scan.filter-push-down.enabled" as
>> false, it would introduce a difference in behavior compared to the previous
>> version.
>> The same SQL query that could successfully push down filters in the old
>> version would no longer do so after the upgrade.
>> 
>> Best,
>> Jiabao
>> 
>> 
>>> On Oct 25, 2023, at 17:10, Benchao Li wrote:
>>> 
>>> Thanks Jiabao for the detailed explanations, that helps a lot, I
>>> understand your rationale now.
>>> 
>>> Correct me if I'm wrong. Your perspective is from "developer", which
>>> means there is an optimizer and connector component, and if we want to
>>> enable this feature (pushing filters down into connectors), you must
>>> enable it firstly in optimizer, and only then connector has the chance
>>> to decide to use it or not.
>>> 
>>> My perspective is from "user" that (Why a user should care about the
>>> difference of optimizer/connector) , this is a feature, and has two
>>> way to control it, one way is to config it job-level, the other one is
>>> in table properties. What a user expects is that they can control a
>>> feature in a tiered way, that setting it per job, and then
>>> fine-grained tune it per table.
>>> 
>>> This is some kind of similar to other concepts, such as parallelism,
>>> users can set a job level default parallelism, and then fine-grained
>>> tune it per operator. There may be more such debate in the future
>>> e.g., we can have a job level config about adding key-by before lookup
>>> join, and also a hint/table property way to fine-grained control it
>>> per lookup operator. Hence we'd better find a unified way for all
>>> those similar kind of features.
>>> 
>>> Jiabao Sun wrote on Wed, Oct 25, 2023 at 15:27:
 
>>>> Thanks Jane for further explanation.
>>>> 
>>>> These two configurations correspond to different levels.
>>>> "scan.filter-push-down.enabled" does not make
>>>> "table.optimizer.source.predicate" invalid.
>>>> The planner will still push down predicates to all sources.
>>>> Whether filter pushdown is allowed or not is determined by the specific
>>>> source's "scan.filter-push-down.enabled" configuration.
>>>> 
>>>> However, "table.optimizer.source.predicate" does directly affect
>>>> "scan.filter-push-down.enabled".
>>>> When the planner disables predicate pushdown, the source-level filter
>>>> pushdown will also not be executed, even if the source allows filter
>>>> pushdown.
>>>> 
>>>> Whatever, in point 1 and 2, our expectation is 

Re: [VOTE] FLIP-373: Support Configuring Different State TTLs using SQL Hint

2023-10-25 Thread Sergey Nuyanzin
+1 (binding)

On Wed, Oct 25, 2023 at 2:03 PM liu ron  wrote:

> +1(binding)
>
> Best,
> Ron
>
> Jark Wu wrote on Wed, Oct 25, 2023 at 19:52:
>
> > +1 (binding)
> >
> > Best,
> > Jark
> >
> > On Wed, 25 Oct 2023 at 16:27, Jiabao Sun
> > wrote:
> >
> > > Thanks Jane for driving this.
> > >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Jiabao
> > >
> > >
> > > > On Oct 25, 2023, at 16:22, Lincoln Lee wrote:
> > > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > > Lincoln Lee
> > > >
> > > >
> > > > Zakelly Lan wrote on Mon, Oct 23, 2023 at 14:15:
> > > >
> > > >> +1(non-binding)
> > > >>
> > > >> Best,
> > > >> Zakelly
> > > >>
> > > >> On Mon, Oct 23, 2023 at 1:15 PM Benchao Li 
> > > wrote:
> > > >>>
> > > >>> +1 (binding)
> > > >>>
> > > > >>> Feng Jin wrote on Mon, Oct 23, 2023 at 13:07:
> > > 
> > >  +1(non-binding)
> > > 
> > > 
> > >  Best,
> > >  Feng
> > > 
> > > 
> > >  On Mon, Oct 23, 2023 at 11:58 AM Xuyang 
> wrote:
> > > 
> > > > +1(non-binding)
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > >Best!
> > > >Xuyang
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > At 2023-10-23 11:38:15, "Jane Chan" 
> wrote:
> > > >> Hi developers,
> > > >>
> > > >> Thanks for all the feedback on FLIP-373: Support Configuring
> > > >> Different
> > > >> State TTLs using SQL Hint [1].
> > > >> Based on the discussion [2], we have reached a consensus, so I'd
> > > >> like to
> > > >> start a vote.
> > > >>
> > > >> The vote will last for at least 72 hours (Oct. 26th at 10:00
> A.M.
> > > >> GMT)
> > > >> unless there is an objection or insufficient votes.
> > > >>
> > > >> [1]
> > > >>
> > > >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint
> > > >> [2]
> > > >> https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
> > > >>
> > > >> Best,
> > > >> Jane
> > > >
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>>
> > > >>> Best,
> > > >>> Benchao Li
> > > >>
> > >
> > >
> >
>


-- 
Best regards,
Sergey


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Hang Ruan
Hi, all,

Thanks for the lively discussion.

I agree with Jiabao. I think enabling "scan.filter-push-down.enabled"
relies on enabling "table.optimizer.source.predicate-pushdown-enabled".
It is a little strange that the planner still needs to push down the
filters when we set "scan.filter-push-down.enabled=false" and
"table.optimizer.source.predicate-pushdown-enabled=true".
Maybe we need to add some checks to warn the users when setting
"scan.filter-push-down.enabled=true" and
"table.optimizer.source.predicate-pushdown-enabled=false".

Besides that, I am +1 for renaming 'scan.filter-push-down.enabled' to
'source.predicate-pushdown.enabled'.

Best,
Hang

Jiabao Sun wrote on Wed, Oct 25, 2023 at 18:23:

> Thanks Benchao for the feedback.
>
> I understand that the configuration of global parallelism and task
> parallelism is at different granularities but with the same configuration.
> However, "table.optimizer.source.predicate-pushdown-enabled" and
> "scan.filter-push-down.enabled" are configurations for different
> components(optimizer and source operator).
>
> From a user's perspective, there are two scenarios:
>
> 1. Disabling all filter pushdown
> In this case, setting "table.optimizer.source.predicate-pushdown-enabled"
> to false is sufficient to meet the requirement.
>
> 2. Disabling filter pushdown for specific sources
> In this scenario, there is no need to adjust the value of
> "table.optimizer.source.predicate-pushdown-enabled".
> Instead, the focus should be on the configuration of
> "scan.filter-push-down.enabled" to meet the requirement.
> In this case, users do not need to set
> "table.optimizer.source.predicate-pushdown-enabled" to false and manually
> enable filter pushdown for specific sources.
>
> Additionally, if "scan.filter-push-down.enabled" doesn't respect
> "table.optimizer.source.predicate-pushdown-enabled" and the default value
> of "scan.filter-push-down.enabled" is defined as true,
> it means that just modifying
> "table.optimizer.source.predicate-pushdown-enabled" as false will have no
> effect, and filter pushdown will still be performed.
>
> If we define the default value of "scan.filter-push-down.enabled" as
> false, it would introduce a difference in behavior compared to the previous
> version.
> The same SQL query that could successfully push down filters in the old
> version would no longer do so after the upgrade.
>
> Best,
> Jiabao
>
>
> > On Oct 25, 2023, at 17:10, Benchao Li wrote:
> >
> > Thanks Jiabao for the detailed explanations, that helps a lot, I
> > understand your rationale now.
> >
> > Correct me if I'm wrong. Your perspective is from "developer", which
> > means there is an optimizer and connector component, and if we want to
> > enable this feature (pushing filters down into connectors), you must
> > enable it firstly in optimizer, and only then connector has the chance
> > to decide to use it or not.
> >
> > My perspective is from "user" that (Why a user should care about the
> > difference of optimizer/connector) , this is a feature, and has two
> > way to control it, one way is to config it job-level, the other one is
> > in table properties. What a user expects is that they can control a
> > feature in a tiered way, that setting it per job, and then
> > fine-grained tune it per table.
> >
> > This is some kind of similar to other concepts, such as parallelism,
> > users can set a job level default parallelism, and then fine-grained
> > tune it per operator. There may be more such debate in the future
> > e.g., we can have a job level config about adding key-by before lookup
> > join, and also a hint/table property way to fine-grained control it
> > per lookup operator. Hence we'd better find a unified way for all
> > those similar kind of features.
> >
> > Jiabao Sun wrote on Wed, Oct 25, 2023 at 15:27:
> >>
> >> Thanks Jane for further explanation.
> >>
> >> These two configurations correspond to different levels.
> "scan.filter-push-down.enabled" does not make
> "table.optimizer.source.predicate" invalid.
> >> The planner will still push down predicates to all sources.
> >> Whether filter pushdown is allowed or not is determined by the specific
> source's "scan.filter-push-down.enabled" configuration.
> >>
> >> However, "table.optimizer.source.predicate" does directly affect
> "scan.filter-push-down.enabled".
> >> When the planner disables predicate pushdown, the source-level filter
> pushdown will also not be executed, even if the source allows filter
> pushdown.
> >>
> >> Whatever, in point 1 and 2, our expectation is consistent.
> >> For the 3rd point, I still think that the planner-level configuration
> takes precedence over the source-level configuration.
> >> It may seem counterintuitive when we globally disable predicate
> pushdown but allow filter pushdown at the source level.
> >>
> >> Best,
> >> Jiabao
> >>
> >>
> >>
> >>> On Oct 25, 2023, at 14:35, Jane Chan wrote:
> >>>
> >>> Hi Jiabao,
> >>>
> >>> Thanks for clarifying this. While by "scan.filter-push-down.enabled
> takes a
> 

Re: [VOTE] FLIP-373: Support Configuring Different State TTLs using SQL Hint

2023-10-25 Thread liu ron
+1(binding)

Best,
Ron

Jark Wu wrote on Wed, Oct 25, 2023 at 19:52:

> +1 (binding)
>
> Best,
> Jark
>
> On Wed, 25 Oct 2023 at 16:27, Jiabao Sun 
> wrote:
>
> > Thanks Jane for driving this.
> >
> > +1 (non-binding)
> >
> > Best,
> > Jiabao
> >
> >
> > > On Oct 25, 2023, at 16:22, Lincoln Lee wrote:
> > >
> > > +1 (binding)
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Zakelly Lan wrote on Mon, Oct 23, 2023 at 14:15:
> > >
> > >> +1(non-binding)
> > >>
> > >> Best,
> > >> Zakelly
> > >>
> > >> On Mon, Oct 23, 2023 at 1:15 PM Benchao Li 
> > wrote:
> > >>>
> > >>> +1 (binding)
> > >>>
> > >>> Feng Jin wrote on Mon, Oct 23, 2023 at 13:07:
> > >>>>
> > >>>> +1(non-binding)
> > >>>>
> > >>>>
> > >>>> Best,
> > >>>> Feng
> > >>>>
> > >>>>
> > >>>> On Mon, Oct 23, 2023 at 11:58 AM Xuyang wrote:
> > 
> > > +1(non-binding)
> > >
> > >
> > >
> > >
> > > --
> > >
> > >Best!
> > >Xuyang
> > >
> > >
> > >
> > >
> > >
> > > At 2023-10-23 11:38:15, "Jane Chan"  wrote:
> > >> Hi developers,
> > >>
> > >> Thanks for all the feedback on FLIP-373: Support Configuring
> > >> Different
> > >> State TTLs using SQL Hint [1].
> > >> Based on the discussion [2], we have reached a consensus, so I'd
> > >> like to
> > >> start a vote.
> > >>
> > >> The vote will last for at least 72 hours (Oct. 26th at 10:00 A.M.
> > >> GMT)
> > >> unless there is an objection or insufficient votes.
> > >>
> > >> [1]
> > >>
> > >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint
> > >> [2]
> > >> https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
> > >>
> > >> Best,
> > >> Jane
> > >
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>>
> > >>> Best,
> > >>> Benchao Li
> > >>
> >
> >
>


Re: [VOTE] FLIP-373: Support Configuring Different State TTLs using SQL Hint

2023-10-25 Thread Jark Wu
+1 (binding)

Best,
Jark

On Wed, 25 Oct 2023 at 16:27, Jiabao Sun 
wrote:

> Thanks Jane for driving this.
>
> +1 (non-binding)
>
> Best,
> Jiabao
>
>
> > On Oct 25, 2023, at 16:22, Lincoln Lee wrote:
> >
> > +1 (binding)
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Zakelly Lan wrote on Mon, Oct 23, 2023 at 14:15:
> >
> >> +1(non-binding)
> >>
> >> Best,
> >> Zakelly
> >>
> >> On Mon, Oct 23, 2023 at 1:15 PM Benchao Li 
> wrote:
> >>>
> >>> +1 (binding)
> >>>
> >>> Feng Jin wrote on Mon, Oct 23, 2023 at 13:07:
> >>>>
> >>>> +1(non-binding)
> >>>>
> >>>>
> >>>> Best,
> >>>> Feng
> >>>>
> >>>>
> >>>> On Mon, Oct 23, 2023 at 11:58 AM Xuyang wrote:
> 
> > +1(non-binding)
> >
> >
> >
> >
> > --
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >
> > At 2023-10-23 11:38:15, "Jane Chan"  wrote:
> >> Hi developers,
> >>
> >> Thanks for all the feedback on FLIP-373: Support Configuring
> >> Different
> >> State TTLs using SQL Hint [1].
> >> Based on the discussion [2], we have reached a consensus, so I'd
> >> like to
> >> start a vote.
> >>
> >> The vote will last for at least 72 hours (Oct. 26th at 10:00 A.M.
> >> GMT)
> >> unless there is an objection or insufficient votes.
> >>
> >> [1]
> >>
> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint
> >> [2]
> >> https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
> >>
> >> Best,
> >> Jane
> >
> >>>
> >>>
> >>>
> >>> --
> >>>
> >>> Best,
> >>> Benchao Li
> >>
>
>


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Jiabao Sun
Thanks Benchao for the feedback.

I understand that global parallelism and task parallelism are the same 
configuration at different granularities. 
However, "table.optimizer.source.predicate-pushdown-enabled" and 
"scan.filter-push-down.enabled" are configurations for different 
components (the optimizer and the source operator).

From a user's perspective, there are two scenarios:

1. Disabling all filter pushdown
In this case, setting "table.optimizer.source.predicate-pushdown-enabled" to 
false is sufficient to meet the requirement.

2. Disabling filter pushdown for specific sources 
In this scenario, there is no need to adjust the value of 
"table.optimizer.source.predicate-pushdown-enabled". 
Instead, the focus should be on the configuration of 
"scan.filter-push-down.enabled" to meet the requirement. 
In this case, users do not need to set 
"table.optimizer.source.predicate-pushdown-enabled" to false and manually 
enable filter pushdown for specific sources.

Additionally, if "scan.filter-push-down.enabled" does not respect 
"table.optimizer.source.predicate-pushdown-enabled" and the default value of 
"scan.filter-push-down.enabled" is true, then just setting 
"table.optimizer.source.predicate-pushdown-enabled" to false will have no 
effect, and filter pushdown will still be performed.

If we define the default value of "scan.filter-push-down.enabled" as false, it 
would introduce a behavior change compared to the previous version: 
the same SQL query that successfully pushed down filters in the old version 
would no longer do so after the upgrade.

Best,
Jiabao


> On Oct 25, 2023, at 17:10, Benchao Li wrote:
> 
> Thanks Jiabao for the detailed explanations, that helps a lot, I
> understand your rationale now.
> 
> Correct me if I'm wrong. Your perspective is from "developer", which
> means there is an optimizer and connector component, and if we want to
> enable this feature (pushing filters down into connectors), you must
> enable it firstly in optimizer, and only then connector has the chance
> to decide to use it or not.
> 
> My perspective is from "user" that (Why a user should care about the
> difference of optimizer/connector) , this is a feature, and has two
> way to control it, one way is to config it job-level, the other one is
> in table properties. What a user expects is that they can control a
> feature in a tiered way, that setting it per job, and then
> fine-grained tune it per table.
> 
> This is some kind of similar to other concepts, such as parallelism,
> users can set a job level default parallelism, and then fine-grained
> tune it per operator. There may be more such debate in the future
> e.g., we can have a job level config about adding key-by before lookup
> join, and also a hint/table property way to fine-grained control it
> per lookup operator. Hence we'd better find a unified way for all
> those similar kind of features.
> 
> Jiabao Sun wrote on Wed, Oct 25, 2023 at 15:27:
>> 
>> Thanks Jane for further explanation.
>> 
>> These two configurations correspond to different levels. 
>> "scan.filter-push-down.enabled" does not make 
>> "table.optimizer.source.predicate" invalid.
>> The planner will still push down predicates to all sources.
>> Whether filter pushdown is allowed or not is determined by the specific 
>> source's "scan.filter-push-down.enabled" configuration.
>> 
>> However, "table.optimizer.source.predicate" does directly affect 
>> "scan.filter-push-down.enabled”.
>> When the planner disables predicate pushdown, the source-level filter 
>> pushdown will also not be executed, even if the source allows filter 
>> pushdown.
>> 
>> Whatever, in point 1 and 2, our expectation is consistent.
>> For the 3rd point, I still think that the planner-level configuration takes 
>> precedence over the source-level configuration.
>> It may seem counterintuitive when we globally disable predicate pushdown but 
>> allow filter pushdown at the source level.
>> 
>> Best,
>> Jiabao
>> 
>> 
>> 
>>> On Oct 25, 2023, at 14:35, Jane Chan wrote:
>>> 
>>> Hi Jiabao,
>>> 
>>> Thanks for clarifying this. While by "scan.filter-push-down.enabled takes a
>>> higher priority" I meant that this value should be respected whenever it is
>>> set explicitly.
>>> 
>>> The conclusion that
>>> 
>>> 2. "table.optimizer.source.predicate" = "true" and
>>>> "scan.filter-push-down.enabled" = "false"
>>>> Allow the planner to perform predicate pushdown, but individual sources do
>>>> not enable filter pushdown.
 
>>> 
>>> This indicates that the option "scan.filter-push-down.enabled = false" for
>>> an individual source connector does indeed override the global-level
>>> planner settings to make a difference. And thus "has a higher priority".
>>> 
>>> While for
>>> 
>>> 3. "table.optimizer.source.predicate" = "false"
>>>> Predicate pushdown is not allowed for the planner.
>>>> Regardless of the value of the "scan.filter-push-down.enabled"
>>>> configuration, filter pushdown 

Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Lincoln Lee
Thank you all for the lively discussion!

Agree with Benchao that from a user's (rather than a developer's) point of
view, it's easier to understand that fine-grained options override global
options.

In addition, for the new option 'scan.filter-push-down.enabled', would it be
better to keep the name consistent with the global option
'table.optimizer.source.predicate-pushdown-enabled'?
For example, 'source.predicate-pushdown.enabled' (here, "source" covers both
LookupTableSource and ScanTableSource).

Best,
Lincoln Lee


Benchao Li wrote on Wed, Oct 25, 2023 at 17:11:

> Thanks Jiabao for the detailed explanations, that helps a lot, I
> understand your rationale now.
>
> Correct me if I'm wrong. Your perspective is from "developer", which
> means there is an optimizer and connector component, and if we want to
> enable this feature (pushing filters down into connectors), you must
> enable it firstly in optimizer, and only then connector has the chance
> to decide to use it or not.
>
> My perspective is from "user" that (Why a user should care about the
> difference of optimizer/connector) , this is a feature, and has two
> way to control it, one way is to config it job-level, the other one is
> in table properties. What a user expects is that they can control a
> feature in a tiered way, that setting it per job, and then
> fine-grained tune it per table.
>
> This is some kind of similar to other concepts, such as parallelism,
> users can set a job level default parallelism, and then fine-grained
> tune it per operator. There may be more such debate in the future
> e.g., we can have a job level config about adding key-by before lookup
> join, and also a hint/table property way to fine-grained control it
> per lookup operator. Hence we'd better find a unified way for all
> those similar kind of features.
>
> Jiabao Sun wrote on Wed, Oct 25, 2023 at 15:27:
> >
> > Thanks Jane for further explanation.
> >
> > These two configurations correspond to different levels.
> "scan.filter-push-down.enabled" does not make
> "table.optimizer.source.predicate" invalid.
> > The planner will still push down predicates to all sources.
> > Whether filter pushdown is allowed or not is determined by the specific
> source's "scan.filter-push-down.enabled" configuration.
> >
> > However, "table.optimizer.source.predicate" does directly affect
> "scan.filter-push-down.enabled”.
> > When the planner disables predicate pushdown, the source-level filter
> pushdown will also not be executed, even if the source allows filter
> pushdown.
> >
> > Whatever, in point 1 and 2, our expectation is consistent.
> > For the 3rd point, I still think that the planner-level configuration
> takes precedence over the source-level configuration.
> > It may seem counterintuitive when we globally disable predicate pushdown
> but allow filter pushdown at the source level.
> >
> > Best,
> > Jiabao
> >
> >
> >
> > > On Oct 25, 2023, at 14:35, Jane Chan wrote:
> > >
> > > Hi Jiabao,
> > >
> > > Thanks for clarifying this. While by "scan.filter-push-down.enabled
> takes a
> > > higher priority" I meant that this value should be respected whenever
> it is
> > > set explicitly.
> > >
> > > The conclusion that
> > >
> > > 2. "table.optimizer.source.predicate" = "true" and
> > >> "scan.filter-push-down.enabled" = "false"
> > >> Allow the planner to perform predicate pushdown, but individual
> sources do
> > >> not enable filter pushdown.
> > >>
> > >
> > > This indicates that the option "scan.filter-push-down.enabled = false"
> for
> > > an individual source connector does indeed override the global-level
> > > planner settings to make a difference. And thus "has a higher
> priority".
> > >
> > > While for
> > >
> > > 3. "table.optimizer.source.predicate" = "false"
> > >> Predicate pushdown is not allowed for the planner.
> > >> Regardless of the value of the "scan.filter-push-down.enabled"
> > >> configuration, filter pushdown is disabled.
> > >> In this scenario, the behavior remains consistent with the old
> version as
> > >> well.
> > >>
> > >
> > > I still think "scan.filter-push-down.enabled" should also be respected
> if
> > > it is enabled for individual connectors. WDYT?
> > >
> > > Best,
> > > Jane
> > >
> > > On Wed, Oct 25, 2023 at 1:27 PM Jiabao Sun
> > > wrote:
> > >
> > >> Thanks Benchao for the feedback.
> > >>
> > >> For the current proposal, we recommend keeping the default value of
> > >> "table.optimizer.source.predicate" as true,
> > >> and setting the the default value of newly introduced option
> > >> "scan.filter-push-down.enabled" to true as well.
> > >>
> > >> The main purpose of doing this is to maintain consistency with
> previous
> > >> versions, as whether to perform
> > >> filter pushdown in the old version solely depends on the
> > >> "table.optimizer.source.predicate" option.
> > >> That means by default, as long as a TableSource implements the
> > >> SupportsFilterPushDown interface, filter pushdown is allowed.
> > >> And it seems that we don't have much benefit 

Re: Operator 1.6 to Olm

2023-10-25 Thread Gyula Fóra
Thank you David!

I currently only see the 1.5.0 version as the latest, but I will check back
again later.

Cheers,
Gyula


On Wed, Oct 25, 2023 at 11:17 AM David Radley 
wrote:

> Hi,
> Fyi with some expert direction from James Busche, I have published the 1.6
> OLM and operatorhub.io versions of the Flink operator.  When 1.6.1 is out
> I will do the same again,
>  Kind regards, David.
>
>
>
> From: Gyula Fóra 
> Date: Tuesday, 10 October 2023 at 13:27
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: Operator 1.6 to Olm
> That would be great David, thank you!
>
> Gyula
>
> On Tue, 10 Oct 2023 at 14:13, David Radley 
> wrote:
>
> > Hi,
> > I notice that the latest version in olm of the operator is 1.5. I plan to
> > run the scripts to publish the 1.6 Flink operator to olm,
> >  Kind regards, David.
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


RE: Operator 1.6 to Olm

2023-10-25 Thread David Radley
Hi,
FYI, with some expert direction from James Busche, I have published the 1.6 OLM 
and operatorhub.io versions of the Flink operator. When 1.6.1 is out I will do 
the same again.
 Kind regards, David.



From: Gyula Fóra 
Date: Tuesday, 10 October 2023 at 13:27
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: Operator 1.6 to Olm
That would be great David, thank you!

Gyula

On Tue, 10 Oct 2023 at 14:13, David Radley  wrote:

> Hi,
> I notice that the latest version in olm of the operator is 1.5. I plan to
> run the scripts to publish the 1.6 Flink operator to olm,
>  Kind regards, David.
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Benchao Li
Thanks Jiabao for the detailed explanations, that helps a lot, I
understand your rationale now.

Correct me if I'm wrong. Your perspective is that of a "developer": there is
an optimizer component and a connector component, and to enable this feature
(pushing filters down into connectors) you must first enable it in the
optimizer; only then does the connector get the chance to decide whether to
use it.

My perspective is that of a "user" (why should a user care about the
difference between optimizer and connector?): this is one feature with two
ways to control it, one at the job level and the other in table properties.
What a user expects is to control a feature in a tiered way: set it per job,
and then fine-tune it per table.

This is similar to other concepts such as parallelism: users can set a
job-level default parallelism and then fine-tune it per operator. There may
be more debates like this in the future; e.g., we could have a job-level
config for adding a key-by before lookup join, plus a hint/table property to
control it per lookup operator. Hence we'd better find a unified approach for
all features of this kind.
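
To make the tiered expectation concrete, here is a minimal sketch of what I
mean, not how the planner resolves options today. The class and method names
(TieredPushDownOption, resolve) are hypothetical, and an empty Optional
stands for "the table did not set the option explicitly".

```java
import java.util.Optional;

/** Hypothetical sketch: a table-level setting overrides the job-level one. */
public class TieredPushDownOption {

    /**
     * Returns the effective filter pushdown flag for one table.
     * An explicit table-level value wins; otherwise the job-level value applies.
     */
    public static boolean resolve(boolean jobLevel, Optional<Boolean> tableLevel) {
        return tableLevel.orElse(jobLevel);
    }

    public static void main(String[] args) {
        // Job-level disabled, table says nothing: pushdown stays disabled.
        System.out.println(resolve(false, Optional.empty()));
        // Job-level disabled, table explicitly enables it: the table wins.
        System.out.println(resolve(false, Optional.of(true)));
        // Job-level enabled, table explicitly disables it: the table wins.
        System.out.println(resolve(true, Optional.of(false)));
    }
}
```

Because an unset table option falls back to the job-level value, setting only
the job-level option to false would still take effect for every table that
does not override it.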

Jiabao Sun wrote on Wed, Oct 25, 2023 at 15:27:
>
> Thanks Jane for further explanation.
>
> These two configurations correspond to different levels. 
> "scan.filter-push-down.enabled" does not make 
> "table.optimizer.source.predicate" invalid.
> The planner will still push down predicates to all sources.
> Whether filter pushdown is allowed or not is determined by the specific 
> source's "scan.filter-push-down.enabled" configuration.
>
> However, "table.optimizer.source.predicate" does directly affect 
> "scan.filter-push-down.enabled”.
> When the planner disables predicate pushdown, the source-level filter 
> pushdown will also not be executed, even if the source allows filter pushdown.
>
> Whatever, in point 1 and 2, our expectation is consistent.
> For the 3rd point, I still think that the planner-level configuration takes 
> precedence over the source-level configuration.
> It may seem counterintuitive when we globally disable predicate pushdown but 
> allow filter pushdown at the source level.
>
> Best,
> Jiabao
>
>
>
> > On Oct 25, 2023, at 14:35, Jane Chan wrote:
> >
> > Hi Jiabao,
> >
> > Thanks for clarifying this. While by "scan.filter-push-down.enabled takes a
> > higher priority" I meant that this value should be respected whenever it is
> > set explicitly.
> >
> > The conclusion that
> >
> > 2. "table.optimizer.source.predicate" = "true" and
> >> "scan.filter-push-down.enabled" = "false"
> >> Allow the planner to perform predicate pushdown, but individual sources do
> >> not enable filter pushdown.
> >>
> >
> > This indicates that the option "scan.filter-push-down.enabled = false" for
> > an individual source connector does indeed override the global-level
> > planner settings to make a difference. And thus "has a higher priority".
> >
> > While for
> >
> > 3. "table.optimizer.source.predicate" = "false"
> >> Predicate pushdown is not allowed for the planner.
> >> Regardless of the value of the "scan.filter-push-down.enabled"
> >> configuration, filter pushdown is disabled.
> >> In this scenario, the behavior remains consistent with the old version as
> >> well.
> >>
> >
> > I still think "scan.filter-push-down.enabled" should also be respected if
> > it is enabled for individual connectors. WDYT?
> >
> > Best,
> > Jane
> >
> > On Wed, Oct 25, 2023 at 1:27 PM Jiabao Sun 
> > wrote:
> >
> >> Thanks Benchao for the feedback.
> >>
> >> For the current proposal, we recommend keeping the default value of
> >> "table.optimizer.source.predicate" as true,
> >> and setting the the default value of newly introduced option
> >> "scan.filter-push-down.enabled" to true as well.
> >>
> >> The main purpose of doing this is to maintain consistency with previous
> >> versions, as whether to perform
> >> filter pushdown in the old version solely depends on the
> >> "table.optimizer.source.predicate" option.
> >> That means by default, as long as a TableSource implements the
> >> SupportsFilterPushDown interface, filter pushdown is allowed.
> >> And it seems that we don't have much benefit in changing the default value
> >> of "table.optimizer.source.predicate" to false.
> >>
> >> Regarding the priority of these two configurations, I believe that
> >> "table.optimizer.source.predicate"
> >> takes precedence over "scan.filter-push-down.enabled" and it exhibits the
> >> following behavior.
> >>
> >> 1. "table.optimizer.source.predicate" = "true" and
> >> "scan.filter-push-down.enabled" = "true"
> >> This is the default behavior, allowing filter pushdown for sources.
> >>
> >> 2. "table.optimizer.source.predicate" = "true" and
> >> "scan.filter-push-down.enabled" = "false"
> >> Allow the planner to perform predicate pushdown, but individual sources do
> >> not enable filter pushdown.
> >>
> >> 3. "table.optimizer.source.predicate" = "false"

Re: [VOTE] Apache Flink Kubernetes Operator Release 1.6.1, release candidate #1

2023-10-25 Thread Rui Fan
Hi Thomas,

Thanks for your verification and feedback!

I tried to build the flink-kubernetes-operator project with Java 17, and
it is indeed not supported right now.

After an offline discussion with Gyula, we would like the Kubernetes operator
to support compiling with Java 17, tracked as a critical ticket for 1.7.0. I
created FLINK-33359 [1] to follow up.

[1] https://issues.apache.org/jira/browse/FLINK-33359
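
For anyone who wants to reproduce the symptom outside of Maven, a small
diagnostic sketch follows. It only checks whether a JDK module exports a
package to classpath (unnamed-module) code, which is what the
IllegalAccessError in Thomas's report complains about. The class and method
names are hypothetical; ModuleLayer.boot(), findModule and
Module.isExported(String, Module) are standard Java 9+ APIs. As far as I
know, google-java-format's documentation suggests JVM flags such as
--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED on newer
JDKs, but whether that is the right fix for the operator build is an
assumption to be verified in FLINK-33359.

```java
import java.util.Optional;

/** Hypothetical diagnostic: is a JDK-internal package visible to classpath code? */
public class JavacExportCheck {

    /**
     * Returns true if the named boot-layer module exports the package to the
     * module this class is loaded in (the unnamed module when run from the classpath).
     * Returns false if the module is not present in the boot layer.
     */
    public static boolean isExportedToCaller(String moduleName, String packageName) {
        Optional<Module> module = ModuleLayer.boot().findModule(moduleName);
        if (!module.isPresent()) {
            return false;
        }
        Module caller = JavacExportCheck.class.getModule();
        return module.get().isExported(packageName, caller);
    }

    public static void main(String[] args) {
        // The package named in the reported IllegalAccessError.
        boolean exported = isExportedToCaller("jdk.compiler", "com.sun.tools.javac.util");
        System.out.println("com.sun.tools.javac.util exported to caller: " + exported);
    }
}
```

Running it once without and once with the --add-exports flag above should
show whether the flag changes the result on a given JDK.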

Best,
Rui

On Wed, Oct 25, 2023 at 8:30 AM Thomas Weise  wrote:

> +1 (binding)
>
> - Verified checksums, signatures, source release content
> - Run unit tests
>
> Side note: mvn clean verify fails with the Java 17 compiler. While the
> build target version may be 11, preferably a higher JDK version can be used
> to build the source.
>
>  Caused by: java.lang.IllegalAccessError: class
> com.google.googlejavaformat.java.RemoveUnusedImports (in unnamed module
> @0x44f433db) cannot access class com.sun.tools.javac.util.Context (in
> module jdk.compiler) because module jdk.compiler does not export
> com.sun.tools.javac.util to unnamed module @0x44f433db
>
> at
>
> com.google.googlejavaformat.java.RemoveUnusedImports.removeUnusedImports(RemoveUnusedImports.java:187)
>
> Thanks,
> Thomas
>
>
> On Sat, Oct 21, 2023 at 7:35 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Everyone,
> >
> > Please review and vote on the release candidate #1 for the version 1.6.1
> of
> > Apache Flink Kubernetes Operator,
> > as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > **Release Overview**
> >
> > As an overview, the release consists of the following:
> > a) Kubernetes Operator canonical source distribution (including the
> > Dockerfile), to be deployed to the release repository at dist.apache.org
> > b) Kubernetes Operator Helm Chart to be deployed to the release
> repository
> > at dist.apache.org
> > c) Maven artifacts to be deployed to the Maven Central Repository
> > d) Docker image to be pushed to dockerhub
> >
> > **Staging Areas to Review**
> >
> > The staging areas containing the above mentioned artifacts are as
> follows,
> > for your review:
> > * All artifacts for a,b) can be found in the corresponding dev repository
> > at dist.apache.org [1]
> > * All artifacts for c) can be found at the Apache Nexus Repository [2]
> > * The docker image for d) is staged on github [3]
> >
> > All artifacts are signed with the
> > key B2D64016B940A7E0B9B72E0D7D0528B28037D8BC [4]
> >
> > Other links for your review:
> > * source code tag "release-1.6.1-rc1" [5]
> > * PR to update the website Downloads page to
> > include Kubernetes Operator links [6]
> > * PR to update the doc version of flink-kubernetes-operator[7]
> >
> > **Vote Duration**
> >
> > The voting time will run for at least 72 hours.
> > It is adopted by majority approval, with at least 3 PMC affirmative
> votes.
> >
> > **Note on Verification**
> >
> > You can follow the basic verification guide here[8].
> > Note that you don't need to verify everything yourself, but please make
> > note of what you have tested together with your +- vote.
> >
> > [1]
> >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.6.1-rc1/
> > [2]
> > https://repository.apache.org/content/repositories/orgapacheflink-1663/
> > [3]
> >
> >
> https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/139454270?tag=51eeae1
> > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [5]
> >
> https://github.com/apache/flink-kubernetes-operator/tree/release-1.6.1-rc1
> > [6] https://github.com/apache/flink-web/pull/690
> > [7] https://github.com/apache/flink-kubernetes-operator/pull/687
> > [8]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Kubernetes+Operator+Release
> >
> > Best,
> > Rui
> >
>


[jira] [Created] (FLINK-33359) Kubernetes operator supports Java 17

2023-10-25 Thread Rui Fan (Jira)
Rui Fan created FLINK-33359:
---

 Summary: Kubernetes operator supports Java 17
 Key: FLINK-33359
 URL: https://issues.apache.org/jira/browse/FLINK-33359
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Rui Fan


In the vote thread for flink-kubernetes-operator version 1.6.1, Thomas 
mentioned that the Kubernetes operator doesn't support Java 17.

After an offline discussion with [~gyfora], we would like the Kubernetes 
operator to support Java 17, tracked as a critical ticket for 1.7.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] The Flink Speed Center and benchmark daily run are back online

2023-10-25 Thread Etienne Chauchot

Nice !

Thank you and everyone involved for the hard work.

Etienne

Le 19/10/2023 à 10:24, Zakelly Lan a écrit :

Hi everyone,

Flink benchmarks [1] generate daily performance reports in the Apache
Flink slack channel (#flink-dev-benchmarks) to detect performance
regression [2]. Those benchmarks previously were running on several
machines donated and maintained by Ververica. Unfortunately, those
machines were lost due to account issues [3], and the daily benchmark
run has been stopped since August 24th, delaying the release of Flink
1.18 a bit [4].

Ververica donated several new machines! After several weeks of work, I
have successfully re-established the codespeed panel and benchmark
daily run pipelines on them. At this time, we are pleased to announce
that the Flink Speed Center and benchmark pipelines are back online.
These new machines have a more formal management to ensure that
previous accidents will not occur in the future.

What's more, I successfully recovered historical data backed up by
Yanfei Lei [5]. So with the old domain [6] redirected to the new
machines, the old links that existed in previous records will still be
valid. Besides the benchmarks with Java8 and Java11, I also added a
pipeline for Java17 running daily.

How to use it:
We also registered a new domain name 'flink-speed.xyz' for the Flink
Speed Center [7]. It is recommended to use the new domain in the
future. Currently, the self-service method of triggering benchmarks is
unavailable considering the lack of resources and potential
vulnerabilities of Jenkins. Please contact one of Apache Flink PMCs to
submit a benchmark. More info is updated on the wiki[8].

Daily Monitoring:
The performance daily monitoring on the Apache Flink slack channel [2]
is still unavailable as the benchmark results need more time to
stabilize in the new environment. Once the baseline results become
available for regression detection, I will enable the daily
monitoring.

Please feel free to reach out to me if you have any suggestions or
questions. Thanks Ververica again for donating the machines!


Best,
Zakelly

[1] https://github.com/apache/flink-benchmarks
[2] https://lists.apache.org/thread/zok62sx4m50c79htfp18ymq5vmtgbgxj
[3] https://issues.apache.org/jira/browse/FLINK-33052
[4] https://lists.apache.org//thread/5x28rp3zct4p603hm4zdwx6kfr101w38
[5] https://issues.apache.org/jira/browse/FLINK-30890
[6] http://codespeed.dak8s.net:8000
[7] http://flink-speed.xyz
[8] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115511847

Re: [VOTE] FLIP-373: Support Configuring Different State TTLs using SQL Hint

2023-10-25 Thread Jiabao Sun
Thanks Jane for driving this.

+1 (non-binding)

Best,
Jiabao


> On Oct 25, 2023, at 16:22, Lincoln Lee wrote:
> 
> +1 (binding)
> 
> Best,
> Lincoln Lee
> 
> 
> Zakelly Lan wrote on Mon, Oct 23, 2023 at 14:15:
> 
>> +1(non-binding)
>> 
>> Best,
>> Zakelly
>> 
>> On Mon, Oct 23, 2023 at 1:15 PM Benchao Li  wrote:
>>> 
>>> +1 (binding)
>>> 
>>> Feng Jin wrote on Mon, Oct 23, 2023 at 13:07:
>>>>
>>>> +1(non-binding)
>>>>
>>>>
>>>> Best,
>>>> Feng
>>>>
>>>>
>>>> On Mon, Oct 23, 2023 at 11:58 AM Xuyang wrote:
 
> +1(non-binding)
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> At 2023-10-23 11:38:15, "Jane Chan"  wrote:
>> Hi developers,
>> 
>> Thanks for all the feedback on FLIP-373: Support Configuring
>> Different
>> State TTLs using SQL Hint [1].
>> Based on the discussion [2], we have reached a consensus, so I'd
>> like to
>> start a vote.
>> 
>> The vote will last for at least 72 hours (Oct. 26th at 10:00 A.M.
>> GMT)
>> unless there is an objection or insufficient votes.
>> 
>> [1]
>> 
> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint
>> [2]
>> https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
>> 
>> Best,
>> Jane
> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>> 



Re: [VOTE] FLIP-373: Support Configuring Different State TTLs using SQL Hint

2023-10-25 Thread Lincoln Lee
+1 (binding)

Best,
Lincoln Lee


Zakelly Lan wrote on Mon, Oct 23, 2023 at 14:15:

> +1(non-binding)
>
> Best,
> Zakelly
>
> On Mon, Oct 23, 2023 at 1:15 PM Benchao Li  wrote:
> >
> > +1 (binding)
> >
> > Feng Jin wrote on Mon, Oct 23, 2023 at 13:07:
> > >
> > > +1(non-binding)
> > >
> > >
> > > Best,
> > > Feng
> > >
> > >
> > > On Mon, Oct 23, 2023 at 11:58 AM Xuyang  wrote:
> > >
> > > > +1(non-binding)
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best!
> > > > Xuyang
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > At 2023-10-23 11:38:15, "Jane Chan"  wrote:
> > > > >Hi developers,
> > > > >
> > > > >Thanks for all the feedback on FLIP-373: Support Configuring
> Different
> > > > >State TTLs using SQL Hint [1].
> > > > >Based on the discussion [2], we have reached a consensus, so I'd
> like to
> > > > >start a vote.
> > > > >
> > > > >The vote will last for at least 72 hours (Oct. 26th at 10:00 A.M.
> GMT)
> > > > >unless there is an objection or insufficient votes.
> > > > >
> > > > >[1]
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-373%3A+Support+Configuring+Different+State+TTLs+using+SQL+Hint
> > > > >[2]
> https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
> > > > >
> > > > >Best,
> > > > >Jane
> > > >
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
>


Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Jiabao Sun
Thanks Jane for further explanation.

These two configurations correspond to different levels. 
"scan.filter-push-down.enabled" does not make 
"table.optimizer.source.predicate" invalid. 
The planner will still push down predicates to all sources. 
Whether filter pushdown is allowed or not is determined by the specific 
source's "scan.filter-push-down.enabled" configuration.

However, "table.optimizer.source.predicate" does directly affect
"scan.filter-push-down.enabled".
When the planner disables predicate pushdown, the source-level filter pushdown
will not be executed either, even if the source allows filter pushdown.

In any case, for points 1 and 2 our expectations are consistent.
For the 3rd point, I still think that the planner-level configuration takes
precedence over the source-level configuration.
It may seem counterintuitive to globally disable predicate pushdown but still
allow filter pushdown at the source level.
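
To spell out the behavior I have in mind, here is a minimal sketch, assuming
both options are plain booleans. The class and method names
(PlannerFirstPrecedence, filtersPushedDown) are hypothetical and not part of
the planner; the point is only that the two flags combine with a logical AND.

```java
/** Hypothetical sketch: the planner-level switch gates the source-level switch. */
public class PlannerFirstPrecedence {

    /**
     * Filters reach a source only if the planner allows predicate pushdown
     * AND that source has filter pushdown enabled.
     */
    public static boolean filtersPushedDown(boolean plannerEnabled, boolean sourceEnabled) {
        return plannerEnabled && sourceEnabled;
    }

    public static void main(String[] args) {
        boolean[] values = {true, false};
        // Enumerate every combination of the two options.
        for (boolean planner : values) {
            for (boolean source : values) {
                System.out.printf(
                        "planner=%b source=%b -> pushdown=%b%n",
                        planner, source, filtersPushedDown(planner, source));
            }
        }
    }
}
```

Under this reading, a source can only opt out of pushdown; it cannot opt back
in once the planner-level option is false, which is exactly the 3rd point we
disagree on.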

Best,
Jiabao



> On Oct 25, 2023, at 14:35, Jane Chan wrote:
> 
> Hi Jiabao,
> 
> Thanks for clarifying this. While by "scan.filter-push-down.enabled takes a
> higher priority" I meant that this value should be respected whenever it is
> set explicitly.
> 
> The conclusion that
> 
> 2. "table.optimizer.source.predicate" = "true" and
>> "scan.filter-push-down.enabled" = "false"
>> Allow the planner to perform predicate pushdown, but individual sources do
>> not enable filter pushdown.
>> 
> 
> This indicates that setting "scan.filter-push-down.enabled = false" for
> an individual source connector does indeed override the global-level
> planner setting, and thus "has a higher priority".
> 
> As for case 3:
> 
>> 3. "table.optimizer.source.predicate" = "false"
>> Predicate pushdown is not allowed for the planner.
>> Regardless of the value of the "scan.filter-push-down.enabled"
>> configuration, filter pushdown is disabled.
>> In this scenario, the behavior remains consistent with the old version as
>> well.
>> 
> 
> I still think "scan.filter-push-down.enabled" should also be respected if
> it is enabled for individual connectors. WDYT?
> 
> Best,
> Jane
> 
> On Wed, Oct 25, 2023 at 1:27 PM Jiabao Sun wrote:
> 
>> Thanks Benchao for the feedback.
>> 
>> For the current proposal, we recommend keeping the default value of
>> "table.optimizer.source.predicate" as true,
>> and setting the default value of the newly introduced option
>> "scan.filter-push-down.enabled" to true as well.
>> 
>> The main purpose of doing this is to maintain consistency with previous
>> versions, as whether to perform
>> filter pushdown in the old version solely depends on the
>> "table.optimizer.source.predicate" option.
>> That means by default, as long as a TableSource implements the
>> SupportsFilterPushDown interface, filter pushdown is allowed.
>> And it seems that we don't have much benefit in changing the default value
>> of "table.optimizer.source.predicate" to false.
>> 
>> Regarding the priority of these two configurations, I believe that
>> "table.optimizer.source.predicate"
>> takes precedence over "scan.filter-push-down.enabled" and it exhibits the
>> following behavior.
>> 
>> 1. "table.optimizer.source.predicate" = "true" and
>> "scan.filter-push-down.enabled" = "true"
>> This is the default behavior, allowing filter pushdown for sources.
>> 
>> 2. "table.optimizer.source.predicate" = "true" and
>> "scan.filter-push-down.enabled" = "false"
>> Allow the planner to perform predicate pushdown, but individual sources do
>> not enable filter pushdown.
>> 
>> 3. "table.optimizer.source.predicate" = "false"
>> Predicate pushdown is not allowed for the planner.
>> Regardless of the value of the "scan.filter-push-down.enabled"
>> configuration, filter pushdown is disabled.
>> In this scenario, the behavior remains consistent with the old version as
>> well.
>> 
>> 
>> From an implementation perspective, setting the priority of
>> "scan.filter-push-down.enabled" higher than
>> "table.optimizer.source.predicate" is difficult to achieve now.
>> Because the PushFilterIntoSourceScanRuleBase at the planner level takes
>> precedence over the source-level FilterPushDownSpec.
>> Only when the PushFilterIntoSourceScanRuleBase is enabled, will the
>> Source-level filter pushdown be performed.
>> 
>> Additionally, in my opinion, there doesn't seem to be much benefit in
>> setting a higher priority for "scan.filter-push-down.enabled".
>> It may instead affect compatibility and increase implementation complexity.
>> 
>> WDYT?
>> 
>> Best,
>> Jiabao
>> 
>> 
>>> On Oct 25, 2023, at 11:56, Benchao Li wrote:
>>> 
>>> I agree with Jane that fine-grained configurations should have higher
>>> priority than job level configurations.
>>> 
>>> For current proposal, we can achieve that:
>>> - Set "table.optimizer.source.predicate" = "true" to enable by
>>> default, and set "scan.filter-push-down.enabled" = "false" to disable
>>> it per table source
>>> - Set "table.optimizer.source.predicate" = "false" to disable by
>>> default, 

Re: [DISCUSS] FLIP-377: Support configuration to disable filter push down for Table/SQL Sources

2023-10-25 Thread Jane Chan
Hi Jiabao,

Thanks for clarifying this. By "scan.filter-push-down.enabled takes a
higher priority" I meant that this value should be respected whenever it is
set explicitly.

Consider the conclusion for case 2:

> 2. "table.optimizer.source.predicate" = "true" and
> "scan.filter-push-down.enabled" = "false"
> Allow the planner to perform predicate pushdown, but individual sources do
> not enable filter pushdown.
>

This indicates that setting "scan.filter-push-down.enabled = false" for
an individual source connector does indeed override the global-level
planner setting, and thus "has a higher priority".

As for case 3:

> 3. "table.optimizer.source.predicate" = "false"
> Predicate pushdown is not allowed for the planner.
> Regardless of the value of the "scan.filter-push-down.enabled"
> configuration, filter pushdown is disabled.
> In this scenario, the behavior remains consistent with the old version as
> well.
>

I still think "scan.filter-push-down.enabled" should also be respected if
it is enabled for individual connectors. WDYT?
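
As a rough sketch of the alternative I have in mind (hypothetical class and
method names, not existing Flink code; option names as written in this thread),
an explicitly set source-level value would win, and the planner-level value
would only serve as the fallback when the source-level option is unset:

```java
import java.util.Optional;

public class SourceFirstPushDown {

    /**
     * Hypothetical helper: an explicitly configured source-level value is
     * respected; otherwise the planner-level value is used as the default.
     */
    public static boolean filterPushedDown(boolean plannerPredicatePushDown,
                                           Optional<Boolean> sourceFilterPushDown) {
        return sourceFilterPushDown.orElse(plannerPredicatePushDown);
    }

    public static void main(String[] args) {
        // Planner-level option is false, but the source explicitly opts in.
        System.out.println(filterPushedDown(false, Optional.of(true)));
        // Planner-level option is true, but the source explicitly opts out.
        System.out.println(filterPushedDown(true, Optional.of(false)));
        // Source-level option not set: fall back to the planner-level value.
        System.out.println(filterPushedDown(false, Optional.empty()));
    }
}
```

The only case that differs from the planner-first behavior is the first one,
where the planner-level option is false and the source-level option is
explicitly true.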

Best,
Jane

On Wed, Oct 25, 2023 at 1:27 PM Jiabao Sun wrote:

> Thanks Benchao for the feedback.
>
> For the current proposal, we recommend keeping the default value of
> "table.optimizer.source.predicate" as true,
> and setting the default value of the newly introduced option
> "scan.filter-push-down.enabled" to true as well.
>
> The main purpose of doing this is to maintain consistency with previous
> versions, as whether to perform
> filter pushdown in the old version solely depends on the
> "table.optimizer.source.predicate" option.
> That means by default, as long as a TableSource implements the
> SupportsFilterPushDown interface, filter pushdown is allowed.
> And it seems that we don't have much benefit in changing the default value
> of "table.optimizer.source.predicate" to false.
>
> Regarding the priority of these two configurations, I believe that
> "table.optimizer.source.predicate"
> takes precedence over "scan.filter-push-down.enabled" and it exhibits the
> following behavior.
>
> 1. "table.optimizer.source.predicate" = "true" and
> "scan.filter-push-down.enabled" = "true"
> This is the default behavior, allowing filter pushdown for sources.
>
> 2. "table.optimizer.source.predicate" = "true" and
> "scan.filter-push-down.enabled" = "false"
> Allow the planner to perform predicate pushdown, but individual sources do
> not enable filter pushdown.
>
> 3. "table.optimizer.source.predicate" = "false"
> Predicate pushdown is not allowed for the planner.
> Regardless of the value of the "scan.filter-push-down.enabled"
> configuration, filter pushdown is disabled.
> In this scenario, the behavior remains consistent with the old version as
> well.
>
>
> From an implementation perspective, setting the priority of
> "scan.filter-push-down.enabled" higher than
> "table.optimizer.source.predicate" is difficult to achieve now.
> Because the PushFilterIntoSourceScanRuleBase at the planner level takes
> precedence over the source-level FilterPushDownSpec.
> Only when the PushFilterIntoSourceScanRuleBase is enabled, will the
> Source-level filter pushdown be performed.
>
> Additionally, in my opinion, there doesn't seem to be much benefit in
> setting a higher priority for "scan.filter-push-down.enabled".
> It may instead affect compatibility and increase implementation complexity.
>
> WDYT?
>
> Best,
> Jiabao
>
>
> > On Oct 25, 2023, at 11:56, Benchao Li wrote:
> >
> > I agree with Jane that fine-grained configurations should have higher
> > priority than job level configurations.
> >
> > For current proposal, we can achieve that:
> > - Set "table.optimizer.source.predicate" = "true" to enable by
> > > default, and set "scan.filter-push-down.enabled" = "false" to disable
> > > it per table source
> > > - Set "table.optimizer.source.predicate" = "false" to disable by
> > > default, and set "scan.filter-push-down.enabled" = "true" to enable
> > it per table source
> >
> > > On Tue, Oct 24, 2023 at 23:55, Jane Chan wrote:
> >>
> >>>
> > >>> I believe that the configuration "table.optimizer.source.predicate" has a
> > >>> higher priority at the planner level than the configuration at the source
> > >>> level,
> > >>> and it seems easy to implement now.
> >>>
> >>
> > >> Correct me if I'm wrong, but I think the fine-grained configuration
> > >> "scan.filter-push-down.enabled" should have a higher priority because the
> > >> default value of "table.optimizer.source.predicate" is true. As a result,
> > >> turning off filter push-down for a specific source will not take effect
> > >> unless the default value of "table.optimizer.source.predicate" is changed
> > >> to false, or, alternatively, let users manually set
> > >> "table.optimizer.source.predicate" to false first and then selectively
> > >> enable filter push-down for the desired sources, which is less intuitive.
> > >> WDYT?
> >>
> >> Best,
> >> Jane
> >>
> > >> On Tue, Oct 24, 2023 at 6:05 PM Jiabao Sun wrote:
> >>
> >>> Thanks Jane,
> >>>
> >>> I