Re: [VOTE] FLIP-364: Improve the restart-strategy

2023-12-01 Thread Matthias Pohl
+1 (binding)

On Fri, Dec 1, 2023 at 3:40 AM Zhu Zhu  wrote:

> +1 (binding)
>
> Thanks,
> Zhu
>
> > Zhanghao Chen wrote on Thu, Nov 30, 2023, at 23:31:
>
> > +1 (non-binding)
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: Rui Fan <1996fan...@gmail.com>
> > Sent: Monday, November 13, 2023 11:01
> > To: dev 
> > Subject: [VOTE] FLIP-364: Improve the restart-strategy
> >
> > Hi everyone,
> >
> > Thank you to everyone for the feedback on FLIP-364: Improve the
> > restart-strategy[1]
> > which has been discussed in this thread [2].
> >
> > I would like to start a vote for it. The vote will be open for at least 72
> > hours unless there is an objection or not enough votes.
> >
> > [1] https://cwiki.apache.org/confluence/x/uJqzDw
> > [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
> >
> > Best,
> > Rui
> >
>


[jira] [Created] (FLINK-33721) Extend BashJavaUtils to Support Reading and Writing Standard YAML Files

2023-12-01 Thread Junrui Li (Jira)
Junrui Li created FLINK-33721:
-

 Summary: Extend BashJavaUtils to Support Reading and Writing 
Standard YAML Files
 Key: FLINK-33721
 URL: https://issues.apache.org/jira/browse/FLINK-33721
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Reporter: Junrui Li


Currently, Flink's shell scripts, such as those used for end-to-end (e2e)
testing and Docker image building, need to read from and modify Flink's
configuration files. With the introduction of standard YAML files as the
configuration format, the existing shell scripts cannot correctly read and
modify these files.

To accommodate this requirement and enhance our script capabilities, we propose
extending BashJavaUtils to support reading and modifying standard YAML files,
so that our shell scripts can seamlessly interact with the new configuration
format.
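
For illustration only, a minimal sketch of what the Java side of such a utility
could look like, assuming SnakeYAML is used for parsing; the class and method
names below are hypothetical and not the actual BashJavaUtils API:
{code:java}
import java.io.FileReader;
import java.io.FileWriter;
import java.util.LinkedHashMap;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;

/** Hypothetical helper showing how standard YAML could be read and updated from Java. */
public class YamlConfigUtil {

    /** Returns the value of a top-level key from the given YAML file, or null if absent. */
    public static Object readKey(String yamlPath, String key) throws Exception {
        try (FileReader reader = new FileReader(yamlPath)) {
            Map<String, Object> config = new Yaml().load(reader);
            return config == null ? null : config.get(key);
        }
    }

    /** Sets a top-level key to the given value and writes the file back as standard YAML. */
    public static void writeKey(String yamlPath, String key, Object value) throws Exception {
        Map<String, Object> config;
        try (FileReader reader = new FileReader(yamlPath)) {
            config = new Yaml().load(reader);
        }
        if (config == null) {
            config = new LinkedHashMap<>();
        }
        config.put(key, value);
        try (FileWriter writer = new FileWriter(yamlPath)) {
            new Yaml().dump(config, writer);
        }
    }
}{code}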



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Doc about cleaning savePoints and checkPoints

2023-12-01 Thread Zakelly Lan
Hi, Rodrigo

It appears that the configurations you mentioned in your first question are
related to the Flink Kubernetes operator. Are you using the Flink
Kubernetes operator?

Regarding the cleanup behavior when a user restores a job from a
savepoint or retained checkpoint, you can find detailed information at:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-savepoint-restore-mode
(see "execution.savepoint-restore-mode"). Hope this helps.


Best,
Zakelly

On Fri, Dec 1, 2023 at 3:34 AM Rodrigo Meneses  wrote:

> Hi Flink Community,
>
> I'm searching for docs about how the cleaning of checkpoints and savepoints
> actually work.
>
> I'm particularly interested in the cases where the user has the `NATIVE` format
> (incremental savepoint). Somehow, when using the NATIVE format, the number of
> savepoints kept is not matching the savepoint parameters like:
> ```
>   ["kubernetes.operator.savepoint.history.max.age"] = "7d"
>   ["kubernetes.operator.savepoint.history.max.count"] = "14"
> ```
>
> Also, I would like to understand better when the checkpoints are cleaned.
> According to
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/
> the checkpoints are cleaned when a program is cancelled. What happens if a
> user suspends and then restores the job? Or when a user upgrades the job?
> Are the checkpoints also cleaned in this situation?
>
> Thanks so much for your time
> -Rodrigo
>


Re: [DISCUSS] Resolve diamond inheritance of Sink.createWriter

2023-12-01 Thread Péter Váry
Thanks Becket for your reply!

*On Option 1:*
- I personally consider API inconsistencies more important, since they will
remain with us "forever", but this is up to the community. I can implement
whichever solution we decide upon.

*Option 2:*
- I don't think this specific issue merits a rewrite, but if we decide to
change our approach, then it's a different story.

*Evolvability:*
This discussion reminds me of a similar discussion on FLIP-372 [1], where
we are trying to decide if we should use mixin interfaces, or use interface
inheritance.
With the mixin approach, we have a more flexible interface, but we can't
check the generic types of the interfaces/classes at compile time, or even
when we create the DAG. The issue only surfaces when we call the method and
it fails.
The issue here is similar:
- *StatefulSink* needs a writer with a `*snapshotState*` method
- *TwoPhaseCommittingSink* needs a writer with a `*prepareCommit*` method
- If there is a Sink which is stateful and also needs to commit, then it needs
both of these methods.

If we use the mixin solution here, we lose the possibility to check the
types at compile time. We could do the type check at runtime using
`*instanceof*`, so we are better off than with the FLIP-372 example above,
but we still lose an important capability. I personally prefer the mixin
approach, but that would mean we rewrite the Sink API again - likely a
SinkV3. Are we ready to move down that path?
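
To make the two styles concrete, here is a rough sketch with illustrative
stand-in types (not the actual SinkV2 interfaces):

```
import java.util.Collection;
import java.util.List;

// Illustrative stand-ins, not the real SinkV2 classes.
interface SinkWriter<IN> { void write(IN element); }
interface InitContext {}

// Inheritance style (roughly how SinkV2 works today): each capability narrows the
// return type of createWriter(). A sink that is both stateful and two-phase
// committing must return a writer type satisfying both narrowed signatures, which
// is where the diamond discussed in this thread comes from.
interface Sink<IN> {
    SinkWriter<IN> createWriter(InitContext context);
}
interface StatefulSinkWriter<IN, STATE> extends SinkWriter<IN> {
    List<STATE> snapshotState(long checkpointId);
}
interface StatefulSink<IN, STATE> extends Sink<IN> {
    @Override
    StatefulSinkWriter<IN, STATE> createWriter(InitContext context);
}
interface PrecommittingSinkWriter<IN, COMM> extends SinkWriter<IN> {
    Collection<COMM> prepareCommit();
}
interface TwoPhaseCommittingSink<IN, COMM> extends Sink<IN> {
    @Override
    PrecommittingSinkWriter<IN, COMM> createWriter(InitContext context);
}

// Mixin style: capabilities become decorator interfaces on the writer. The sink
// only exposes SinkWriter<IN>, so STATE/COMM can no longer be checked at compile
// time; the runtime would discover the capabilities with instanceof.
interface SupportsWriterState<STATE> {
    List<STATE> snapshotState(long checkpointId);
}
interface SupportsPreCommit<COMM> {
    Collection<COMM> prepareCommit();
}
```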

Thanks,
Peter

[1] - https://lists.apache.org/thread/344pzbrqbbb4w0sfj67km25msp7hxlyd

On Thu, Nov 30, 2023, 14:53 Becket Qin  wrote:

> Hi folks,
>
> Sorry for replying late on the thread.
>
> For this particular FLIP, I see two solutions:
>
> Option 1:
> 1. On top of the current status, rename
> *org.apache.flink.api.connector.sink2.InitContext* to
> *CommonInitContext* (which should probably be package private).
> 2. Change the name *WriterInitContext* back to *InitContext*, and revert
> the deprecation. We can change the parameter name to writerContext if we
> want to.
> Admittedly, this does not have full symmetric naming of the InitContexts -
> we will have CommonInitContext / InitContext / CommitterInitContext instead
> of InitContext / WriterInitContext / CommitterInitContext. However, the
> naming seems clear without much confusion. Personally, I can live with
> that, treating the class InitContext as a non-ideal legacy class name
> without much material harm.
>
> Option 2:
> Theoretically speaking, if we really want to reach the perfect state while
> being backwards compatible, we can create a brand new set of Sink
> interfaces and deprecate the old ones. But I feel this is an overkill here.
>
> The solution to this particular issue aside, the evolvability of the
> current interface hierarchy seems a more fundamental issue and worries me
> more. I haven't completely thought it through, but there are two noticeable
> differences between the interface design principles between Source and
> Sink.
> 1. Source uses decorative interfaces. For example, we have a
> SupportsFilterPushdown interface, instead of a subclass of
> FilterableSource. This seems to provide better flexibility.
> 2. Source tends to have a more coarse-grained interface. For example,
> SourceReader always has the methods snapshotState() and
> notifyCheckpointComplete(). Even if they may not always be required, we do
> not separate them into different interfaces.
> My hunch is that if we follow a similar approach to Source, the evolvability
> might be better. If we want to do this, we'd better do it before 2.0.
> What do you think?
>
> Process wise,
> - I agree that if there is a change to an already-passed FLIP during
> implementation, it should be brought back to the mailing list.
> - There might be value in having the connector nightly build depend on the
> latest snapshot of the same Flink major version. It helps catch
> unexpected breaking changes sooner.
> - I'll update the website to reflect the latest API stability policy.
> Apologies for the confusion caused by the stale doc.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, Nov 29, 2023 at 10:55 PM Márton Balassi 
> wrote:
>
> > Thanks, Martijn and Peter.
> >
> > In terms of the concrete issue:
> >
> >- I am following up with the author of FLIP-321 [1] (Becket) to update
> >the docs [2] to reflect the right state.
> >- I see two reasonable approaches in terms of proceeding with the
> >specific changeset:
> >
> >
> >    1. We allow the exception from FLIP-321 for this change and let the
> >    PublicEvolving API change happen between Flink 1.18 and 1.19, which is
> >    consistent with the current state of the relevant documentation. [2]
> >    We commit to helping the connector repos make the necessary (one-liner)
> >    changes.
> >    2. We revert back to the original implementation plan as explicitly
> >    voted on in FLIP-371 [3]. That has no API-breaking changes. However we
> >    end up with an inconsistently named API 

[jira] [Created] (FLINK-33722) MATCH_RECOGNIZE in batch mode ignores events order

2023-12-01 Thread Jira
Grzegorz Kołakowski created FLINK-33722:
---

 Summary: MATCH_RECOGNIZE in batch mode ignores events order
 Key: FLINK-33722
 URL: https://issues.apache.org/jira/browse/FLINK-33722
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.17.1
Reporter: Grzegorz Kołakowski


MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Let's consider 
the following example:
{code:sql}
FROM events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY ts ASC
MEASURES
FIRST(A.ts) as _start,
LAST(A.ts) as _middle,
LAST(B.ts) as _finish
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
DEFINE
A AS active is false,
B AS active is true
) AS T {code}
where _events_ is a Postgresql table containing ~1 records.
{code:java}
CREATE TABLE events (
  id INT,
  user_id INT,
  ts TIMESTAMP(3),
  active BOOLEAN,
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/test',
'username' = 'test',
'password' = 'test',
'table-name' = 'events'
); {code}
It can happen that _finish is smaller than _start or _middle, which is wrong.
{noformat}
 user_id   _start                    _middle                   _finish
 1         2023-11-23 14:34:42.346   2023-11-23 14:34:48.370   2023-11-23 14:34:44.264{noformat}
 

Repository where I reproduced the problem: 
https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging

 

According to [~dwysakowicz]: In BATCH mode the CepOperator is always created to 
process records in processing time:
https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
A comparator covering the sorting on the ts field is passed along to the operator: 
https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173
but this is only secondary sorting. It is applied only within records of the 
same timestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33723) Disallow triggering incremental checkpoint explicitly from REST API

2023-12-01 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-33723:
---

 Summary: Disallow triggering incremental checkpoint explicitly 
from REST API
 Key: FLINK-33723
 URL: https://issues.apache.org/jira/browse/FLINK-33723
 Project: Flink
  Issue Type: Improvement
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Fix For: 1.19.0


Currently, when a job is configured to run with incremental checkpointing 
disabled, a user who manually triggers an incremental checkpoint actually 
triggers a full checkpoint. That is because the files from a full checkpoint 
cannot be shared with an incremental checkpoint. So it is better to remove the 
"INCREMENTAL" option from the REST API for triggering checkpoints to avoid 
misunderstanding.
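
For context, a rough sketch of the kind of request this concerns, assuming the 
POST /jobs/<jobid>/checkpoints endpoint and the checkpointType body field 
(please check the REST API documentation for the exact contract); with this 
change the INCREMENTAL value would no longer be offered:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerCheckpointExample {
    public static void main(String[] args) throws Exception {
        // Placeholder job id and REST address; the endpoint and field name are assumptions,
        // see the Flink REST API docs for the authoritative contract.
        String jobId = "00000000000000000000000000000000";
        String url = "http://localhost:8081/jobs/" + jobId + "/checkpoints";
        String body = "{\"checkpointType\": \"INCREMENTAL\"}";

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}{code}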



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33724) Application mode doesn't support multiple lines argument

2023-12-01 Thread Jiaxing Chen (Jira)
Jiaxing Chen created FLINK-33724:


 Summary: Application mode doesn't support multiple lines argument
 Key: FLINK-33724
 URL: https://issues.apache.org/jira/browse/FLINK-33724
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.18.0
Reporter: Jiaxing Chen


When running a job in application mode, program arguments are stored in the 
flink-conf.yaml file. These arguments are loaded when deploying an 
application on YARN. However, a multi-line argument cannot be loaded properly.

In my case, I submit a job with a SQL statement passed as a multi-line argument:

 
{code:java}
/bin/flink run-application
-t yarn-application
...
myjob.jar
"INSERT INTO tableA
SELECT
a,
b,
c 
FROM tableB;"{code}
In flink-conf.yaml, it is saved as:
{code:java}
$internal.application.program-args: "INSERT INTO tableA 
SELECT 
a,
b,
c 
FROM tableB;"{code}
And it produced some warning logs when loaded from flink-conf.yaml:
{code:java}
INFO  org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: $internal.application.program-args, "INSERT INTO tableA 
SELECT a,
WARN  org.apache.flink.configuration.GlobalConfiguration   [] - Error 
while trying to split key and value in configuration file /{working 
dir}/flink-conf.yaml:{line num} "SELECT "
...{code}
I dug into the source code and found that the methods that read and write the 
YAML file cannot properly parse the YAML format. It might be better to use a 
third-party library, such as SnakeYAML, to do this job.
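
For illustration, a small sketch of how a YAML library such as SnakeYAML 
round-trips a multi-line string value (this is only a sketch of the suggestion 
above, not how flink-conf.yaml is parsed today):
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;

public class MultiLineYamlExample {
    public static void main(String[] args) {
        String sql = "INSERT INTO tableA\nSELECT\na,\nb,\nc\nFROM tableB;";

        // Dump a map containing the multi-line value; SnakeYAML escapes or blocks it as needed.
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("$internal.application.program-args", sql);
        String dumped = new Yaml().dump(conf);
        System.out.println(dumped);

        // Loading it back yields the original multi-line string, unlike a line-based parser.
        Map<String, Object> loaded = new Yaml().load(dumped);
        System.out.println(sql.equals(loaded.get("$internal.application.program-args"))); // true
    }
}{code}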

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33725) MathUtils.isPowerOf2 does not cover the case of value=0

2023-12-01 Thread Jes Cok (Jira)
Jes Cok created FLINK-33725:
---

 Summary: MathUtils.isPowerOf2  does not cover the case of value=0
 Key: FLINK-33725
 URL: https://issues.apache.org/jira/browse/FLINK-33725
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Jes Cok


org.apache.flink.util.MathUtils.isPowerOf2
 
This static method does not cover the case of value=0.
Should the documentation explain that the value cannot be 0?
Or could it be re-implemented as the following code?
 
{code:java}
public static boolean isPowerOf2(long value) {
    // The value > 0 guard excludes 0 (and negative values), which the bit trick alone would accept.
    return value > 0 && (value & (value - 1)) == 0;
}{code}
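
For reference, a tiny check of the boundary case, assuming (as the ticket 
implies) that the current implementation uses only the bit trick without the 
value > 0 guard:
{code:java}
public class IsPowerOf2Check {
    // Assumed current behavior: bit trick only, so 0 is (incorrectly) reported as a power of two.
    static boolean current(long value) {
        return (value & (value - 1)) == 0;
    }

    // Proposed fix from this ticket.
    static boolean proposed(long value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    public static void main(String[] args) {
        System.out.println(current(0));   // true  -> misleading
        System.out.println(proposed(0));  // false
        System.out.println(proposed(8));  // true
    }
}{code}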



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[ANNOUNCE] Apache Flink Kafka Connectors 3.0.2 released

2023-12-01 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Kafka Connectors 3.0.2. This release is compatible with the Apache
Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353768

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon