[jira] [Created] (FLINK-34347) Kubernetes native resource manager request wrong spec.

2024-02-03 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-34347:
---

 Summary: Kubernetes native resource manager request wrong spec.
 Key: FLINK-34347
 URL: https://issues.apache.org/jira/browse/FLINK-34347
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes, Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.1, 1.18.0
Reporter: Ruibin Xing
 Attachments: jobmanager.csv, 
taskmanager_octopus-16-323-octopus-engine-write-proxy-taskmanager-3-326.csv

We had a Flink spec in which the TM CPU was set to 0.5, and then we upgraded it to 4.0. 
We found the JobManager requesting TMs with both 0.5 CPU and 4.0 CPU. Most of the 0.5 CPU 
TMs were released soon after, but one TM with 0.5 CPU remained and caused lag in the job.

 

Logs for mixed TM requests:
{code:java}
2024-02-03 10:10:41,414 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker octopus-16-323-octopus-engine-write-proxy-taskmanager-3-244 with resource spec WorkerResourceSpec {cpuCores=4.0, taskHeapSize=5.637gb (6053219520 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=4}.
2024-02-03 10:10:44,844 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.5, taskHeapSize=1.137gb (1221381320 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=4}, current pending count: 1.
2024-02-03 10:10:44,920 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.5, taskHeapSize=1.137gb (1221381320 bytes), taskOffHeapSize=1024.000mb (1073741824 bytes), networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=4}, current pending count: 2.
{code}
The name of the wrong TM: 
octopus-16-323-octopus-engine-write-proxy-taskmanager-3-326.

Relevant logs are attached.





[jira] [Created] (FLINK-33863) Compressed Operator state restore failed

2023-12-15 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-33863:
---

 Summary: Compressed Operator state restore failed
 Key: FLINK-33863
 URL: https://issues.apache.org/jira/browse/FLINK-33863
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Ruibin Xing


We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot 
Compression and used multiple Operator States in an operator. When recovering 
Operator State from a Savepoint, the following error occurred: 
"org.xerial.snappy.SnappyFramedInputStream: encountered EOF while reading 
stream header."

After researching, I believe the error is due to Flink 1.18.0's support for 
Snapshot Compression on Operator State (see 
https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a Savepoint, 
the Snappy framed stream writes a header at the beginning of the compressed data. When 
recovering Operator State from a Savepoint, SnappyFramedInputStream verifies 
this header at the beginning of the data.

Currently, when recovering Operator State with Snapshot Compression enabled, 
the logic is as follows:
For each OperatorStateHandle:
1. Verify that the Snappy header is present at the current offset of the Savepoint stream.
2. Seek to the state's start offset.
3. Read the state's data and finally seek to the state's end offset.
(See: 
https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172
 )

Furthermore, when there are multiple Operator States, the handles are not sorted 
by the Operator State's offset. Therefore, if the Operator States are processed out of 
order and the state with the largest offset is recovered first, the Savepoint stream ends 
up seeked to the end of the data, and the next header check fails with an EOF error.

I propose a solution: sort the OperatorStateHandles by offset and then recover the 
Operator States in that order. After testing, this approach resolves the issue.
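
A minimal sketch of this ordering, using simplified stand-in types rather than Flink's 
actual OperatorStateHandle and restore APIs (all names below are illustrative assumptions):
{code:java}
import java.util.Comparator;
import java.util.List;

/**
 * Minimal sketch of the proposed fix: restore state partitions in ascending offset order so
 * the savepoint stream is never positioned past data that still has to be read. The types
 * below are simplified stand-ins, not Flink's actual OperatorStateHandle API.
 */
public class OffsetOrderedRestoreSketch {

    /** Stand-in for one partition's metadata: a state name and its offset in the savepoint stream. */
    record StatePartition(String stateName, long offset) {}

    /** Stand-in for a seekable savepoint input stream. */
    interface SeekableInput {
        void seek(long offset);
    }

    static void restoreInOffsetOrder(List<StatePartition> partitions, SeekableInput in) {
        List<StatePartition> ordered = partitions.stream()
                .sorted(Comparator.comparingLong(StatePartition::offset))
                .toList();
        for (StatePartition partition : ordered) {
            // Seek first, then wrap the stream in the decompressing (Snappy) stream, which
            // checks the frame header at this position before the partition's data is read.
            in.seek(partition.offset());
            // ... read and deserialize the partition's data here ...
        }
    }
}
{code}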

I will submit a PR. This is my first time contributing code, so any help is 
really appreciated.





[jira] [Created] (FLINK-33011) Operator deletes HA data unexpectedly

2023-08-31 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-33011:
---

 Summary: Operator deletes HA data unexpectedly
 Key: FLINK-33011
 URL: https://issues.apache.org/jira/browse/FLINK-33011
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0, 1.17.1
 Environment: Flink: 1.17.1

Flink Kubernetes Operator: 1.6.0
Reporter: Ruibin Xing
 Attachments: flink_operator_logs_0831.csv

We encountered a problem where the operator unexpectedly deleted HA data.

The timeline is as follows:

12:08 We submitted the first spec, which suspended the job with savepoint 
upgrade mode.

12:08 The job was suspended, while the HA data was preserved, and the log 
showed the observed job deployment status was MISSING.

12:10 We submitted the second spec, which deployed the job with the last state 
upgrade mode.

12:10 Logs showed the operator deleted the Flink deployment again, this time together with 
the HA data.

12:10 The job failed to start because the HA data was missing.

According to the log, the deletion was triggered by 
https://github.com/apache/flink-kubernetes-operator/blob/a728ba768e20236184e2b9e9e45163304b8b196c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L168

I think this would only be triggered if the job deployment status wasn't 
MISSING. But the log before the deletion showed the observed job status was 
MISSING at that moment.

Related logs:

 
{code:java}
2023-08-30 12:08:48.190 + o.a.f.k.o.s.AbstractFlinkService [INFO 
][default/pipeline-pipeline-se-3] Cluster shutdown completed.
2023-08-30 12:10:27.010 + o.a.f.k.o.o.d.ApplicationObserver [INFO 
][default/pipeline-pipeline-se-3] Observing JobManager deployment. Previous 
status: MISSING
2023-08-30 12:10:27.533 + o.a.f.k.o.l.AuditUtils         [INFO 
][default/pipeline-pipeline-se-3] >>> Event  | Info    | SPECCHANGED     | 
UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : 
docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:0835137c-362 
-> 
docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:23db7ae8-365,
 podTemplate.metadata.labels.app.kubernetes.io~1version : 
0835137cd803b7258695eb53a6ec520cb62a48a7 -> 
23db7ae84bdab8d91fa527fe2f8f2fce292d0abc, job.state : suspended -> running, 
job.upgradeMode : last-state -> savepoint, restartNonce : 1545 -> 1547]), 
starting reconciliation.
2023-08-30 12:10:27.679 + o.a.f.k.o.s.NativeFlinkService [INFO 
][default/pipeline-pipeline-se-3] Deleting JobManager deployment and HA 
metadata.
{code}
A more complete log file is attached. Thanks.





[jira] [Created] (FLINK-32520) FlinkDeployment recovered states from an obsolete savepoint

2023-07-03 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-32520:
---

 Summary: FlinkDeployment recovered states from an obsolete 
savepoint
 Key: FLINK-32520
 URL: https://issues.apache.org/jira/browse/FLINK-32520
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Affects Versions: 1.13.1
Reporter: Ruibin Xing
 Attachments: flink_kubernetes_operator_0615.csv

Kubernetes Operator version: 1.5.0
 

When upgrading one of our Flink jobs, it recovered from a savepoint created by 
the previous version of the job. The timeline of the job is as follows:
 # I upgraded the job for the first time. The job created a savepoint and 
successfully restored from it.
 # The job was running fine and created several checkpoints.
 # Later, I performed the second upgrade. Soon after submission and before the 
JobManager stopped, I realized I made a mistake in the spec, so I quickly did 
the third upgrade.
 # After the job started, I found that it had recovered from the savepoint 
created during the first upgrade.

 

It appears that there was an error when submitting the third upgrade. However, even after 
investigating the code, I'm still not quite sure why this would cause Flink to use the 
obsolete savepoint. The related operator logs are attached.
 

Although I haven't found the root cause, I came up with some possible fixes:
 # Remove the {{lastSavepoint}} after a job has successfully restored from it.
 # Add an option for savepoints, similar to 
{{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}}: the operator 
should refuse to recover from the savepoint if the maximum age is exceeded (a rough sketch 
follows this list).
 # Create a flag in the status that records the savepoint's state: set the flag to false 
when the savepoint starts and mark it as true when it successfully completes. The job 
should report an error if the flag for the last savepoint is false.
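
For option 2, a hypothetical sketch of the age check; the option name, default value, and 
wiring are assumptions for illustration, not actual flink-kubernetes-operator code:
{code:java}
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch of option 2: refuse to restore from a recorded savepoint that is older
 * than a configured maximum age, instead of silently falling back to an obsolete savepoint.
 * The setting and default below are assumptions, not actual operator configuration.
 */
public class SavepointAgeCheckSketch {

    /** Assumed new setting, modeled on kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age. */
    static final Duration MAX_ALLOWED_SAVEPOINT_AGE = Duration.ofMinutes(10);

    static void validateSavepointAge(Instant savepointCompletedAt, Instant now) {
        Duration age = Duration.between(savepointCompletedAt, now);
        if (age.compareTo(MAX_ALLOWED_SAVEPOINT_AGE) > 0) {
            throw new IllegalStateException(
                    "Last savepoint is " + age + " old, exceeding the allowed maximum of "
                            + MAX_ALLOWED_SAVEPOINT_AGE + "; refusing to restore from it.");
        }
    }
}
{code}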





[jira] [Created] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-16 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-31483:
---

 Summary: Implement Split Deletion Support in Flink Kafka Connector
 Key: FLINK-31483
 URL: https://issues.apache.org/jira/browse/FLINK-31483
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka, Connectors / Parent
Reporter: Ruibin Xing


Currently, the Flink Kafka Connector does not support split deletion; it is left as a 
[TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
 I want to add this feature by doing these steps:

1. Add a SplitsDeletion event to flink-connector-base, which currently only has 
SplitsAddition.
2. Add a `deleteSplits` method to SplitEnumeratorContext so it can send a 
SplitsDeletion event to the source operator. To maintain compatibility, this method will 
get a default empty implementation (a rough sketch of steps 1 and 2 follows this list).
3. Make SourceOperator handle the SplitsDeletion event, notifying the 
SourceReader to delete splits.
4. Create a deleteSplits method in SourceReader to remove splits, including 
removing them from the split state and stopping the SourceReader from reading the deleted 
splits.
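
As a rough illustration of steps 1 and 2, here is a sketch with simplified stand-in types; 
the names mirror the existing SplitsAddition and SplitEnumeratorContext but are assumptions 
about the proposed API, not existing Flink code:
{code:java}
import java.util.List;

/** Rough sketch of the proposed additions; simplified stand-ins, not actual Flink classes. */
public class SplitsDeletionSketch {

    /** Step 1: a SplitsDeletion event mirroring the existing SplitsAddition. */
    record SplitsDeletion<SplitT>(List<SplitT> splits) {}

    /** Step 2: a deleteSplits hook on the enumerator context, with a default no-op for compatibility. */
    interface EnumeratorContext<SplitT> {

        void assignSplits(List<SplitT> splits, int subtaskId);

        /** Default empty implementation so existing enumerators keep working unchanged. */
        default void deleteSplits(List<SplitT> splits, int subtaskId) {
            // no-op by default; sources that support split deletion override this
        }
    }
}
{code}
With such a default, only enumerators that actually track removed splits (for example, a 
Kafka enumerator reacting to topic or partition removal) would need to override deleteSplits.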

As an alternative, without modifying flink-connector-base, the KafkaSourceEnumerator could 
send a custom SourceEvent to the SourceOperator for split deletion and handle it in the 
Kafka-connector-specific code. But I think it's better to have SplitsDeletion in 
flink-connector-base, so other connectors can use it too.

Let me know if you have any thoughts or ideas. Thanks!

Related Issues: [FLINK-30490|https://issues.apache.org/jira/browse/FLINK-30490]


