[jira] [Created] (FLINK-34347) Kubernetes native resource manager requests wrong spec
Ruibin Xing created FLINK-34347:
-----------------------------------

Summary: Kubernetes native resource manager requests wrong spec
Key: FLINK-34347
URL: https://issues.apache.org/jira/browse/FLINK-34347
Project: Flink
Issue Type: Bug
Components: Deployment / Kubernetes, Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.1, 1.18.0
Reporter: Ruibin Xing
Attachments: jobmanager.csv, taskmanager_octopus-16-323-octopus-engine-write-proxy-taskmanager-3-326.csv

We had a Flink spec in which the TaskManager CPU was set to 0.5; we then upgraded it to 4.0. We found the JobManager requesting TaskManagers with both 0.5 CPU and 4.0 CPU. Most of the 0.5-CPU TaskManagers were released soon afterwards; however, one 0.5-CPU TaskManager remained and caused lag in the job.

Logs for the mixed TM requests:

{code:java}
2024-02-03 10:10:41,414 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker octopus-16-323-octopus-engine-write-proxy-taskmanager-3-244 with resource spec WorkerResourceSpec {cpuCores=4.0, taskHeapSize=5.637gb (6053219520 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=4}.
2024-02-03 10:10:44,844 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.5, taskHeapSize=1.137gb (1221381320 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=4}, current pending count: 1.
2024-02-03 10:10:44,920 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.5, taskHeapSize=1.137gb (1221381320 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=4}, current pending count: 2.
{code}

The name of the wrong TM is octopus-16-323-octopus-engine-write-proxy-taskmanager-3-326. Relevant logs are attached.
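For illustration only, here is a minimal, self-contained sketch of the kind of consistency check we would expect here, i.e. running workers whose spec no longer matches the currently requested spec should be released. All class and method names below are hypothetical stand-ins, not Flink's actual code:

{code:java}
// Hypothetical sketch: find workers whose resource spec differs from the
// currently desired spec, so they can be released instead of lingering.
import java.util.List;
import java.util.Map;

public class StaleWorkerCheck {

    // Simplified stand-in for Flink's WorkerResourceSpec (value equality via record).
    record WorkerSpec(double cpuCores, long taskHeapBytes, int numSlots) {}

    /** Returns the names of workers whose spec differs from the desired one. */
    static List<String> staleWorkers(Map<String, WorkerSpec> runningWorkers, WorkerSpec desired) {
        return runningWorkers.entrySet().stream()
                .filter(e -> !e.getValue().equals(desired))
                .map(Map.Entry::getKey)
                .toList();
    }

    public static void main(String[] args) {
        WorkerSpec desired = new WorkerSpec(4.0, 6053219520L, 4);
        Map<String, WorkerSpec> running = Map.of(
                "taskmanager-3-244", new WorkerSpec(4.0, 6053219520L, 4),
                "taskmanager-3-326", new WorkerSpec(0.5, 1221381320L, 4)); // the stale 0.5-CPU TM
        System.out.println(staleWorkers(running, desired)); // prints [taskmanager-3-326]
    }
}
{code}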
[jira] [Created] (FLINK-33863) Compressed Operator state restore failed
Ruibin Xing created FLINK-33863:
-----------------------------------

Summary: Compressed Operator state restore failed
Key: FLINK-33863
URL: https://issues.apache.org/jira/browse/FLINK-33863
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Ruibin Xing

We encountered an issue when using Flink 1.18.0. Our job enabled snapshot compression and used multiple Operator States in one operator. When recovering Operator State from a savepoint, the following error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF while reading stream header."

After researching, I believe the error is due to Flink 1.18.0's support for snapshot compression on Operator State (see https://issues.apache.org/jira/browse/FLINK-30113). When writing a savepoint, SnappyFramedOutputStream adds a header at the beginning of the data; when recovering Operator State from a savepoint, SnappyFramedInputStream verifies that header at the beginning of the data.

Currently, when recovering Operator State with snapshot compression enabled, the logic is as follows. For each OperatorStateHandle:
1. Verify that the current savepoint stream's offset points at the Snappy header.
2. Seek to the state's start offset.
3. Read the state's data and finally seek to the state's end offset.

(See: https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172)

Furthermore, when there are multiple Operator States, they are not sorted by the Operator State's offset. Therefore, if the states are out of order and the state with the final offset is recovered first, the savepoint stream is seeked to the end before the remaining states are read, resulting in an EOF error.

I propose a solution: sort the OperatorStateHandles by offset and then recover the Operator States in order (see the sketch below). After testing, this approach resolves the issue. I will submit a PR. This is my first time contributing code, so any help is really appreciated.
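For illustration, a minimal, self-contained sketch of the proposed fix; the types below are hypothetical stand-ins for the offset metadata carried by OperatorStateHandle, not Flink's actual classes:

{code:java}
// Sketch of the proposed fix: restore state entries in ascending offset order,
// so the savepoint stream is only ever seeked forward monotonically.
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortedStateRestore {

    // Stand-in for a named state plus the offsets of its partitions in the stream.
    record StateEntry(String name, long[] offsets) {
        long startOffset() {
            return Arrays.stream(offsets).min().orElse(Long.MAX_VALUE);
        }
    }

    static List<StateEntry> sortedByOffset(List<StateEntry> entries) {
        return entries.stream()
                .sorted(Comparator.comparingLong(StateEntry::startOffset))
                .toList();
    }

    public static void main(String[] args) {
        // Out-of-order metadata, as it may appear in the handle's map:
        List<StateEntry> entries = List.of(
                new StateEntry("state-b", new long[] {4096, 8192}),
                new StateEntry("state-a", new long[] {0, 2048}));
        // After sorting, "state-a" (offset 0, where the Snappy header lives) is
        // restored before "state-b", so no EOF occurs.
        sortedByOffset(entries).forEach(e -> System.out.println(e.name()));
    }
}
{code}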
[jira] [Created] (FLINK-33011) Operator deletes HA data unexpectedly
Ruibin Xing created FLINK-33011:
-----------------------------------

Summary: Operator deletes HA data unexpectedly
Key: FLINK-33011
URL: https://issues.apache.org/jira/browse/FLINK-33011
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0, 1.17.1
Environment: Flink: 1.17.1, Flink Kubernetes Operator: 1.6.0
Reporter: Ruibin Xing
Attachments: flink_operator_logs_0831.csv

We encountered a problem where the operator unexpectedly deleted HA data. The timeline is as follows:

12:08 We submitted the first spec, which suspended the job with the savepoint upgrade mode.
12:08 The job was suspended while the HA data was preserved, and the log showed that the observed job deployment status was MISSING.
12:10 We submitted the second spec, which deployed the job with the last-state upgrade mode.
12:10 Logs showed that the operator again deleted the Flink deployment, along with the HA data.
12:10 The job failed to start because the HA data was missing.

According to the log, the deletion was triggered by https://github.com/apache/flink-kubernetes-operator/blob/a728ba768e20236184e2b9e9e45163304b8b196c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L168

I think this deletion should only be triggered if the job deployment status wasn't MISSING, but the log just before the deletion showed that the observed job status was MISSING at that moment (a simplified sketch of the expected guard is below).

Related logs:

{code:java}
2023-08-30 12:08:48.190 + o.a.f.k.o.s.AbstractFlinkService [INFO ][default/pipeline-pipeline-se-3] Cluster shutdown completed.
2023-08-30 12:10:27.010 + o.a.f.k.o.o.d.ApplicationObserver [INFO ][default/pipeline-pipeline-se-3] Observing JobManager deployment. Previous status: MISSING
2023-08-30 12:10:27.533 + o.a.f.k.o.l.AuditUtils [INFO ][default/pipeline-pipeline-se-3] >>> Event | Info | SPECCHANGED | UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:0835137c-362 -> docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:23db7ae8-365, podTemplate.metadata.labels.app.kubernetes.io~1version : 0835137cd803b7258695eb53a6ec520cb62a48a7 -> 23db7ae84bdab8d91fa527fe2f8f2fce292d0abc, job.state : suspended -> running, job.upgradeMode : last-state -> savepoint, restartNonce : 1545 -> 1547]), starting reconciliation.
2023-08-30 12:10:27.679 + o.a.f.k.o.s.NativeFlinkService [INFO ][default/pipeline-pipeline-se-3] Deleting JobManager deployment and HA metadata.
{code}

A more complete log file is attached. Thanks.
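To make the reasoning concrete, here is a heavily simplified, hypothetical sketch of the guard I would expect around that deletion; the enum and method below are stand-ins, not the operator's actual code:

{code:java}
// Hypothetical guard: HA metadata should only be deleted when a JobManager
// deployment was actually observed, i.e. its observed status is not MISSING.
public class HaDeletionGuard {

    // Simplified stand-in for the operator's JobManager deployment status.
    enum JobManagerDeploymentStatus { READY, DEPLOYING, ERROR, MISSING }

    static boolean mayDeleteHaMetadata(JobManagerDeploymentStatus observed) {
        // In our logs the observed status was MISSING at deletion time,
        // so a guard like this should have prevented the delete.
        return observed != JobManagerDeploymentStatus.MISSING;
    }

    public static void main(String[] args) {
        System.out.println(mayDeleteHaMetadata(JobManagerDeploymentStatus.MISSING)); // false
    }
}
{code}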
[jira] [Created] (FLINK-32520) FlinkDeployment recovered states from an obsolete savepoint
Ruibin Xing created FLINK-32520:
-----------------------------------

Summary: FlinkDeployment recovered states from an obsolete savepoint
Key: FLINK-32520
URL: https://issues.apache.org/jira/browse/FLINK-32520
Project: Flink
Issue Type: New Feature
Components: Kubernetes Operator
Affects Versions: 1.13.1
Reporter: Ruibin Xing
Attachments: flink_kubernetes_operator_0615.csv

Kubernetes Operator version: 1.5.0

When upgrading one of our Flink jobs, it recovered from a savepoint created by the previous version of the job. The timeline of the job is as follows:
# I upgraded the job for the first time. The job created a savepoint and successfully restored from it.
# The job was running fine and created several checkpoints.
# Later, I performed the second upgrade. Soon after submission, and before the JobManager stopped, I realized I had made a mistake in the spec, so I quickly did a third upgrade.
# After the job started, I found that it had recovered from the savepoint created during the first upgrade.

It appears that there was an error when submitting the third upgrade. However, after investigating the code, I'm still not quite sure why this would cause Flink to use the obsolete savepoint. The related operator logs are attached.

Although I haven't found the root cause, I came up with some possible fixes:
# Remove the {{lastSavepoint}} after a job has successfully restored from it.
# Add an option for savepoints, similar to {{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}}: the operator should refuse to recover from the savepoint if the maximum age is exceeded (see the sketch after this list).
# Create a flag in the status that records the savepoint state: set the flag to false when the savepoint starts and mark it true when it successfully completes. The job should report an error if the flag for the last savepoint is false.
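To illustrate proposed fix #2, here is a minimal sketch. The option name above is only a proposal and does not exist in the operator today, and all types below are hypothetical:

{code:java}
// Hypothetical age check: refuse to restore from a savepoint that is older
// than a configured maximum age.
import java.time.Duration;
import java.time.Instant;

public class SavepointAgeCheck {

    static void checkSavepointAge(Instant savepointTimestamp, Duration maxAllowedAge) {
        Duration age = Duration.between(savepointTimestamp, Instant.now());
        if (age.compareTo(maxAllowedAge) > 0) {
            throw new IllegalStateException(
                    "Refusing to restore: last savepoint is " + age
                            + " old, exceeding the allowed maximum of " + maxAllowedAge);
        }
    }

    public static void main(String[] args) {
        // A savepoint taken two hours ago with a one-hour limit should be rejected.
        try {
            checkSavepointAge(Instant.now().minus(Duration.ofHours(2)), Duration.ofHours(1));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}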
[jira] [Created] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector
Ruibin Xing created FLINK-31483:
-----------------------------------

Summary: Implement Split Deletion Support in Flink Kafka Connector
Key: FLINK-31483
URL: https://issues.apache.org/jira/browse/FLINK-31483
Project: Flink
Issue Type: New Feature
Components: Connectors / Kafka, Connectors / Parent
Reporter: Ruibin Xing

Currently, the Flink Kafka Connector does not support split deletion; it is left as a [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]. I want to add this feature with the following steps (a sketch of step 1 follows this list):
1. Add a SplitsDeletion event to flink-connector-base, which currently only has SplitsAddition.
2. Add a {{deleteSplits}} method to SplitEnumeratorContext so it can send a SplitsDeletion event to the source operator. To maintain compatibility, a default empty implementation for this method will be added.
3. Make SourceOperator handle the SplitsDeletion event, notifying the SourceReader to delete splits.
4. Create a {{deleteSplits}} method in SourceReader to remove splits, including removing them from the split state and stopping the SourceReader from reading the deleted splits.

As an alternative, without modifying flink-connector-base, the KafkaSourceEnumerator could send a custom SourceEvent to the SourceOperator for split deletion and handle it in Kafka-connector-specific code. But I think it's better to have SplitsDeletion in flink-connector-base so other connectors can use it too.

Let me know if you have any thoughts or ideas. Thanks!

Related Issues: [FLINK-30490|https://issues.apache.org/jira/browse/FLINK-30490]
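To make step 1 concrete, here is a minimal, self-contained sketch of what a SplitsDeletion event mirroring SplitsAddition could look like; the simplified SplitsChange base class below is a stand-in, not the actual flink-connector-base class:

{code:java}
// Hypothetical sketch of the proposed SplitsDeletion event.
import java.util.List;

public class SplitsDeletionSketch {

    // Simplified stand-in for flink-connector-base's SplitsChange hierarchy.
    abstract static class SplitsChange<SplitT> {
        private final List<SplitT> splits;
        SplitsChange(List<SplitT> splits) { this.splits = splits; }
        List<SplitT> splits() { return splits; }
    }

    // The proposed counterpart to SplitsAddition: carries the splits that the
    // SourceReader should stop reading and remove from its state.
    static final class SplitsDeletion<SplitT> extends SplitsChange<SplitT> {
        SplitsDeletion(List<SplitT> splits) { super(splits); }
    }

    public static void main(String[] args) {
        SplitsDeletion<String> deletion =
                new SplitsDeletion<>(List.of("topic-partition-0", "topic-partition-1"));
        System.out.println("Splits to delete: " + deletion.splits());
    }
}
{code}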