[jira] [Created] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership
elon_X created FLINK-35178: -- Summary: Checkpoint CLAIM mode does not fully control snapshot ownership Key: FLINK-35178 URL: https://issues.apache.org/jira/browse/FLINK-35178 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.18.0 Reporter: elon_X Attachments: image-2024-04-20-14-51-21-062.png

When I enable incremental checkpointing and the task fails or is canceled for some reason, restarting it from {{-s checkpoint_path}} with {{restoreMode CLAIM}} allows the Flink job to recover from the last checkpoint and then discard that claimed checkpoint. I found that this leads to the following two cases:

1. If the new checkpoint_x meta file does not reference files in the shared directory under the previous jobID: the shared and taskowned directories of the previous job are left behind as empty directories, and these two directories persist without ever being deleted by Flink. !image-2024-04-20-14-51-21-062.png!

2. If the new checkpoint_x meta file does reference files in the shared directory under the previous jobID: chk-(x-1) of the previous job is discarded, but state data remains in the shared directory under that job, possibly for a relatively long time. This raises a question: the previous job is no longer running, and it is unclear whether users should delete its state data. Deleting it could lead to errors when the task is restarted, as the meta file might reference files that can no longer be found; this could be confusing for users.

A potential solution might be to reuse the previous job's jobID when restoring from {{-s checkpoint_path}}, or to add a new parameter that allows users to specify the jobID they want to recover from. Please correct me if there's anything I've misunderstood.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
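For context, the restore described above is triggered with the `-restoreMode` CLI option (available since Flink 1.15). A rough sketch — the checkpoint path and job jar below are placeholders, not values from this ticket:

```shell
# Restore from an existing (incremental) checkpoint, claiming ownership.
# In CLAIM mode Flink may delete the claimed checkpoint once it is
# subsumed, which is where the leftover shared/taskowned directories
# of the old jobID come from.
./bin/flink run \
  -s hdfs:///flink/checkpoints/<old-jobID>/chk-42 \
  -restoreMode claim \
  ./examples/streaming/StateMachineExample.jar
```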
[jira] [Created] (FLINK-35177) Datagen examples in documentation do not compile
Sergei Morozov created FLINK-35177: -- Summary: Datagen examples in documentation do not compile Key: FLINK-35177 URL: https://issues.apache.org/jira/browse/FLINK-35177 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.19.0 Reporter: Sergei Morozov

Currently, the examples look like this:

{code}
GeneratorFunction<Long, String> generatorFunction = index -> index;
double recordsPerSecond = 100;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
{code}

The generator function returns Long, but the DataGeneratorSource uses String, so their types do not match. Either the generator function needs to be changed to return a string, or the source needs to use Long.
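A sketch of one possible fix, taking the first option the ticket mentions (the "Number: " payload is illustrative, not from the ticket; declaring the source with Long and Types.LONG would work equally well):

```java
// Make the declared output type and the produced value agree: the lambda
// now returns a String, matching DataGeneratorSource<String> and Types.STRING.
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
double recordsPerSecond = 100;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generatorFunction,
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordsPerSecond),
                Types.STRING);
```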
Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD
Hey!

Regarding the question of initialSavepointPath and the new flinkStateSnapshotReference object, I think we could simply keep an extra field called path as part of the flinkStateSnapshotReference object. The fields would then be: namespace, name, path. If path is defined we would use that (to also support the simple way), otherwise we would use the resource. I would still deprecate the initialSavepointPath field in the jobSpec.

Regarding Savepoint/Checkpoint vs FlinkStateSnapshot, what we need:

1. An easy way to list all state snapshots (to select the latest)
2. An easy way to reference a savepoint/checkpoint from a jobSpec
3. A way to differentiate state snapshot types (in some cases users may prefer to use a checkpoint/savepoint for certain upgrades) -> we should add a label or something for easy selection
4. The ability to delete savepoints (and maybe checkpoints)

I am personally slightly more in favor of having a single resource, as that ticks all the boxes, while having two separate resources would make both listing and referencing harder. We would have to introduce a state type in the reference (name + namespace would not be enough to uniquely identify a state snapshot).

I wonder if I am missing any good argument against the single FlinkStateSnapshot here.

Cheers,
Gyula

On Fri, Apr 19, 2024 at 9:09 PM Mate Czagany wrote:

> Hi Robert and Thomas,
>
> Thank you for sharing your thoughts, I will try to address your questions and suggestions:
>
> 1. I would really love to hear others' inputs as well about separating the snapshot CRD into two different CRDs for savepoints and checkpoints. I think the main upside is that we would not need the mandatory savepoint or checkpoint field inside the spec. The two CRs could share the same status fields, and their specs would be different. I personally like both solutions, and would love to hear others' thoughts as well.
>
> 2. I agree with you that "completed" is not very clear, but I would suggest the name "alreadyExists". WDYT?
>
> 3. I think having a retry loop inside the operator does not add too much complexity to the FLIP. On failure, we check if we have reached the max retries. If we did, the state will be set to "FAILED", else it will be set to "TRIGGER_PENDING", causing the operator to retry the task. The "error" field will always be populated with the latest error. Kubernetes Jobs already have a similar field called "backoffLimit", maybe we could use the same name, with the same logic applied, WDYT?
> About error events, I think we should keep the "error" field, and upon a successful snapshot, we clear it. I will add to the FLIP that there will be an event generated for each unsuccessful snapshot.
>
> 4. I really like the idea of having something like Pod Conditions, but I think it wouldn't add too much value here, because the only 2 stages important to the user are "Triggered" and "Completed", and those timestamps will already be included in the status field. I think it would make more sense to implement this if there were more lifecycle stages.
>
> 5. There will be a new field in JobSpec called "flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore from.
>
> > How do you see potential effects on API server performance wrt. number of objects vs mutations? Is the proposal more or less neutral in that regard?
>
> While I am not an expert in Kubernetes internals, my understanding is that for the api-server, editing an existing resource and creating a new one do not differ performance-wise, because the whole resource will always be written to etcd anyway.
> Retrieving the savepoints from etcd will be different though for some use-cases, e.g. retrieving all snapshots for a specific FlinkDeployment would require the api-server to first retrieve all snapshots in a namespace from etcd, then filter them for that specific FlinkDeployment. I think this is a worst-case scenario, and it will be up to the user to optimize their queries via e.g. watch queries [1] or resourceVersions [2].
>
> > Does that mean one would have to create a FlinkStateSnapshot CR when starting a new deployment from savepoint? If so, that's rather complicated. I would prefer something more simple/concise and would rather keep initialSavepointPath
>
> Starting a job from a savepoint path will indeed be deprecated with this FLIP. I agree that it will be more complicated to restore from a savepoint in those cases, but if the user decides to move away from the deprecated savepoint mechanisms, every savepoint will result in a new FlinkStateSnapshot CR. So the only situation I expect this to be an inconvenience is when the user onboards a new Flink job to the operator. But I may not be thinking this through, so please let me know if you disagree.
>
> Thank you very much for your questions and suggestions!
>
> [1] https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
> [2]
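Putting the jobSpec reference discussed in this thread in concrete terms, it might look like the following sketch. The three field names (namespace, name, path) come from the discussion above; the surrounding structure and all values are assumptions, not final FLIP-446 syntax:

```yaml
# Hypothetical FlinkDeployment fragment -- illustrative only.
spec:
  job:
    flinkStateSnapshotReference:
      namespace: flink-jobs          # assumed example value
      name: snapshot-2024-04-19      # FlinkStateSnapshot CR to restore from
      # If 'path' were set, it would take precedence over the CR reference,
      # preserving the simple initialSavepointPath-style workflow:
      # path: s3://my-bucket/savepoints/savepoint-abc123
```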
Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD
Hi Robert and Thomas,

Thank you for sharing your thoughts, I will try to address your questions and suggestions:

1. I would really love to hear others' inputs as well about separating the snapshot CRD into two different CRDs for savepoints and checkpoints. I think the main upside is that we would not need the mandatory savepoint or checkpoint field inside the spec. The two CRs could share the same status fields, and their specs would be different. I personally like both solutions, and would love to hear others' thoughts as well.

2. I agree with you that "completed" is not very clear, but I would suggest the name "alreadyExists". WDYT?

3. I think having a retry loop inside the operator does not add too much complexity to the FLIP. On failure, we check if we have reached the max retries. If we did, the state will be set to "FAILED", else it will be set to "TRIGGER_PENDING", causing the operator to retry the task. The "error" field will always be populated with the latest error. Kubernetes Jobs already have a similar field called "backoffLimit", maybe we could use the same name, with the same logic applied, WDYT?
About error events, I think we should keep the "error" field, and upon a successful snapshot, we clear it. I will add to the FLIP that there will be an event generated for each unsuccessful snapshot.

4. I really like the idea of having something like Pod Conditions, but I think it wouldn't add too much value here, because the only 2 stages important to the user are "Triggered" and "Completed", and those timestamps will already be included in the status field. I think it would make more sense to implement this if there were more lifecycle stages.

5. There will be a new field in JobSpec called "flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore from.

> How do you see potential effects on API server performance wrt. number of objects vs mutations? Is the proposal more or less neutral in that regard?

While I am not an expert in Kubernetes internals, my understanding is that for the api-server, editing an existing resource and creating a new one do not differ performance-wise, because the whole resource will always be written to etcd anyway. Retrieving the savepoints from etcd will be different though for some use-cases, e.g. retrieving all snapshots for a specific FlinkDeployment would require the api-server to first retrieve all snapshots in a namespace from etcd, then filter them for that specific FlinkDeployment. I think this is a worst-case scenario, and it will be up to the user to optimize their queries via e.g. watch queries [1] or resourceVersions [2].

> Does that mean one would have to create a FlinkStateSnapshot CR when starting a new deployment from savepoint? If so, that's rather complicated. I would prefer something more simple/concise and would rather keep initialSavepointPath

Starting a job from a savepoint path will indeed be deprecated with this FLIP. I agree that it will be more complicated to restore from a savepoint in those cases, but if the user decides to move away from the deprecated savepoint mechanisms, every savepoint will result in a new FlinkStateSnapshot CR. So the only situation I expect this to be an inconvenience is when the user onboards a new Flink job to the operator. But I may not be thinking this through, so please let me know if you disagree.

Thank you very much for your questions and suggestions!

[1] https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
[2] https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions

Regards,
Mate

Thomas Weise ezt írta (időpont: 2024. ápr. 19., P, 11:31):

> Thanks for the proposal.
>
> How do you see potential effects on API server performance wrt. number of objects vs mutations? Is the proposal more or less neutral in that regard?
>
> Thanks for the thorough feedback Robert.
>
> Couple more questions below.
> > --> > > On Fri, Apr 19, 2024 at 5:07 AM Robert Metzger > wrote: > > > Hi Mate, > > thanks for proposing this, I'm really excited about your FLIP. I hope my > > questions make sense to you: > > > > 1. I would like to discuss the "FlinkStateSnapshot" name and the fact > that > > users have to use either the savepoint or checkpoint spec inside the > > FlinkStateSnapshot. > > Wouldn't it be more intuitive to introduce two CRs: > > FlinkSavepoint and FlinkCheckpoint > > Ideally they can internally share a lot of code paths, but from a users > > perspective, the abstraction is much clearer. > > > > There are probably pros and cons either way. For example it is desirable to > have a single list of state snapshots when looking for the initial > savepoint for a new deployment etc. > > > > > > 2. I also would like to discuss SavepointSpec.completed, as this name is > > not intuitive to me. How about "ignoreExisting"? > > > > 3. The FLIP proposal seems to leave error handling to the user, e.g. when > > you create a FlinkStateSn
[jira] [Created] (FLINK-35176) Support property authentication connection for JDBC catalog & dynamic table
RocMarshal created FLINK-35176: -- Summary: Support property authentication connection for JDBC catalog & dynamic table Key: FLINK-35176 URL: https://issues.apache.org/jira/browse/FLINK-35176 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Reporter: RocMarshal
Re: [VOTE] FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines
+1 (binding) On Fri, Apr 19, 2024 at 10:07 AM Yuepeng Pan wrote: > +1(non-binding) > > Best, > Yuepeng Pan > > At 2024-04-19 15:22:04, "gongzhongqiang" > wrote: > >+1(non-binding) > > > > > >Best, > > > >Zhongqiang Gong > > > >Ron liu 于2024年4月17日周三 14:28写道: > > > >> Hi Dev, > >> > >> Thank you to everyone for the feedback on FLIP-435: Introduce a New > >> Materialized Table for Simplifying Data Pipelines[1][2]. > >> > >> I'd like to start a vote for it. The vote will be open for at least 72 > >> hours unless there is an objection or not enough votes. > >> > >> [1] > >> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines > >> [2] https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs > >> > >> Best, > >> Ron > >> >
[jira] [Created] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3
Ryan Skraba created FLINK-35175: --- Summary: HadoopDataInputStream can't compile with Hadoop 3.2.3 Key: FLINK-35175 URL: https://issues.apache.org/jira/browse/FLINK-35175 Project: Flink Issue Type: Bug Affects Versions: 1.20.0 Reporter: Ryan Skraba

Unfortunately, FLINK-35045 introduced a reference to [PREADBYTEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182], a constant added in Hadoop releases [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72] and [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]. It doesn't exist in flink.hadoop.version [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java], which we are using in end-to-end tests.
{code:java}
00:23:55.093 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project flink-hadoop-fs: Compilation failure: Compilation failure:
00:23:55.093 [ERROR] /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63] cannot find symbol
00:23:55.094 [ERROR]   symbol:   variable READBYTEBUFFER
00:23:55.094 [ERROR]   location: interface org.apache.hadoop.fs.StreamCapabilities
00:23:55.094 [ERROR] /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63] cannot find symbol
00:23:55.094 [ERROR]   symbol:   variable PREADBYTEBUFFER
00:23:55.094 [ERROR]   location: interface org.apache.hadoop.fs.StreamCapabilities
00:23:55.094 [ERROR] /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43] incompatible types: long cannot be converted to org.apache.hadoop.io.ByteBufferPool
00:23:55.094 [ERROR] -> [Help 1]
{code}

* 1.20 compile_cron_hadoop313 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012&view=logs&j=87489130-75dc-54e4-1f45-80c30aa367a3&t=73da6d75-f30d-5d5a-acbe-487a9dcff678&l=3630
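One fix pattern for this class of problem (a sketch of the general idea, not the actual Flink patch): reference the capability strings as literals instead of the StreamCapabilities constants, so the code still compiles against Hadoop 3.2.3 and the runtime capability check simply returns false there. The helper below is self-contained; the string values are the ones Hadoop 3.3.0 assigns to READBYTEBUFFER/PREADBYTEBUFFER.

```java
import java.util.Set;

public class CapabilityGuard {
    // String values of StreamCapabilities.READBYTEBUFFER / PREADBYTEBUFFER
    // as defined since Hadoop 3.3.0. Using the literals avoids a
    // compile-time dependency on constants that Hadoop 3.2.3 lacks.
    public static final String READBYTEBUFFER = "in:readbytebuffer";
    public static final String PREADBYTEBUFFER = "in:preadbytebuffer";

    // Stand-in for stream.hasCapability(...): streams from older Hadoop
    // versions never advertise the newer capabilities, so the guard
    // degrades gracefully to false instead of failing to compile.
    public static boolean supports(Set<String> advertised, String capability) {
        return advertised.contains(capability);
    }

    public static void main(String[] args) {
        // Capabilities a Hadoop 3.2.3 stream might advertise (illustrative).
        Set<String> oldStream = Set.of("in:readahead", "dropbehind", "in:unbuffer");
        System.out.println(supports(oldStream, READBYTEBUFFER));
        System.out.println(supports(oldStream, PREADBYTEBUFFER));
    }
}
```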
[jira] [Created] (FLINK-35174) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink RabbitMQ connector
Danny Cranmer created FLINK-35174: - Summary: Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink RabbitMQ connector Key: FLINK-35174 URL: https://issues.apache.org/jira/browse/FLINK-35174 Project: Flink Issue Type: Technical Debt Components: Connectors / AWS Reporter: Danny Cranmer Assignee: Danny Cranmer Fix For: aws-connector-4.3.0 Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.1 for Flink AWS connectors
[VOTE] Release flink-connector-aws v4.3.0, release candidate #2
Hi everyone, Please review and vote on release candidate #2 for flink-connector-aws v4.3.0, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) This version supports Flink 1.18 and 1.19. The complete staging area is available for your review, which includes: * JIRA release notes [1], * the official Apache source release to be deployed to dist.apache.org [2], which are signed with the key with fingerprint 125FD8DB [3], * all artifacts to be deployed to the Maven Central Repository [4], * source code tag v4.3.0-rc2 [5], * website pull request listing the new release [6]. * CI build of the tag [7]. The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Release Manager [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353793 [2] https://dist.apache.org/repos/dist/dev/flink/flink-connector-aws-4.3.0-rc2 [3] https://dist.apache.org/repos/dist/release/flink/KEYS [4] https://repository.apache.org/content/repositories/orgapacheflink-1721/ [5] https://github.com/apache/flink-connector-aws/releases/tag/v4.3.0-rc2 [6] https://github.com/apache/flink-web/pull/733 [7] https://github.com/apache/flink-connector-aws/actions/runs/8751694197
[jira] [Created] (FLINK-35173) Debezium Custom Time Serializer
ZhengYu Chen created FLINK-35173: Summary: Debezium Custom Time Serializer Key: FLINK-35173 URL: https://issues.apache.org/jira/browse/FLINK-35173 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: 3.1.0 Reporter: ZhengYu Chen Fix For: 3.1.0

Currently, Flink CDC produces wrong time values for the time types (DateTime, Time, Date, Timestamp) when using the MySQL connector with JsonDebeziumDeserializationSchema for deserialization. The root cause is that the timestamps returned by the underlying Debezium layer are in UTC (e.g. io.debezium.time.Timestamp). The community already has some [PR|https://github.com/apache/flink-cdc/pull/1366/files#diff-e129e9fae3eea0bb32f0019debb4932413c91088d6dae656e2ecb63913badae4]s for this, but they do not work. This issue proposes a solution based on Debezium's custom converter interface (https://debezium.io/documentation/reference/1.9/development/converters.html): users can choose to convert the above four time types into STRING according to a specified time format, ensuring that JSON is converted correctly when using the Flink DataStream API.
When the user enables this converter, it is configured via properties. Here is a DataStream use case:

{code:java}
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("converters", "datetime");
debeziumProperties.setProperty("datetime.database.type", DataBaseType.MYSQL.getType());
debeziumProperties.setProperty("datetime.type", "cn.xxx.sources.cdc.MysqlDebeziumConverter");
debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");
debeziumProperties.setProperty("datetime.format.time", "HH:mm:ss");
debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");
debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");

MySqlSourceBuilder builder = MySqlSource.builder()
        .hostname(url[0])
        .port(Integer.parseInt(url[1]))
        .databaseList(table.getDatabase())
        .tableList(getTablePattern(table))
        .username(table.getUserName())
        .password(table.getPassword())
        .debeziumProperties(debeziumProperties);
{code}
Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD
Thanks for the proposal. How do you see potential effects on API server performance wrt. number of objects vs mutations? Is the proposal more or less neutral in that regard? Thanks for the thorough feedback Robert. Couple more questions below. --> On Fri, Apr 19, 2024 at 5:07 AM Robert Metzger wrote: > Hi Mate, > thanks for proposing this, I'm really excited about your FLIP. I hope my > questions make sense to you: > > 1. I would like to discuss the "FlinkStateSnapshot" name and the fact that > users have to use either the savepoint or checkpoint spec inside the > FlinkStateSnapshot. > Wouldn't it be more intuitive to introduce two CRs: > FlinkSavepoint and FlinkCheckpoint > Ideally they can internally share a lot of code paths, but from a users > perspective, the abstraction is much clearer. > There are probably pros and cons either way. For example it is desirable to have a single list of state snapshots when looking for the initial savepoint for a new deployment etc. > > 2. I also would like to discuss SavepointSpec.completed, as this name is > not intuitive to me. How about "ignoreExisting"? > > 3. The FLIP proposal seems to leave error handling to the user, e.g. when > you create a FlinkStateSnapshot, it will just move to status FAILED. > Typically in K8s with the control loop stuff, resources are tried to get > created until success. I think it would be really nice if the > FlinkStateSnapshot or FlinkSavepoint resource would retry based on a > property in the resource. A "FlinkStateSnapshot.retries" number would > indicate how often the user wants the operator to retry creating a > savepoint, "retries = -1" means retry forever. In addition, we could > consider a timeout as well, however, I haven't seen such a concept in K8s > CRs yet. 
> The benefit of this is that other tools relying on the K8s operator > wouldn't have to implement this retry loop (which is quite natural for > K8s), they would just have to wait for the CR they've created to transition > into COMPLETED: > > 3. FlinkStateSnapshotStatus.error will only show the last error. What > about using Events, so that we can show multiple errors and use the > FlinkStateSnapshotState to report errors? > > 4. I wonder if it makes sense to use something like Pod Conditions (e.g. > Savepoint Conditions): > https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions > to track the completion status. We could have the following conditions: > - Triggered > - Completed > - Failed > The only benefit of this proposal that I see is that it would tell the > user how long it took to create the savepoint. > > 5. You mention that "JobSpec.initialSavepointPath" will be deprecated. I > assume we will introduce a new field for referencing a FlinkStateSnapshot > CR? I think it would be good to cover this in the FLIP. > > Does that mean one would have to create a FlinkStateSnapshot CR when starting a new deployment from savepoint? If so, that's rather complicated. I would prefer something more simple/concise and would rather keep initialSavepointPath > > One minor comment: > > "/** Dispose the savepoints upon CRD deletion. */" > > I think this should be "upon CR deletion", not "CRD deletion". > > Thanks again for this great FLIP! > > Best, > Robert > > > On Fri, Apr 19, 2024 at 9:01 AM Gyula Fóra wrote: > >> Cc'ing some folks who gave positive feedback on this idea in the past. >> >> I would love to hear your thoughts on the proposed design >> >> Gyula >> >> On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás >> wrote: >> >>> +1 Looking forward to it >>> >>> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany wrote: >>> >>> > Thank you Gyula! >>> > >>> > I think that is a great idea. 
I have updated the Google doc to only >>> have 1 >>> > new configuration option of boolean type, which can be used to signal >>> the >>> > Operator to use the old mode. >>> > >>> > Also described in the configuration description, the Operator will >>> fallback >>> > to the old mode if the FlinkStateSnapshot CRD cannot be found on the >>> > Kubernetes cluster. >>> > >>> > Regards, >>> > Mate >>> > >>> > Gyula Fóra ezt írta (időpont: 2024. ápr. 16., >>> K, >>> > 17:01): >>> > >>> > > Thanks Mate, this is great stuff. >>> > > >>> > > Mate, I think the new configs should probably default to the new >>> mode and >>> > > they should only be useful for users to fall back to the old >>> behaviour. >>> > > We could by default use the new Snapshot CRD if the CRD is installed, >>> > > otherwise use the old mode by default and log a warning on startup. >>> > > >>> > > So I am suggesting a "dynamic" default behaviour based on whether >>> the new >>> > > CRD was installed or not because we don't want to break operator >>> startup. >>> > > >>> > > Gyula >>> > > >>> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany >>> wrote: >>> > > >>> > > > Hi Ferenc, >>> > > > >>> > > > Thank you for your comments, I have updated the Google docs with a >>> new >>> > > > section for the new co
[jira] [Created] (FLINK-35172) DDL statement is added to the Schema Change Event
melin created FLINK-35172: - Summary: DDL statement is added to the Schema Change Event Key: FLINK-35172 URL: https://issues.apache.org/jira/browse/FLINK-35172 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: melin

The current implementation of the Kafka pipeline data sink connector does not write DDL statements to the topic, because the original DDL statements are missing: DDL cannot be generated backwards from a Schema Change Event.
Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD
Hi Mate,

thanks for proposing this, I'm really excited about your FLIP. I hope my questions make sense to you:

1. I would like to discuss the "FlinkStateSnapshot" name and the fact that users have to use either the savepoint or checkpoint spec inside the FlinkStateSnapshot. Wouldn't it be more intuitive to introduce two CRs: FlinkSavepoint and FlinkCheckpoint? Ideally they can internally share a lot of code paths, but from a user's perspective, the abstraction is much clearer.

2. I also would like to discuss SavepointSpec.completed, as this name is not intuitive to me. How about "ignoreExisting"?

3. The FLIP proposal seems to leave error handling to the user, e.g. when you create a FlinkStateSnapshot, it will just move to status FAILED. Typically in K8s, with its control loops, resource creation is retried until it succeeds. I think it would be really nice if the FlinkStateSnapshot or FlinkSavepoint resource would retry based on a property in the resource. A "FlinkStateSnapshot.retries" number would indicate how often the user wants the operator to retry creating a savepoint; "retries = -1" means retry forever. In addition, we could consider a timeout as well; however, I haven't seen such a concept in K8s CRs yet. The benefit of this is that other tools relying on the K8s operator wouldn't have to implement this retry loop (which is quite natural for K8s); they would just have to wait for the CR they've created to transition into COMPLETED.

3. FlinkStateSnapshotStatus.error will only show the last error. What about using Events, so that we can show multiple errors and use the FlinkStateSnapshotState to report errors?

4. I wonder if it makes sense to use something like Pod Conditions (e.g. Savepoint Conditions): https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions to track the completion status.
We could have the following conditions: - Triggered - Completed - Failed The only benefit of this proposal that I see is that it would tell the user how long it took to create the savepoint. 5. You mention that "JobSpec.initialSavepointPath" will be deprecated. I assume we will introduce a new field for referencing a FlinkStateSnapshot CR? I think it would be good to cover this in the FLIP. One minor comment: "/** Dispose the savepoints upon CRD deletion. */" I think this should be "upon CR deletion", not "CRD deletion". Thanks again for this great FLIP! Best, Robert On Fri, Apr 19, 2024 at 9:01 AM Gyula Fóra wrote: > Cc'ing some folks who gave positive feedback on this idea in the past. > > I would love to hear your thoughts on the proposed design > > Gyula > > On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás > wrote: > >> +1 Looking forward to it >> >> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany wrote: >> >> > Thank you Gyula! >> > >> > I think that is a great idea. I have updated the Google doc to only >> have 1 >> > new configuration option of boolean type, which can be used to signal >> the >> > Operator to use the old mode. >> > >> > Also described in the configuration description, the Operator will >> fallback >> > to the old mode if the FlinkStateSnapshot CRD cannot be found on the >> > Kubernetes cluster. >> > >> > Regards, >> > Mate >> > >> > Gyula Fóra ezt írta (időpont: 2024. ápr. 16., K, >> > 17:01): >> > >> > > Thanks Mate, this is great stuff. >> > > >> > > Mate, I think the new configs should probably default to the new mode >> and >> > > they should only be useful for users to fall back to the old >> behaviour. >> > > We could by default use the new Snapshot CRD if the CRD is installed, >> > > otherwise use the old mode by default and log a warning on startup. >> > > >> > > So I am suggesting a "dynamic" default behaviour based on whether the >> new >> > > CRD was installed or not because we don't want to break operator >> startup. 
>> > > >> > > Gyula >> > > >> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany >> wrote: >> > > >> > > > Hi Ferenc, >> > > > >> > > > Thank you for your comments, I have updated the Google docs with a >> new >> > > > section for the new configs. >> > > > All of the newly added config keys will have defaults set, and by >> > default >> > > > all the savepoint/checkpoint operations will use the old system: >> write >> > > > their results to the FlinkDeployment/FlinkSessionJob status field. >> > > > >> > > > I have also added a default for the checkpoint type to be FULL >> (which >> > is >> > > > also the default currently). That was an oversight on my part to >> miss >> > > that. >> > > > >> > > > Regards, >> > > > Mate >> > > > >> > > > Ferenc Csaky ezt írta (időpont: 2024. >> > ápr. >> > > > 16., K, 16:10): >> > > > >> > > > > Thank you Mate for initiating this discussion. +1 for this idea. >> > > > > Some Qs: >> > > > > >> > > > > Can you specify the newly introduced configurations in more >> > > > > details? Currently, it is not fully clear to me what are the >> > > > > possible values of `kubernetes.operator.periodic.s
Re: [VOTE] FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines
+1(non-binding) Best, Yuepeng Pan At 2024-04-19 15:22:04, "gongzhongqiang" wrote: >+1(non-binding) > > >Best, > >Zhongqiang Gong > >Ron liu 于2024年4月17日周三 14:28写道: > >> Hi Dev, >> >> Thank you to everyone for the feedback on FLIP-435: Introduce a New >> Materialized Table for Simplifying Data Pipelines[1][2]. >> >> I'd like to start a vote for it. The vote will be open for at least 72 >> hours unless there is an objection or not enough votes. >> >> [1] >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines >> [2] https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs >> >> Best, >> Ron >>
[jira] [Created] (FLINK-35171) Class 'FlinkSqlParserImpl' is missed
chenyu created FLINK-35171:
--

Summary: Class 'FlinkSqlParserImpl' is missed
Key: FLINK-35171
URL: https://issues.apache.org/jira/browse/FLINK-35171
Project: Flink
Issue Type: Bug
Affects Versions: 1.19.0
Reporter: chenyu
Attachments: image-2024-04-19-15-55-54-035.png

!image-2024-04-19-15-55-54-035.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [VOTE] FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines
+1 (non-binding)

Best,
Zhongqiang Gong

On Wed, Apr 17, 2024 at 14:28, Ron liu wrote:

> Hi Dev,
>
> Thank you to everyone for the feedback on FLIP-435: Introduce a New
> Materialized Table for Simplifying Data Pipelines[1][2].
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours unless there is an objection or not enough votes.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines
> [2] https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs
>
> Best,
> Ron
[jira] [Created] (FLINK-35170) SqlServer connector support scanNewlyAddedTableEnabled param
fengfengzai created FLINK-35170:
---

Summary: SqlServer connector support scanNewlyAddedTableEnabled param
Key: FLINK-35170
URL: https://issues.apache.org/jira/browse/FLINK-35170
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: 2.0.0
Reporter: fengfengzai

SqlServer connector support scanNewlyAddedTableEnabled param
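For context, the MySQL CDC connector already exposes this capability through the `scan.newly-added-table.enabled` table option. A hedged sketch of how the requested SqlServer support might look in Flink SQL, assuming the option key mirrors the MySQL one (it is a proposal here, not part of any release, and the connection properties below are placeholders):

```sql
-- Hypothetical: 'scan.newly-added-table.enabled' is not yet supported by
-- the SqlServer CDC connector; the key is assumed to mirror MySQL CDC's.
CREATE TABLE orders (
    id INT,
    order_date DATE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'flinkuser',
    'password' = '***',
    'database-name' = 'inventory',
    'table-name' = 'dbo.orders',
    'scan.newly-added-table.enabled' = 'true'
);
```

With such an option enabled, tables matching the configured pattern that are added after the job starts would be captured on restart from a savepoint, as the MySQL connector does today.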
Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD
Cc'ing some folks who gave positive feedback on this idea in the past.

I would love to hear your thoughts on the proposed design

Gyula

On Tue, Apr 16, 2024 at 6:31 PM Őrhidi Mátyás wrote:

> +1 Looking forward to it
>
> On Tue, Apr 16, 2024 at 8:56 AM Mate Czagany wrote:
>
> > Thank you Gyula!
> >
> > I think that is a great idea. I have updated the Google doc to only have 1
> > new configuration option of boolean type, which can be used to signal the
> > Operator to use the old mode.
> >
> > Also described in the configuration description, the Operator will fall back
> > to the old mode if the FlinkStateSnapshot CRD cannot be found on the
> > Kubernetes cluster.
> >
> > Regards,
> > Mate
> >
> > On Tue, Apr 16, 2024 at 17:01, Gyula Fóra wrote:
> >
> > > Thanks Mate, this is great stuff.
> > >
> > > Mate, I think the new configs should probably default to the new mode and
> > > they should only be useful for users to fall back to the old behaviour.
> > > We could by default use the new Snapshot CRD if the CRD is installed,
> > > otherwise use the old mode by default and log a warning on startup.
> > >
> > > So I am suggesting a "dynamic" default behaviour based on whether the new
> > > CRD was installed or not because we don't want to break operator startup.
> > >
> > > Gyula
> > >
> > > On Tue, Apr 16, 2024 at 4:48 PM Mate Czagany wrote:
> > >
> > > > Hi Ferenc,
> > > >
> > > > Thank you for your comments, I have updated the Google docs with a new
> > > > section for the new configs.
> > > > All of the newly added config keys will have defaults set, and by default
> > > > all the savepoint/checkpoint operations will use the old system: write
> > > > their results to the FlinkDeployment/FlinkSessionJob status field.
> > > >
> > > > I have also added a default for the checkpoint type to be FULL (which is
> > > > also the default currently). That was an oversight on my part to miss that.
> > > >
> > > > Regards,
> > > > Mate
> > > >
> > > > On Tue, Apr 16, 2024 at 16:10, Ferenc Csaky wrote:
> > > >
> > > > > Thank you Mate for initiating this discussion. +1 for this idea.
> > > > > Some Qs:
> > > > >
> > > > > Can you specify the newly introduced configurations in more
> > > > > detail? Currently, it is not fully clear to me what are the
> > > > > possible values of `kubernetes.operator.periodic.savepoint.mode`,
> > > > > is it optional, has a default value?
> > > > >
> > > > > I see that `SavepointSpec.formatType` has a default, although
> > > > > `CheckpointSpec.checkpointType` does not. Are we inferring that from
> > > > > the config? My point is, in general I think it would be good to
> > > > > handle the two snapshot types in a similar way when it makes sense
> > > > > to minimize any kind of confusion.
> > > > >
> > > > > Best,
> > > > > Ferenc
> > > > >
> > > > > On Tuesday, April 16th, 2024 at 11:34, Mate Czagany <czmat...@gmail.com> wrote:
> > > > >
> > > > > > Hi Everyone,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-446: Kubernetes Operator State
> > > > > > Snapshot CRD.
> > > > > >
> > > > > > This FLIP adds a new custom resource for Operator users to create and
> > > > > > manage their savepoints and checkpoints. I have also developed an initial
> > > > > > POC to prove that this approach is feasible, you can find the link for that
> > > > > > in the FLIP.
> > > > > >
> > > > > > There is a Confluence page [1] and a Google Docs page [2] as I do not have
> > > > > > a Confluence account yet.
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD
> > > > > > [2]
> > > > > > https://docs.google.com/document/d/1VdfLFaE4i6ESbCQ38CH7TKOiPQVvXeOxNV2FeSMnOTg
> > > > > >
> > > > > > Regards,
> > > > > > Mate
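For readers following the thread, a rough sketch of what a FlinkStateSnapshot resource could look like under this proposal. Every field name below is an assumption inferred from the discussion (the FLIP's Google doc holds the real schema), so treat this purely as an illustration:

```yaml
# Hypothetical FlinkStateSnapshot CR; field names are illustrative only,
# not the schema proposed in FLIP-446.
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  # Reference to the job whose state should be snapshotted
  jobReference:
    kind: FlinkDeployment
    name: example-deployment
  savepoint:
    # SavepointSpec.formatType has a default per the thread
    formatType: NATIVE
  # Whether to dispose the savepoint when this CR (not the CRD) is deleted,
  # per Robert's wording comment above
  disposeOnDelete: true
```

A matching checkpoint variant would presumably carry a `checkpointType` (defaulting to FULL, as Mate mentions) instead of `formatType`.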