Re: [VOTE] FLIP-364: Improve the restart-strategy
+1 (binding)

On Fri, Dec 1, 2023 at 3:40 AM Zhu Zhu wrote:
> +1 (binding)
>
> Thanks,
> Zhu
>
> Zhanghao Chen wrote on Thu, Nov 30, 2023, 23:31:
> > +1 (non-binding)
> >
> > Best,
> > Zhanghao Chen
> >
> > From: Rui Fan <1996fan...@gmail.com>
> > Sent: Monday, November 13, 2023 11:01
> > To: dev
> > Subject: [VOTE] FLIP-364: Improve the restart-strategy
> >
> > Hi everyone,
> >
> > Thank you to everyone for the feedback on FLIP-364: Improve the
> > restart-strategy [1], which has been discussed in this thread [2].
> >
> > I would like to start a vote for it. The vote will be open for at least
> > 72 hours unless there is an objection or not enough votes.
> >
> > [1] https://cwiki.apache.org/confluence/x/uJqzDw
> > [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
> >
> > Best,
> > Rui
[jira] [Created] (FLINK-33721) Extend BashJavaUtils to Support Reading and Writing Standard YAML Files
Junrui Li created FLINK-33721:
Summary: Extend BashJavaUtils to Support Reading and Writing Standard YAML Files
Key: FLINK-33721
URL: https://issues.apache.org/jira/browse/FLINK-33721
Project: Flink
Issue Type: Sub-task
Components: API / Core
Reporter: Junrui Li

Currently, Flink's shell scripts, such as those used for end-to-end (e2e) testing and Docker image building, require the ability to read from and modify Flink's configuration files. With the introduction of standard YAML files as the configuration format, the existing shell scripts are not equipped to correctly handle read and modify operations on these files.

To accommodate this requirement and enhance our script capabilities, we propose extending the BashJavaUtils functionality. This extension will enable BashJavaUtils to support reading and modifying standard YAML files, ensuring that our shell scripts can seamlessly interact with the new configuration format.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
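A rough sketch of what such a helper could look like. This is illustrative only: the class and method names are hypothetical, and it only handles flat `key: value` lines. A real implementation of the proposal would use a full YAML parser (e.g. SnakeYAML) to support the complete standard YAML syntax.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper in the spirit of the proposed BashJavaUtils extension.
// Only handles flat "key: value" lines; a real implementation would use a
// standard YAML parser (such as SnakeYAML) to support nesting, lists, quoting.
public class FlatYamlUtil {

    // Parse "key: value" lines into an ordered map, skipping comments and blanks.
    public static Map<String, String> parse(String content) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String line : content.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int idx = trimmed.indexOf(':');
            if (idx > 0) {
                result.put(trimmed.substring(0, idx).trim(), trimmed.substring(idx + 1).trim());
            }
        }
        return result;
    }

    // Render the map back as flat YAML so shell scripts can rewrite the file.
    public static String render(Map<String, String> config) {
        StringBuilder sb = new StringBuilder();
        config.forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                parse("taskmanager.numberOfTaskSlots: 2\n# comment\nparallelism.default: 1\n");
        conf.put("parallelism.default", "4"); // modify a value, then write it back
        System.out.print(render(conf));
    }
}
```

A shell script would then call such a helper once per read/modify operation instead of attempting to parse YAML with sed/grep.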
Re: Doc about cleaning savePoints and checkPoints
Hi, Rodrigo

It appears that the configurations you mentioned in your first question are related to the Flink Kubernetes operator. Are you using the Flink Kubernetes operator?

In regards to the cleaning behavior when users restore a job from a savepoint or retained checkpoint, you can find detailed information in:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-savepoint-restore-mode
(See "execution.savepoint-restore-mode").

Hope this helps.

Best,
Zakelly

On Fri, Dec 1, 2023 at 3:34 AM Rodrigo Meneses wrote:
> Hi Flink Community,
>
> I'm searching for docs about how the cleaning of checkpoints and savepoints
> actually works.
>
> I'm interested particularly in the cases when the user has `NATIVE` format
> (incremental savepoint). Somehow, when using NATIVE format, the number of
> savepoints kept is not matching the savepoint parameters like:
> ```
> ["kubernetes.operator.savepoint.history.max.age"] = "7d"
> ["kubernetes.operator.savepoint.history.max.count"] = "14"
> ```
>
> Also, I would like to understand better when the checkpoints are cleaned.
> According to
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/
> the checkpoints are cleaned when a program is cancelled. What happens if a
> user suspends and then restores the job? Or when a user upgrades the job?
> Are the checkpoints also cleaned in this situation?
>
> Thanks so much for your time
> -Rodrigo
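For quick reference, the restore-mode setting mentioned above is a regular Flink configuration option; a minimal fragment, with the option values summarized from the linked documentation:

```yaml
# Controls whether Flink claims ownership of the snapshot a job is restored from:
#   NO_CLAIM (default) - Flink does not assume ownership; the first completed
#                        checkpoint is a full one, after which the user can
#                        safely delete the restored snapshot
#   CLAIM              - Flink claims the snapshot and may delete it once it
#                        is subsumed by newer checkpoints
#   LEGACY             - the pre-1.15 behavior
execution.savepoint-restore-mode: NO_CLAIM
```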
Re: [DISCUSS] Resolve diamond inheritance of Sink.createWriter
Thanks Becket for your reply!

*On Option 1:*
- I personally consider API inconsistencies more important, since they will remain with us "forever", but this is up to the community. I can implement whichever solution we decide upon.

*Option 2:*
- I don't think this specific issue merits a rewrite, but if we decide to change our approach, then it's a different story.

*Evolvability:*
This discussion reminds me of a similar discussion on FLIP-372 [1], where we are trying to decide if we should use mixin interfaces, or use interface inheritance. With the mixin approach, we have a more flexible interface, but we can't check the generic types of the interfaces/classes at compile time, or even when we create the DAG. The first issue happens when we call the method and fail.

The issue here is similar:
- *StatefulSink* needs a writer with a method to *snapshotState*
- *TwoPhaseCommittingSink* needs a writer with *prepareCommit*
- If there is a Sink which is stateful and needs to commit, then it needs both of these methods.

If we use the mixin solution here, we lose the possibility to check the types at compile time. We could do the type check at runtime using *instanceof*, so we are better off than with the FLIP-372 example above, but we still lose an important capability.

I personally prefer the mixin approach, but that would mean we rewrite the Sink API again - likely a SinkV3. Are we ready to move down that path?

Thanks,
Peter

[1] - https://lists.apache.org/thread/344pzbrqbbb4w0sfj67km25msp7hxlyd

On Thu, Nov 30, 2023, 14:53 Becket Qin wrote:
> Hi folks,
>
> Sorry for replying late on the thread.
>
> For this particular FLIP, I see two solutions:
>
> Option 1:
> 1. On top of the current status, rename
> *org.apache.flink.api.connector.sink2.InitContext* to *CommonInitContext*
> (should probably be package private).
> 2. Change the name *WriterInitContext* back to *InitContext*, and revert
> the deprecation. We can change the parameter name to writerContext if we
> want to.
> Admittedly, this does not have fully symmetric naming of the InitContexts -
> we will have CommonInitContext / InitContext / CommitterInitContext instead
> of InitContext / WriterInitContext / CommitterInitContext. However, the
> naming seems clear without much confusion. Personally, I can live with
> that, treating the class InitContext as a non-ideal legacy class name
> without much material harm.
>
> Option 2:
> Theoretically speaking, if we really want to reach the perfect state while
> being backwards compatible, we can create a brand new set of Sink
> interfaces and deprecate the old ones. But I feel this is overkill here.
>
> The solution to this particular issue aside, the evolvability of the
> current interface hierarchy seems a more fundamental issue and worries me
> more. I haven't completely thought it through, but there are two noticeable
> differences between the interface design principles of Source and Sink:
> 1. Source uses decorative interfaces. For example, we have a
> SupportsFilterPushdown interface, instead of a subclass of
> FilterableSource. This seems to provide better flexibility.
> 2. Source tends to have a more coarse-grained interface. For example,
> SourceReader always has the methods snapshotState() and
> notifyCheckpointComplete(). Even if they may not always be required, we do
> not separate them into different interfaces.
> My hunch is that if we follow a similar approach as Source, the
> evolvability might be better. If we want to do this, we'd better do it
> before 2.0. What do you think?
>
> Process wise,
> - I agree that if there is a change to the passed FLIP during
> implementation, it should be brought back to the mailing list.
> - There might be value in having the connector nightly build depend on the
> latest snapshot of the same Flink major version. It helps catch
> unexpected breaking changes sooner.
> - I'll update the website to reflect the latest API stability policy.
> Apologies for the confusion caused by the stale doc.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 29, 2023 at 10:55 PM Márton Balassi wrote:
> > Thanks, Martijn and Peter.
> >
> > In terms of the concrete issue:
> >
> > - I am following up with the author of FLIP-321 [1] (Becket) to update
> > the docs [2] to reflect the right state.
> > - I see two reasonable approaches in terms of proceeding with the
> > specific changeset:
> >
> > 1. We allow the exception from FLIP-321 for this change and let the
> > PublicEvolving API change happen between Flink 1.18 and 1.19, which is
> > consistent with the current state of the relevant documentation. [2]
> > We commit to helping the connector repos make the necessary (one-liner)
> > changes.
> > 2. We revert back to the original implementation plan as explicitly
> > voted on in FLIP-371 [3]. That has no API-breaking changes.
> > However we end up with an inconsistently named API
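The mixin alternative Peter describes could be sketched roughly as follows. All interface and class names here are hypothetical illustrations, not the actual Flink API; the point is that capabilities are discovered via instanceof at runtime rather than checked at compile time.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the "mixin" approach discussed in the thread.
// None of these names are the real Flink Sink V2 API.
public class MixinSketch {

    // Base capability every writer has.
    interface SinkWriter<IN> {
        void write(IN element);
    }

    // Optional mixin: writers that snapshot state on checkpoints.
    interface StatefulWriter<S> {
        List<S> snapshotState(long checkpointId);
    }

    // Optional mixin: writers that participate in two-phase commit.
    interface CommittingWriter<C> {
        List<C> prepareCommit();
    }

    // A writer that is both stateful and committing simply implements both mixins.
    static class MyWriter
            implements SinkWriter<String>, StatefulWriter<Long>, CommittingWriter<String> {
        public void write(String element) { /* buffer the element */ }
        public List<Long> snapshotState(long checkpointId) {
            return Collections.singletonList(checkpointId);
        }
        public List<String> prepareCommit() {
            return Collections.singletonList("committable");
        }
    }

    // The framework can only discover capabilities at runtime via instanceof:
    // this is exactly the compile-time type-check loss the thread discusses.
    static boolean supportsState(SinkWriter<?> writer) {
        return writer instanceof StatefulWriter;
    }

    public static void main(String[] args) {
        SinkWriter<String> writer = new MyWriter();
        System.out.println(supportsState(writer)); // runtime check, not compile-time
    }
}
```

The flexibility gain is that new capabilities can be added without touching the base interface hierarchy; the cost is that a misconfigured sink fails at runtime instead of at compile time.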
[jira] [Created] (FLINK-33722) MATCH_RECOGNIZE in batch mode ignores events order
Grzegorz Kołakowski created FLINK-33722:
Summary: MATCH_RECOGNIZE in batch mode ignores events order
Key: FLINK-33722
URL: https://issues.apache.org/jira/browse/FLINK-33722
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.17.1
Reporter: Grzegorz Kołakowski

MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Let's consider the following example:

{code:sql}
FROM events MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY ts ASC
    MEASURES
        FIRST(A.ts) AS _start,
        LAST(A.ts) AS _middle,
        LAST(B.ts) AS _finish
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
    DEFINE
        A AS active IS FALSE,
        B AS active IS TRUE
) AS T
{code}

where _events_ is a PostgreSQL table containing ~1 records.

{code:java}
CREATE TABLE events (
    id INT,
    user_id INT,
    ts TIMESTAMP(3),
    active BOOLEAN,
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/test',
    'username' = 'test',
    'password' = 'test',
    'table-name' = 'events'
);
{code}

It can happen that _finish_ is smaller than _start_ or _middle_, which is wrong.

{noformat}
user_id  _start                   _middle                  _finish
1        2023-11-23 14:34:42.346  2023-11-23 14:34:48.370  2023-11-23 14:34:44.264
{noformat}

Repository where I reproduced the problem:
https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging

According to [~dwysakowicz]:

In BATCH the CepOperator is always created to process records in processing time:
https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
A comparator is passed along to the operator covering the sorting on the ts field:
https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173
but this is only secondary sorting. It is applied only within records of the same timestamp.
[jira] [Created] (FLINK-33723) Disallow triggering incremental checkpoint explicitly from REST API
Zakelly Lan created FLINK-33723:
Summary: Disallow triggering incremental checkpoint explicitly from REST API
Key: FLINK-33723
URL: https://issues.apache.org/jira/browse/FLINK-33723
Project: Flink
Issue Type: Improvement
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Fix For: 1.19.0

Currently, when a job is configured to run with incremental checkpointing disabled and a user manually triggers an incremental checkpoint, a full checkpoint is actually performed. That is because the files from a full checkpoint cannot be shared with an incremental checkpoint. So it is better to remove the "INCREMENTAL" option when triggering a checkpoint from the REST API to avoid misunderstanding.
[jira] [Created] (FLINK-33724) Application mode doesn't support multiple lines argument
Jiaxing Chen created FLINK-33724:
Summary: Application mode doesn't support multiple lines argument
Key: FLINK-33724
URL: https://issues.apache.org/jira/browse/FLINK-33724
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Affects Versions: 1.18.0
Reporter: Jiaxing Chen

When running a job in application mode, program arguments are stored in the flink-conf.yaml file. These arguments are loaded when deploying an application on YARN. However, multi-line arguments cannot be loaded properly. In my case, I submitted a job with SQL as a multi-line argument:

{code:java}
/bin/flink run-application -t yarn-application ... myjob.jar "INSERT INTO tableA SELECT a, b, c FROM tableB;"
{code}

In flink-conf.yaml, it is saved as:

{code:java}
$internal.application.program-args: "INSERT INTO tableA SELECT a, b, c FROM tableB;"
{code}

And it produced some warning logs when loaded from flink-conf.yaml:

{code:java}
INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.application.program-args, "INSERT INTO tableA SELECT a,
WARN  org.apache.flink.configuration.GlobalConfiguration [] - Error while trying to split key and value in configuration file /{working dir}/flink-conf.yaml:{line num} "SELECT "
...
{code}

I dug into the source code and found that the read and write methods for the YAML file cannot properly parse the YAML format. Maybe it would be better to use a third-party tool, such as SnakeYAML, to do this job.
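A much-simplified illustration of the failure mode described above. This mimics a naive line-based key/value split, not the actual GlobalConfiguration code; the method name and the demo strings are illustrative only.

```java
// Simplified illustration of why a line-based "key: value" split breaks on
// multi-line values. This mimics (but is not) Flink's GlobalConfiguration logic.
public class LineSplitDemo {

    // Returns null for lines that cannot be split into "key: value" --
    // roughly where a "Error while trying to split key and value" warning
    // would come from in a line-based parser.
    public static String[] splitKeyValue(String line) {
        int idx = line.indexOf(": ");
        if (idx <= 0) {
            return null;
        }
        return new String[] {line.substring(0, idx), line.substring(idx + 2)};
    }

    public static void main(String[] args) {
        // A multi-line value written out verbatim spans several file lines.
        String fileContent =
                "$internal.application.program-args: \"INSERT INTO tableA\n"
                        + "SELECT a, b, c\n"
                        + "FROM tableB;\"";
        for (String line : fileContent.split("\n")) {
            String[] kv = splitKeyValue(line);
            // Only the first line parses; the continuation lines fail.
            System.out.println(kv == null ? "WARN cannot split: " + line : kv[0] + " -> " + kv[1]);
        }
    }
}
```

A real YAML parser would instead treat the quoted value as a single scalar spanning several lines, which is why the issue suggests delegating to one.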
[jira] [Created] (FLINK-33725) MathUtils.isPowerOf2 does not cover the case of value=0
Jes Cok created FLINK-33725:
Summary: MathUtils.isPowerOf2 does not cover the case of value=0
Key: FLINK-33725
URL: https://issues.apache.org/jira/browse/FLINK-33725
Project: Flink
Issue Type: Improvement
Components: API / Core
Reporter: Jes Cok

org.apache.flink.util.MathUtils.isPowerOf2: this static method does not cover the case of value=0. Should the documentation explain that value cannot be 0? Or could it be re-implemented as the following code?

{code:java}
public static boolean isPowerOf2(long value) {
    return value > 0 && (value & (value - 1)) == 0;
}
{code}
[ANNOUNCE] Apache Flink Kafka Connectors 3.0.2 released
The Apache Flink community is very happy to announce the release of Apache Flink Kafka Connectors 3.0.2. This release is compatible with the Apache Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353768

We would like to thank all contributors of the Apache Flink community who made this release possible!

Regards,
Gordon