[jira] [Created] (FLINK-35803) ResumeCheckpointManuallyITCase fails with checkpoint file merging
Rui Fan created FLINK-35803: --- Summary: ResumeCheckpointManuallyITCase fails with checkpoint file merging Key: FLINK-35803 URL: https://issues.apache.org/jira/browse/FLINK-35803 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 2.0.0 Reporter: Rui Fan [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60807=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=8905] Test: ResumeCheckpointManuallyITCase.testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper A FileNotFoundException is thrown when restoring from a checkpoint that had file merging enabled. I'm not sure whether the file was deleted unexpectedly by Flink code or the CI is unstable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35761) Speed up the restore process of unaligned checkpoint
Rui Fan created FLINK-35761: --- Summary: Speed up the restore process of unaligned checkpoint Key: FLINK-35761 URL: https://issues.apache.org/jira/browse/FLINK-35761 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.19.1, 1.20.0 Reporter: Rui Fan Assignee: Rui Fan Currently, a task transitions from ExecutionState.INITIALIZING to ExecutionState.RUNNING only after all input buffers are processed. As a result, the restore takes a very long time if processing performance is low and the unaligned checkpoint snapshotted many input buffers. In my experience, the restore time can exceed 30 minutes for jobs with high parallelism. We want the job to switch to RUNNING as soon as possible, because a new checkpoint cannot be triggered during INITIALIZING. Once the job is RUNNING, a new unaligned checkpoint can be taken. h2. Brief Solution: # The task is switched to RUNNING after all input buffers are added to RecoveredInputChannel. ** In general, this is quick unless there are not enough network buffers. # RecoveredInputChannel supports snapshotting its network buffers -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35738) Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order
Rui Fan created FLINK-35738: --- Summary: Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order Key: FLINK-35738 URL: https://issues.apache.org/jira/browse/FLINK-35738 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Rui Fan The problem occurs when using RocksDB and specific queries/jobs (please see the ticket for the detailed description). To test the solution, run the following query with RocksDB as a state backend: {code:java} INSERT INTO top_5_highest_view_time SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY view_time DESC) AS rownum FROM ( SELECT window_start, window_end, product_id, SUM(view_time) AS view_time, COUNT(*) AS cnt FROM TABLE(TUMBLE(TABLE `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, product_id)) WHERE rownum <= 5;{code} With the feature disabled (default), the number of files in the RocksDB working directory (as well as in the checkpoint) should grow indefinitely. With the feature enabled, the number of files should stay constant (as they should get merged with each other). To enable the feature, set {code:java} state.backend.rocksdb.manual-compaction.min-interval{code} to, for example, 1 minute. Please consult [https://github.com/apache/flink/blob/e7d7db3b6f87e53d9bace2a16cf95e5f7a79087a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java#L29] for other options if necessary. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35665) Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
Rui Fan created FLINK-35665: --- Summary: Release Testing: FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI Key: FLINK-35665 URL: https://issues.apache.org/jira/browse/FLINK-35665 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Fix For: 1.20.0 Attachments: image-2024-06-21-15-51-53-480.png
Test suggestion:
1. Use the following job to check the jobType
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Test for showing job type in Flink WebUI. */
public class JobTypeDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Enable exactly one runtime mode per test run (see steps 2 to 4).
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Bounded source: emits 600 records at 10 records per second.
        DataGeneratorSource<Long> generatorSource =
                new DataGeneratorSource<>(
                        value -> value, 600, RateLimiterStrategy.perSecond(10), Types.LONG);

        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
                .map((MapFunction<Long, Long>) value -> value)
                .name("Map___1")
                .print();

        env.execute(JobTypeDemo.class.getSimpleName());
    }
}
{code}
2. Start it and check if the jobType is Streaming in the Flink web UI. !image-2024-06-21-15-49-40-729.png|width=1581,height=662!
3. Apply env.setRuntimeMode(RuntimeExecutionMode.BATCH); and check if the jobType is Batch in the Flink web UI.
4. Apply env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); and check if the jobType is Batch in the Flink web UI.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35629) Performance regression in stringRead and stringWrite
Rui Fan created FLINK-35629: --- Summary: Performance regression in stringRead and stringWrite Key: FLINK-35629 URL: https://issues.apache.org/jira/browse/FLINK-35629 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.20.0 Reporter: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35614) Release Testing Instructions: Verify FLIP-443: Interruptible timers firing
Rui Fan created FLINK-35614: --- Summary: Release Testing Instructions: Verify FLIP-443: Interruptible timers firing Key: FLINK-35614 URL: https://issues.apache.org/jira/browse/FLINK-35614 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35613) Release Testing Instructions: Verify [FLIP-451] Introduce timeout configuration to AsyncSink
Rui Fan created FLINK-35613: --- Summary: Release Testing Instructions: Verify [FLIP-451] Introduce timeout configuration to AsyncSink Key: FLINK-35613 URL: https://issues.apache.org/jira/browse/FLINK-35613 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35612) Release Testing Instructions: Verify FLIP-445: Support dynamic parallelism inference for HiveSource
Rui Fan created FLINK-35612: --- Summary: Release Testing Instructions: Verify FLIP-445: Support dynamic parallelism inference for HiveSource Key: FLINK-35612 URL: https://issues.apache.org/jira/browse/FLINK-35612 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35611) Release Testing Instructions: Verify [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction
Rui Fan created FLINK-35611: --- Summary: Release Testing Instructions: Verify [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction Key: FLINK-35611 URL: https://issues.apache.org/jira/browse/FLINK-35611 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35610) Release Testing Instructions: Verify FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table
Rui Fan created FLINK-35610: --- Summary: Release Testing Instructions: Verify FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table Key: FLINK-35610 URL: https://issues.apache.org/jira/browse/FLINK-35610 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35609) Release Testing Instructions: Verify FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines
Rui Fan created FLINK-35609: --- Summary: Release Testing Instructions: Verify FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines Key: FLINK-35609 URL: https://issues.apache.org/jira/browse/FLINK-35609 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35608) CLONE - Release Testing Instructions: Verify FLIP-376: Add DISTRIBUTED BY clause
Rui Fan created FLINK-35608: --- Summary: CLONE - Release Testing Instructions: Verify FLIP-376: Add DISTRIBUTED BY clause Key: FLINK-35608 URL: https://issues.apache.org/jira/browse/FLINK-35608 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35607) Release Testing Instructions: Verify FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI
Rui Fan created FLINK-35607: --- Summary: Release Testing Instructions: Verify FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI Key: FLINK-35607 URL: https://issues.apache.org/jira/browse/FLINK-35607 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-29481 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35606) Release Testing Instructions: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order
Rui Fan created FLINK-35606: --- Summary: Release Testing Instructions: Verify FLINK-26050 Too many small sst files in rocksdb state backend when using time window created in ascending order Key: FLINK-35606 URL: https://issues.apache.org/jira/browse/FLINK-35606 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Rui Fan Assignee: Roman Khachatryan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-26050 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35605) Release Testing Instructions: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints
Rui Fan created FLINK-35605: --- Summary: Release Testing Instructions: Verify FLIP-306 Unified File Merging Mechanism for Checkpoints Key: FLINK-35605 URL: https://issues.apache.org/jira/browse/FLINK-35605 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Rui Fan Assignee: Zakelly Lan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-32070 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35604) Release Testing Instructions: Verify FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs
Rui Fan created FLINK-35604: --- Summary: Release Testing Instructions: Verify FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs Key: FLINK-35604 URL: https://issues.apache.org/jira/browse/FLINK-35604 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Rui Fan Assignee: Yuxin Tan Fix For: 1.20.0 Follow up the test for https://issues.apache.org/jira/browse/FLINK-35533 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35603) Release Testing Instructions: Verify FLINK-35533(FLIP-459): Support Flink hybrid shuffle integration with Apache Celeborn
Rui Fan created FLINK-35603: --- Summary: Release Testing Instructions: Verify FLINK-35533(FLIP-459): Support Flink hybrid shuffle integration with Apache Celeborn Key: FLINK-35603 URL: https://issues.apache.org/jira/browse/FLINK-35603 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Rui Fan Assignee: Yuxin Tan Follow up the test for https://issues.apache.org/jira/browse/FLINK-35533 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35471) Kubernetes operator bumps the flink version to 1.19
Rui Fan created FLINK-35471: --- Summary: Kubernetes operator bumps the flink version to 1.19 Key: FLINK-35471 URL: https://issues.apache.org/jira/browse/FLINK-35471 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.9.0 Kubernetes operator bumps the flink version to 1.19 after 1.19.1 is released. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35315) MemoryManagerConcurrentModReleaseTest executes more than 15 minutes
Rui Fan created FLINK-35315: --- Summary: MemoryManagerConcurrentModReleaseTest executes more than 15 minutes Key: FLINK-35315 URL: https://issues.apache.org/jira/browse/FLINK-35315 Project: Flink Issue Type: Bug Components: Runtime / Network, Tests Affects Versions: 1.20.0 Reporter: Rui Fan Attachments: image-2024-05-09-11-53-10-037.png [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395=results] It seems MemoryManagerConcurrentModReleaseTest.testConcurrentModificationWhileReleasing runs for more than 15 minutes. The root cause may be a ConcurrentModificationException [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59395=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=10060] !image-2024-05-09-11-53-10-037.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35306) Flink cannot compile with jdk17
Rui Fan created FLINK-35306: --- Summary: Flink cannot compile with jdk17 Key: FLINK-35306 URL: https://issues.apache.org/jira/browse/FLINK-35306 Project: Flink Issue Type: Bug Affects Versions: 1.20.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Attachments: image-2024-05-08-11-48-04-161.png !image-2024-05-08-11-48-04-161.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35227) Remove execution-mode in ExecutionConfigInfo
Rui Fan created FLINK-35227: --- Summary: Remove execution-mode in ExecutionConfigInfo Key: FLINK-35227 URL: https://issues.apache.org/jira/browse/FLINK-35227 Project: Flink Issue Type: Sub-task Components: Runtime / REST Reporter: Rui Fan Assignee: Rui Fan Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35226) Deprecate execution-mode in ExecutionConfigInfo related rest api
Rui Fan created FLINK-35226: --- Summary: Deprecate execution-mode in ExecutionConfigInfo related rest api Key: FLINK-35226 URL: https://issues.apache.org/jira/browse/FLINK-35226 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35225) Remove Execution mode in Flink WebUI
Rui Fan created FLINK-35225: --- Summary: Remove Execution mode in Flink WebUI Key: FLINK-35225 URL: https://issues.apache.org/jira/browse/FLINK-35225 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35223) Add jobType in JobDetailsInfo related rest api
Rui Fan created FLINK-35223: --- Summary: Add jobType in JobDetailsInfo related rest api Key: FLINK-35223 URL: https://issues.apache.org/jira/browse/FLINK-35223 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35224) Show the JobType on Flink WebUI
Rui Fan created FLINK-35224: --- Summary: Show the JobType on Flink WebUI Key: FLINK-35224 URL: https://issues.apache.org/jira/browse/FLINK-35224 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35222) Adding getJobType for AccessExecutionGraph
Rui Fan created FLINK-35222: --- Summary: Adding getJobType for AccessExecutionGraph Key: FLINK-35222 URL: https://issues.apache.org/jira/browse/FLINK-35222 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Add getJobType to the AccessExecutionGraph interface; all implementations need to override it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed
Rui Fan created FLINK-35215: --- Summary: The performance of serializerKryo and serializerKryoWithoutRegistration are regressed Key: FLINK-35215 URL: https://issues.apache.org/jira/browse/FLINK-35215 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.20.0 Reporter: Rui Fan The performance of serializerKryo and serializerKryoWithoutRegistration has regressed [1][2]. I checked recent commits and found that FLINK-34954 changed the related logic. [1] [http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryo=on=on=off=3=50] [2] http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryoWithoutRegistration=on=on=off=3=50 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35105) Support setting default Autoscaler options at autoscaler standalone level
Rui Fan created FLINK-35105: --- Summary: Support setting default Autoscaler options at autoscaler standalone level Key: FLINK-35105 URL: https://issues.apache.org/jira/browse/FLINK-35105 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.9.0 Currently, autoscaler standalone doesn't support setting [autoscaler options|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/#autoscaler-configuration]. We must set them at job level when we use autoscaler standalone. This is inconvenient if a platform administrator wants to change the default value for some autoscaler options, such as: * job.autoscaler.enabled * job.autoscaler.metrics.window * etc This Jira supports setting autoscaler options at the autoscaler standalone level, similar to the Flink Kubernetes operator. The autoscaler options of autoscaler standalone serve as the base configuration, and the job-level configuration can override the default values provided by autoscaler standalone. -- This message was sent by Atlassian Jira (v8.20.10#820010)
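The layering described in FLINK-35105 (standalone-level options as the base, job-level options on top) can be sketched in plain Java. This is only an illustration under stated assumptions, not the autoscaler's actual code: the class name `LayeredAutoscalerConfig`, the method `effectiveOptions`, and the option values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: job-level options override standalone-level defaults. */
final class LayeredAutoscalerConfig {

    /** Returns a copy of the standalone-level defaults with job-level entries applied on top. */
    static Map<String, String> effectiveOptions(
            Map<String, String> standaloneDefaults, Map<String, String> jobLevel) {
        Map<String, String> merged = new HashMap<>(standaloneDefaults);
        merged.putAll(jobLevel); // job-level values win on conflict
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put("job.autoscaler.enabled", "true");
        defaults.put("job.autoscaler.metrics.window", "10m"); // illustrative value

        Map<String, String> job = new HashMap<>();
        job.put("job.autoscaler.metrics.window", "5m"); // illustrative job-level override

        System.out.println(effectiveOptions(defaults, job));
    }
}
```

Copying the defaults before merging keeps the standalone-level map unchanged, so the same base configuration can be reused for every job.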
[jira] [Created] (FLINK-35040) The performance of serializerHeavyString regresses since April 3
Rui Fan created FLINK-35040: --- Summary: The performance of serializerHeavyString regresses since April 3 Key: FLINK-35040 URL: https://issues.apache.org/jira/browse/FLINK-35040 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.20.0 Reporter: Rui Fan Attachments: image-2024-04-08-10-51-07-403.png The performance of serializerHeavyString has regressed since April 3 and had not yet recovered as of April 8. http://flink-speed.xyz/timeline/#/?exe=6=serializerHeavyString=on=on=off=3=200 !image-2024-04-08-10-51-07-403.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34957) JDBC Autoscaler event handler throws Column 'message' cannot be null
Rui Fan created FLINK-34957: --- Summary: JDBC Autoscaler event handler throws Column 'message' cannot be null Key: FLINK-34957 URL: https://issues.apache.org/jira/browse/FLINK-34957 Project: Flink Issue Type: Bug Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.9.0 Attachments: image-2024-03-28-11-57-35-234.png The JDBC autoscaler event handler doesn't allow the event message to be null, but the message may be null when we handle an exception. We use the exception message as the event message, but the exception message may be null, for example for TimeoutException (as shown in the following picture). Also, recording an event without any message is meaningless; it doesn't help troubleshooting. Solution: * Use the exception message as the event message when the exception message isn't null * Use the whole exception as the event message if the exception message is null. !image-2024-03-28-11-57-35-234.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
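The fallback proposed in FLINK-34957 can be sketched as follows. This is a minimal sketch, not the operator's code: `EventMessages` and `toEventMessage` are hypothetical names, and using `Throwable#toString()` for "the whole exception" is an assumption (a real fix might prefer the stack trace).

```java
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of choosing a non-null event message for an exception. */
final class EventMessages {

    /** Uses the exception message when present, otherwise the exception's string form. */
    static String toEventMessage(Throwable t) {
        String message = t.getMessage();
        // Assumption: toString() stands in for "the whole exception".
        return message != null ? message : t.toString();
    }

    public static void main(String[] args) {
        System.out.println(toEventMessage(new IllegalStateException("scaling failed")));
        // A TimeoutException created without a message has a null getMessage().
        System.out.println(toEventMessage(new TimeoutException()));
    }
}
```

Either branch returns a non-null string, so a NOT NULL `message` column can always be populated.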
[jira] [Created] (FLINK-34956) The config type is wrong for Duration
Rui Fan created FLINK-34956: --- Summary: The config type is wrong for Duration Key: FLINK-34956 URL: https://issues.apache.org/jira/browse/FLINK-34956 Project: Flink Issue Type: Bug Components: Documentation Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 Attachments: image-2024-03-28-11-21-31-802.png The Config type is Boolean, but it should be Duration. https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/ !image-2024-03-28-11-21-31-802.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34907) jobRunningTs should be the timestamp that all tasks are running
Rui Fan created FLINK-34907: --- Summary: jobRunningTs should be the timestamp that all tasks are running Key: FLINK-34907 URL: https://issues.apache.org/jira/browse/FLINK-34907 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.9.0 Currently, we use the timestamp at which the JobStatus changed to RUNNING as jobRunningTs. But the JobStatus becomes RUNNING as soon as the job starts scheduling, so it doesn't mean all tasks are running. This makes isStabilizing and the estimated restart time inaccurate. Solution: jobRunningTs should be the timestamp at which all tasks are running. It can be obtained from the SubtasksTimesHeaders REST API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
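One way to read "the timestamp at which all tasks are running" is the latest RUNNING timestamp across all subtasks. The sketch below assumes that interpretation; `JobRunningTimestamp`, `jobRunningTs`, and the `NOT_RUNNING` sentinel are hypothetical, and the per-subtask timestamps are assumed to have already been fetched (for example via the REST API mentioned above).

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical sketch: derive jobRunningTs from per-subtask RUNNING timestamps. */
final class JobRunningTimestamp {

    /** Assumed sentinel for a subtask that has not reached RUNNING yet. */
    static final long NOT_RUNNING = -1L;

    /** Returns the latest RUNNING timestamp, or empty if any subtask is not running yet. */
    static OptionalLong jobRunningTs(List<Long> subtaskRunningTimestamps) {
        long latest = NOT_RUNNING;
        for (long ts : subtaskRunningTimestamps) {
            if (ts == NOT_RUNNING) {
                return OptionalLong.empty(); // at least one subtask is still starting
            }
            latest = Math.max(latest, ts);
        }
        return latest == NOT_RUNNING ? OptionalLong.empty() : OptionalLong.of(latest);
    }

    public static void main(String[] args) {
        System.out.println(jobRunningTs(Arrays.asList(100L, 250L, 180L)));
        System.out.println(jobRunningTs(Arrays.asList(100L, NOT_RUNNING)));
    }
}
```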
[jira] [Created] (FLINK-34906) Don't start autoscaling when some tasks are not running
Rui Fan created FLINK-34906: --- Summary: Don't start autoscaling when some tasks are not running Key: FLINK-34906 URL: https://issues.apache.org/jira/browse/FLINK-34906 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.9.0 Attachments: image-2024-03-21-17-40-23-523.png Currently, the autoscaler scales a job when the JobStatus is RUNNING. But the JobStatus becomes RUNNING as soon as the job starts scheduling, so it doesn't mean all tasks are running, especially when resources are insufficient or the job is recovering from large state. The autoscaler throws an exception and generates the AutoscalerError event when tasks are not ready, such as: !image-2024-03-21-17-40-23-523.png! Solution: only scale a job when all of its tasks are running (some tasks may be finished). -- This message was sent by Atlassian Jira (v8.20.10#820010)
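The precondition in FLINK-34906 can be sketched as a simple check over task states. This is an illustration only: `ScalingPrecondition`, `readyToScale`, and the local `TaskState` enum are hypothetical stand-ins rather than Flink's own types, and treating a job with no reported tasks as "not ready" is an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: only scale when every task is RUNNING or FINISHED. */
final class ScalingPrecondition {

    /** Simplified stand-in for task execution states. */
    enum TaskState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, FAILED }

    /** Returns true only if at least one task is reported and all are RUNNING or FINISHED. */
    static boolean readyToScale(List<TaskState> taskStates) {
        if (taskStates.isEmpty()) {
            return false; // assumption: nothing reported yet means not ready
        }
        for (TaskState state : taskStates) {
            if (state != TaskState.RUNNING && state != TaskState.FINISHED) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(readyToScale(Arrays.asList(TaskState.RUNNING, TaskState.FINISHED)));
        System.out.println(readyToScale(Arrays.asList(TaskState.RUNNING, TaskState.INITIALIZING)));
    }
}
```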
[jira] [Created] (FLINK-34744) autoscaling-dynamic cannot run
Rui Fan created FLINK-34744: --- Summary: autoscaling-dynamic cannot run Key: FLINK-34744 URL: https://issues.apache.org/jira/browse/FLINK-34744 Project: Flink Issue Type: Bug Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.9.0 Attachments: image-2024-03-19-21-46-15-530.png autoscaling-dynamic cannot run on my Mac !image-2024-03-19-21-46-15-530.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34743) Memory tuning takes effect even if the parallelism isn't changed
Rui Fan created FLINK-34743: --- Summary: Memory tuning takes effect even if the parallelism isn't changed Key: FLINK-34743 URL: https://issues.apache.org/jira/browse/FLINK-34743 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.9.0 Currently, the memory tuning logic is only called when the parallelism is changed. See ScalingExecutor#scaleResource for more details. It's better to let memory tuning take effect even if the parallelism isn't changed, for example when a Flink job runs with the desired parallelisms but wastes memory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34655) Autoscaler doesn't work for flink 1.15
Rui Fan created FLINK-34655: --- Summary: Autoscaler doesn't work for flink 1.15 Key: FLINK-34655 URL: https://issues.apache.org/jira/browse/FLINK-34655 Project: Flink Issue Type: Bug Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.8.0 flink-kubernetes-operator is committed to supporting the latest 4 Flink minor versions, and the autoscaler is part of flink-kubernetes-operator. Currently, the latest 4 Flink minor versions are 1.15, 1.16, 1.17 and 1.18, but the autoscaler doesn't work for Flink 1.15. h2. Root cause: * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16 * IOMetricsInfo is a part of JobDetailsInfo * JobDetailsInfo is necessary for the autoscaler [1] * Flink's RestClient doesn't allow any property to be missing when deserializing the JSON That means that a RestClient newer than 1.15 cannot fetch JobDetailsInfo for 1.15 jobs. h2. How to fix it properly? The Flink side should support ignoring unknown properties; FLINK-33268 already does this. But when I tried running the autoscaler with a flink-1.15 job, it still didn't work, because the properties added to IOMetricsInfo are of primitive types. DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES should be disabled as well. (Not sure whether it should be a separate FLIP or whether it can be part of FLIP-401 [2].) h2. How to fix it in the short term? 1. Copy the latest RestMapperUtils and RestClient from the master branch (which includes FLINK-33268) to the flink-autoscaler module. (The copied classes will be loaded first.) 2. Disable DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES in RestMapperUtils#flexibleObjectMapper in the copied class. With these 2 steps, flink-1.15 works well with the autoscaler (I tried it locally). Once DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES is disabled in RestMapperUtils#flexibleObjectMapper and the corresponding code is released on the Flink side, flink-kubernetes-operator can remove these 2 copied classes. 
[1] https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109 [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34522) StateTtlConfig#cleanupInRocksdbCompactFilter still use the deprecated Time
Rui Fan created FLINK-34522: --- Summary: StateTtlConfig#cleanupInRocksdbCompactFilter still use the deprecated Time Key: FLINK-34522 URL: https://issues.apache.org/jira/browse/FLINK-34522 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.19.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0, 1.20.0 FLINK-32570 deprecated the Time class and refactored all Public or PublicEvolving APIs to use Java's Duration. StateTtlConfig.Builder#cleanupInRocksdbCompactFilter is still using the Time class. In general, we would: * Mark it as @Deprecated * Provide a new cleanupInRocksdbCompactFilter(long, Duration) But I found that this method was introduced in 1.19, so a better solution may be to provide only cleanupInRocksdbCompactFilter(long, Duration) and not use Time at all. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34521) Using the Duration instead of the deprecated Time classes
Rui Fan created FLINK-34521: --- Summary: Using the Duration instead of the deprecated Time classes Key: FLINK-34521 URL: https://issues.apache.org/jira/browse/FLINK-34521 Project: Flink Issue Type: Technical Debt Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.20.0 FLINK-32570 deprecated org.apache.flink.api.common.time.Time and org.apache.flink.streaming.api.windowing.time.Time. We should refactor all internal callers from Time to Duration. (Public callers should be removed in 2.0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34504) Avoid the parallelism adjutment when the upstream shuffle type doesn't has keyBy
Rui Fan created FLINK-34504: --- Summary: Avoid the parallelism adjutment when the upstream shuffle type doesn't has keyBy Key: FLINK-34504 URL: https://issues.apache.org/jira/browse/FLINK-34504 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.8.0 JobVertexScaler#scale has an optimization: Try to adjust the parallelism such that it divides the number of key groups without a remainder => data is evenly spread across subtasks. It's only useful when the upstream shuffle is a keyBy. We should avoid this optimization when the upstream shuffle is not a keyBy. -- This message was sent by Atlassian Jira (v8.20.10#820010)
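The optimization discussed in FLINK-34504 can be illustrated with a simplified sketch. It is not the actual JobVertexScaler#scale code: the class `KeyGroupAlignedParallelism`, the method `adjust`, the `upstreamHasKeyBy` flag, and the choice to stop searching at half the max parallelism are assumptions made for illustration.

```java
/** Hypothetical sketch: round parallelism up to a divisor of the key-group count. */
final class KeyGroupAlignedParallelism {

    /**
     * Returns the smallest parallelism at or above the target that divides maxParallelism
     * (the number of key groups), or the target itself when no such value is found or
     * when the upstream shuffle is not a keyBy.
     */
    static int adjust(int targetParallelism, int maxParallelism, boolean upstreamHasKeyBy) {
        if (targetParallelism < 1) {
            throw new IllegalArgumentException("targetParallelism must be >= 1");
        }
        if (!upstreamHasKeyBy) {
            return targetParallelism; // key groups are irrelevant without keyBy
        }
        for (int p = targetParallelism; p <= maxParallelism / 2; p++) {
            if (maxParallelism % p == 0) {
                return p; // every subtask gets the same number of key groups
            }
        }
        return targetParallelism;
    }

    public static void main(String[] args) {
        System.out.println(adjust(50, 128, true));
        System.out.println(adjust(50, 128, false));
    }
}
```

Skipping the adjustment for non-keyBy inputs avoids allocating extra subtasks that bring no balancing benefit.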
[jira] [Created] (FLINK-34502) Support calculating network memory for forward and rescale edge
Rui Fan created FLINK-34502: --- Summary: Support calculating network memory for forward and rescale edge Key: FLINK-34502 URL: https://issues.apache.org/jira/browse/FLINK-34502 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan This is a follow-up Jira of FLINK-34471. FLINK-34471 assumes all connection types are ALL_TO_ALL. This Jira will optimize it to save some network memory for forward and rescale connections. -- This message was sent by Atlassian Jira (v8.20.10#820010)
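To see why forward and rescale edges need less network memory than ALL_TO_ALL, it helps to compare how many input channels a single downstream subtask has under each connection type. The sketch below is a rough illustration under stated assumptions, not the autoscaler's implementation: `InputChannelEstimate`, `ConnectionType`, and `inputChannelsPerSubtask` are hypothetical names, and the rescale case uses a ceiling as an upper bound.

```java
/** Hypothetical sketch: input channels per downstream subtask by connection type. */
final class InputChannelEstimate {

    /** Simplified stand-in for the connection types named in the ticket. */
    enum ConnectionType { ALL_TO_ALL, FORWARD, RESCALE }

    /** Estimates how many upstream subtasks one downstream subtask reads from. */
    static int inputChannelsPerSubtask(ConnectionType type, int upstream, int downstream) {
        switch (type) {
            case ALL_TO_ALL:
                return upstream; // every upstream subtask connects to every downstream subtask
            case FORWARD:
                return 1; // one-to-one connection
            case RESCALE:
                // Assumed upper bound: ceil(upstream / downstream), at least 1.
                return (upstream + downstream - 1) / downstream;
            default:
                throw new IllegalArgumentException("Unknown connection type: " + type);
        }
    }

    public static void main(String[] args) {
        for (ConnectionType type : ConnectionType.values()) {
            System.out.println(type + ": " + inputChannelsPerSubtask(type, 10, 10));
        }
    }
}
```

Since buffers are reserved per channel, fewer channels per subtask translates directly into a smaller network memory requirement.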
[jira] [Created] (FLINK-34471) Tune the network memroy in Autoscaler
Rui Fan created FLINK-34471: --- Summary: Tune the network memroy in Autoscaler Key: FLINK-34471 URL: https://issues.apache.org/jira/browse/FLINK-34471 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Design doc: https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34389) JdbcAutoscalerStateStore explicitly writes update_time
Rui Fan created FLINK-34389: --- Summary: JdbcAutoscalerStateStore explicitly writes update_time Key: FLINK-34389 URL: https://issues.apache.org/jira/browse/FLINK-34389 Project: Flink Issue Type: Improvement Components: Autoscaler Affects Versions: kubernetes-operator-1.8.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.8.0 JdbcAutoscalerStateStore explicitly writes update_time instead of relying on the database to update it. Some databases don't support updating the timestamp column automatically. Since this is a common service that should support all databases well, it's better to handle this inside the service. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes
Rui Fan created FLINK-34336: --- Summary: AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes Key: FLINK-34336 URL: https://issues.apache.org/jira/browse/FLINK-34336 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.19.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang in waitForRunningTasks(restClusterClient, jobID, parallelism2); h2. Reason: The job has 2 tasks (vertices). After calling updateJobResourceRequirements, the source parallelism isn't changed (it stays at parallelism), and the FlatMapper+Sink is changed from parallelism to parallelism2. So the expected number of running tasks is parallelism + parallelism2 instead of parallelism2. h2. Why does it pass for now? Flink 1.19 supports the scaling cooldown, and the cooldown time is 30s by default. It means the Flink job will rescale 30 seconds after updateJobResourceRequirements is called. So the running tasks still use the old parallelism when we call waitForRunningTasks(restClusterClient, jobID, parallelism2);. IIUC, this cannot be guaranteed, and it's unexpected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34218) AutoRescalingITCase#testCheckpointRescalingInKeyedState fails
Rui Fan created FLINK-34218: --- Summary: AutoRescalingITCase#testCheckpointRescalingInKeyedState fails Key: FLINK-34218 URL: https://issues.apache.org/jira/browse/FLINK-34218 Project: Flink Issue Type: Bug Components: Tests Reporter: Rui Fan https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56740=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=ae4f8708-9994-57d3-c2d7-b892156e7812 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34212) Autoscaler Standalone should clean up stopped jobs to prevent memory leaks
Rui Fan created FLINK-34212: --- Summary: Autoscaler Standalone should clean up stopped jobs to prevent memory leaks Key: FLINK-34212 URL: https://issues.apache.org/jira/browse/FLINK-34212 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Autoscaler Standalone fetches the job list and scales the jobs. In general, the autoscaler keeps some caches in memory. Autoscaler Standalone should clean them up for jobs that can no longer be fetched. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34178) The ScalingTracking of autoscaler is wrong
Rui Fan created FLINK-34178: --- Summary: The ScalingTracking of autoscaler is wrong Key: FLINK-34178 URL: https://issues.apache.org/jira/browse/FLINK-34178 Project: Flink Issue Type: Bug Components: Autoscaler Affects Versions: kubernetes-operator-1.8.0 Reporter: Rui Fan Assignee: Rui Fan The ScalingTracking of the autoscaler is wrong: the tracked restart time is always greater than AutoScalerOptions#STABILIZATION_INTERVAL. h2. Reason: When the Flink job isStabilizing, ScalingMetricCollector#updateMetrics returns an empty metric history. In the JobAutoScalerImpl#runScalingLogic method, if `collectedMetrics.getMetricHistory().isEmpty()`, we don't update the ScalingTracking. The default value of AutoScalerOptions#STABILIZATION_INTERVAL is 5 min, so the restartTime is always greater than 5 min. However, the restart is quick when we use the rescale API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34176) Remove unnecessary implementations of RestartBackoffTimeStrategy
Rui Fan created FLINK-34176: --- Summary: Remove unnecessary implementations of RestartBackoffTimeStrategy Key: FLINK-34176 URL: https://issues.apache.org/jira/browse/FLINK-34176 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan h2. Could we deprecate the failure-rate and fixed-delay restart-strategies directly? After FLINK-33735 (FLIP-364), the exponential-delay restart strategy is already very feature-rich. It can replace the fixed-delay and failure-rate restart strategies directly. h2. How to replace FixedDelayRestartBackoffTimeStrategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS = Integer.MAX_VALUE * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FixedDelayRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxNumberRestartAttempts of FixedDelayRestartBackoffTimeStrategy h2. How to replace FailureRateRestartBackoffTimeStrategy? * Set backoffMultiplier = 1 and jitterFactor = 0 * resetBackoffThresholdMS is the failuresIntervalMS of FailureRateRestartBackoffTimeStrategy * initialBackoffMS and maxBackoffMS are the backoffTimeMS of FailureRateRestartBackoffTimeStrategy * attemptsBeforeResetBackoff is the maxFailuresPerInterval of FailureRateRestartBackoffTimeStrategy -- This message was sent by Atlassian Jira (v8.20.10#820010)
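The two parameter mappings listed in FLINK-34176 can be written down as a small Java sketch. The Params holder and the two factory methods are hypothetical and only mirror the field names used in the ticket; they are not Flink classes, and whether the resulting strategy behaves identically in every corner case is not verified here.

```java
final class ExponentialDelayMappingSketch {

    /** Hypothetical holder for the exponential-delay parameters named in the ticket. */
    static final class Params {
        final double backoffMultiplier;
        final double jitterFactor;
        final long resetBackoffThresholdMS;
        final long initialBackoffMS;
        final long maxBackoffMS;
        final int attemptsBeforeResetBackoff;

        Params(double backoffMultiplier, double jitterFactor, long resetBackoffThresholdMS,
               long initialBackoffMS, long maxBackoffMS, int attemptsBeforeResetBackoff) {
            this.backoffMultiplier = backoffMultiplier;
            this.jitterFactor = jitterFactor;
            this.resetBackoffThresholdMS = resetBackoffThresholdMS;
            this.initialBackoffMS = initialBackoffMS;
            this.maxBackoffMS = maxBackoffMS;
            this.attemptsBeforeResetBackoff = attemptsBeforeResetBackoff;
        }
    }

    /** Fixed delay: constant backoff, and the reset threshold is the largest int value. */
    static Params fromFixedDelay(long backoffTimeMS, int maxNumberRestartAttempts) {
        return new Params(1.0, 0.0, Integer.MAX_VALUE,
                backoffTimeMS, backoffTimeMS, maxNumberRestartAttempts);
    }

    /** Failure rate: constant backoff, and the reset threshold is the failure interval. */
    static Params fromFailureRate(long failuresIntervalMS, long backoffTimeMS,
                                  int maxFailuresPerInterval) {
        return new Params(1.0, 0.0, failuresIntervalMS,
                backoffTimeMS, backoffTimeMS, maxFailuresPerInterval);
    }

    public static void main(String[] args) {
        Params fixed = fromFixedDelay(10_000L, 3);
        System.out.println(fixed.initialBackoffMS + " " + fixed.attemptsBeforeResetBackoff);
    }
}
```

In both cases a multiplier of 1 with zero jitter keeps every delay equal to the original backoffTimeMS, which is what makes the exponential-delay strategy usable as a drop-in replacement.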
[jira] [Created] (FLINK-34168) Refactor all callers that use the public Xxx getXxx(String key) and public void setXxx(String key, Xxx value)
Rui Fan created FLINK-34168: --- Summary: Refactor all callers that use the public Xxx getXxx(String key) and public void setXxx(String key, Xxx value) Key: FLINK-34168 URL: https://issues.apache.org/jira/browse/FLINK-34168 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Xuannan Su Refactor all callers that use the public Xxx getXxx(String key) and public void setXxx(String key, Xxx value) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34130) Mark setBytes and getBytes of Configuration as @Internal
Rui Fan created FLINK-34130: --- Summary: Mark setBytes and getBytes of Configuration as @Internal Key: FLINK-34130 URL: https://issues.apache.org/jira/browse/FLINK-34130 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34104) Improve the ScalingReport format of autoscaling
Rui Fan created FLINK-34104: --- Summary: Improve the ScalingReport format of autoscaling Key: FLINK-34104 URL: https://issues.apache.org/jira/browse/FLINK-34104 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.8.0 Currently, the scaling report format is Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f It has 2 disadvantages: # When one job has multiple vertices, the reports of all vertices are mixed together without any separator. Here is an example: ** Scaling execution enabled, begin scaling vertices: Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 2 -> 1 | Processing capacity 800466.67 -> 320186.67 | Target data rate 715.10 Vertex ID bc764cd8ddf7a0cff126f51c16239658 | Parallelism 2 -> 1 | Processing capacity 79252.08 -> 31700.83 | Target data rate 895.93 Vertex ID 0a448493b4782967b150582570326227 | Parallelism 8 -> 16 | Processing capacity 716.05 -> 1141.00 | Target data rate 715.54 ** We can see that each vertex report begins with the Vertex ID and has no separator from the previous vertex report. # This format is non-standard and hard to deserialize. ** Consider a job that enables the autoscaler but disables scaling execution. ** Flink platform maintainers may want to show the scaling report in their WebUI, because the report is helpful for Flink users. ** So a format that is easy to deserialize is useful for such Flink platforms. h2. Solution: Serialize the scaling report in JSON format, which is easy to read and deserialize. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34082) Remove deprecated methods of Configuration in 2.0
Rui Fan created FLINK-34082: --- Summary: Remove deprecated methods of Configuration in 2.0 Key: FLINK-34082 URL: https://issues.apache.org/jira/browse/FLINK-34082 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34081) Refactor all callers that use the public Xxx getXxx(ConfigOption configOption) and public void setXxx(ConfigOption key, Xxx value)
Rui Fan created FLINK-34081: --- Summary: Refactor all callers that use the public Xxx getXxx(ConfigOption configOption) and public void setXxx(ConfigOption key, Xxx value) Key: FLINK-34081 URL: https://issues.apache.org/jira/browse/FLINK-34081 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34080) Simplify the Configuration
Rui Fan created FLINK-34080: --- Summary: Simplify the Configuration Key: FLINK-34080 URL: https://issues.apache.org/jira/browse/FLINK-34080 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 This Jira covers part 2.2 of FLIP-405: * 2.2.1 Update Configuration to encourage the usage of ConfigOption over string configuration key * 2.2.2 Introduce public T get(ConfigOption configOption, T overrideDefault) * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration -- This message was sent by Atlassian Jira (v8.20.10#820010)
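As an illustration of the get(ConfigOption, overrideDefault) idea in FLINK-34080, here is a minimal, self-contained Java sketch. Option and MiniConfig are hypothetical stand-ins written for this example, not Flink's ConfigOption and Configuration, and the option key is made up.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigOptionSketch {

    /** Hypothetical typed option: a key, a value type and a built-in default. */
    static final class Option<T> {
        final String key;
        final Class<T> type;
        final T defaultValue;

        Option(String key, Class<T> type, T defaultValue) {
            this.key = key;
            this.type = type;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical configuration that is only accessed through typed options. */
    static final class MiniConfig {
        private final Map<String, Object> values = new HashMap<>();

        <T> MiniConfig set(Option<T> option, T value) {
            values.put(option.key, value);
            return this;
        }

        /** Returns the configured value, or the option's own default. */
        <T> T get(Option<T> option) {
            return get(option, option.defaultValue);
        }

        /** Returns the configured value, or the caller-supplied default. */
        <T> T get(Option<T> option, T overrideDefault) {
            Object raw = values.get(option.key);
            return raw == null ? overrideDefault : option.type.cast(raw);
        }
    }

    public static void main(String[] args) {
        Option<Integer> retries = new Option<>("example.retries", Integer.class, 1);
        MiniConfig config = new MiniConfig();
        System.out.println(config.get(retries));
        System.out.println(config.get(retries, 4));
        System.out.println(config.set(retries, 8).get(retries, 4));
    }
}
```

The point of the typed accessor is that the value type travels with the option, so callers no longer need one getXxx(String key) method per type.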
[jira] [Created] (FLINK-34079) FLIP-405: Migrate string configuration key to ConfigOption
Rui Fan created FLINK-34079: --- Summary: FLIP-405: Migrate string configuration key to ConfigOption Key: FLINK-34079 URL: https://issues.apache.org/jira/browse/FLINK-34079 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Rui Fan Assignee: Xuannan Su Fix For: 1.19.0 This is an umbrella Jira of [FLIP-405|https://cwiki.apache.org/confluence/x/6Yr5E] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34065) Design AbstractAutoscalerStateStore to support serialize State to String
Rui Fan created FLINK-34065: --- Summary: Design AbstractAutoscalerStateStore to support serialize State to String Key: FLINK-34065 URL: https://issues.apache.org/jira/browse/FLINK-34065 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Some logic of {{KubernetesAutoScalerStateStore}} and {{JDBCAutoScalerStateStore}} is similar, so we can share some common code. * {{ConfigMapStore}} and {{JDBCStore}} can be abstracted to a {{StringStateStore}} interface ** They support {{put}}, {{get}} and {{remove}} ** The parameters of {{ConfigMapStore}} are (JobContext, String key, String value). ** The parameters of {{JDBCStore}} are (String jobKey, StateType stateType, String value). ** We can define an interface {{StringStateStore}} whose parameters are {{(JobContext, StateType stateType, String value)}}. * {{KubernetesAutoScalerStateStore}} and {{JDBCAutoScalerStateStore}} can be abstracted to {{AbstractAutoscalerStateStore}} ** They support serializing and compressing the {{Original State}} to a String. ** {{AbstractAutoscalerStateStore}} can reuse the serialize and compress logic ** {{KubernetesAutoScalerStateStore}} supports a size limit on stateValue ** We can define a parameter for {{AbstractAutoscalerStateStore}}: the limit is disabled by default, and {{KubernetesAutoScalerStateStore}} can enable it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
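The StringStateStore abstraction proposed in FLINK-34065 could look roughly like the Java sketch below. Only the interface name and the put/get/remove operations with (JobContext, StateType, String value) parameters come from the ticket; the StateType constants, the generic type parameter for the job context and the in-memory implementation are assumptions made for this example.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class StringStateStoreSketch {

    /** Hypothetical state types; the real set of types is not given in the ticket. */
    enum StateType { SCALING_HISTORY, COLLECTED_METRICS }

    /** Stores already-serialized state per job and state type. C is the job context type. */
    interface StringStateStore<C> {
        void put(C jobContext, StateType stateType, String value);

        Optional<String> get(C jobContext, StateType stateType);

        void remove(C jobContext, StateType stateType);
    }

    /** In-memory stand-in for the ConfigMap-backed or JDBC-backed stores. */
    static final class InMemoryStringStateStore<C> implements StringStateStore<C> {
        private final Map<C, Map<StateType, String>> data = new HashMap<>();

        @Override
        public void put(C jobContext, StateType stateType, String value) {
            data.computeIfAbsent(jobContext, k -> new EnumMap<>(StateType.class))
                    .put(stateType, value);
        }

        @Override
        public Optional<String> get(C jobContext, StateType stateType) {
            Map<StateType, String> perJob =
                    data.getOrDefault(jobContext, Collections.emptyMap());
            return Optional.ofNullable(perJob.get(stateType));
        }

        @Override
        public void remove(C jobContext, StateType stateType) {
            Map<StateType, String> perJob = data.get(jobContext);
            if (perJob != null) {
                perJob.remove(stateType);
            }
        }
    }

    public static void main(String[] args) {
        StringStateStore<String> store = new InMemoryStringStateStore<>();
        store.put("job-1", StateType.SCALING_HISTORY, "{}");
        System.out.println(store.get("job-1", StateType.SCALING_HISTORY).isPresent());
    }
}
```

With such an interface, the serialize and compress logic can live in one abstract class and delegate only the final String read and write to the backend-specific store.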
[jira] [Created] (FLINK-34021) Print jobKey in the Autoscaler standalone log
Rui Fan created FLINK-34021: --- Summary: Print jobKey in the Autoscaler standalone log Key: FLINK-34021 URL: https://issues.apache.org/jira/browse/FLINK-34021 Project: Flink Issue Type: Sub-task Components: Autoscaler Affects Versions: kubernetes-operator-1.8.0 Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33999) FLIP-412: Add the time-consuming span of each stage when starting the Flink job to TraceReporter
Rui Fan created FLINK-33999: --- Summary: FLIP-412: Add the time-consuming span of each stage when starting the Flink job to TraceReporter Key: FLINK-33999 URL: https://issues.apache.org/jira/browse/FLINK-33999 Project: Flink Issue Type: New Feature Components: Runtime / Coordination, Runtime / Metrics Reporter: Rui Fan Assignee: junzhong qin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33965) Refactor the configuration for autoscaler standalone
Rui Fan created FLINK-33965: --- Summary: Refactor the configuration for autoscaler standalone Key: FLINK-33965 URL: https://issues.apache.org/jira/browse/FLINK-33965 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Rui Fan Currently, all configurations of autoscaler standalone are maintained as string keys. When autoscaler standalone has only a few options, this is easy to maintain. However, I found it becomes hard to maintain as we add more options. The JDBC autoscaler state store and the multi-threaded control loop that I am developing will introduce more options. h2. Solution: Introduce AutoscalerStandaloneOptions to manage all options of autoscaler standalone, and generate the doc for it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33960) Adaptive Scheduler doesn't respect the lowerBound for tasks
Rui Fan created FLINK-33960: --- Summary: Adaptive Scheduler doesn't respect the lowerBound for tasks Key: FLINK-33960 URL: https://issues.apache.org/jira/browse/FLINK-33960 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.17.2, 1.18.1 Reporter: Rui Fan Assignee: Rui Fan Adaptive Scheduler doesn't respect the lowerBound for tasks when one Flink job has more than 1 task. When using the adaptive scheduler and the rescale API, users set the lowerBound and upperBound for each job vertex, and they expect the parallelism of every vertex to be between lowerBound and upperBound. But when one Flink job has more than 1 vertex and resources aren't enough, some of the lowerBounds won't be respected. h2. How to reproduce this bug: One job has 3 job vertices, with the following resource requirements: * Vertex1: lowerBound=2, upperBound=2 * Vertex2: lowerBound=8, upperBound=8 * Vertex3: lowerBound=2, upperBound=2 They are in the same slotSharingGroup, and we only have 5 available slots. This job shouldn't run because the slots cannot meet the resource requirement for vertex2. But the job does run, and the parallelism of vertex2 is 5. -- This message was sent by Atlassian Jira (v8.20.10#820010)
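The expected behaviour in the FLINK-33960 reproduction can be expressed as a small feasibility check. This Java sketch is an illustration only, not the Adaptive Scheduler's code: the class and method names are hypothetical, and it assumes a single slot sharing group in which each slot can host at most one subtask of every vertex.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class LowerBoundCheckSketch {

    /** Hypothetical per-vertex parallelism bounds. */
    static final class Bounds {
        final int lower;
        final int upper;

        Bounds(int lower, int upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /**
     * Picks a parallelism per vertex, capped by the available slots, and returns
     * empty if any vertex would end up below its lowerBound.
     */
    static Optional<Map<String, Integer>> decide(Map<String, Bounds> requirements,
                                                 int availableSlots) {
        Map<String, Integer> parallelisms = new LinkedHashMap<>();
        for (Map.Entry<String, Bounds> entry : requirements.entrySet()) {
            Bounds bounds = entry.getValue();
            int parallelism = Math.min(bounds.upper, availableSlots);
            if (parallelism < bounds.lower) {
                // Not enough slots to honor the lowerBound: the job must not run.
                return Optional.empty();
            }
            parallelisms.put(entry.getKey(), parallelism);
        }
        return Optional.of(parallelisms);
    }

    public static void main(String[] args) {
        Map<String, Bounds> requirements = new LinkedHashMap<>();
        requirements.put("Vertex1", new Bounds(2, 2));
        requirements.put("Vertex2", new Bounds(8, 8));
        requirements.put("Vertex3", new Bounds(2, 2));
        System.out.println(decide(requirements, 5).isPresent());
        System.out.println(decide(requirements, 8).isPresent());
    }
}
```

With 5 slots the check rejects the job because Vertex2 would get 5 instead of 8, which is the outcome the ticket expects instead of the job running with a reduced parallelism.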
[jira] [Created] (FLINK-33942) DelegatingConfiguration misses the prefix for some methods
Rui Fan created FLINK-33942: --- Summary: DelegatingConfiguration misses the prefix for some methods Key: FLINK-33942 URL: https://issues.apache.org/jira/browse/FLINK-33942 Project: Flink Issue Type: Bug Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan * DelegatingConfiguration#getInteger(org.apache.flink.configuration.ConfigOption, int) * DelegatingConfiguration#getLong(org.apache.flink.configuration.ConfigOption, long) * org.apache.flink.configuration.DelegatingConfiguration#getBoolean(org.apache.flink.configuration.ConfigOption, boolean) * org.apache.flink.configuration.DelegatingConfiguration#getFloat(org.apache.flink.configuration.ConfigOption, float) * org.apache.flink.configuration.DelegatingConfiguration#getDouble(org.apache.flink.configuration.ConfigOption, double) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33935) Improve the default value doc and logic for some state backend and checkpoint related options
Rui Fan created FLINK-33935: --- Summary: Improve the default value doc and logic for some state backend and checkpoint related options Key: FLINK-33935 URL: https://issues.apache.org/jira/browse/FLINK-33935 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 Some state backend and checkpoint related options don't set the default value directly, but implement the default value in code. Such as: * execution.checkpointing.tolerable-failed-checkpoints ** [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints] * state.backend.type ** [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-backend-type] h2. Option1 execution.checkpointing.tolerable-failed-checkpoints doesn't have a default value, but CheckpointConfig#getTolerableCheckpointFailureNumber calls configuration.getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER).orElse(0). It means that 0 is the default value of execution.checkpointing.tolerable-failed-checkpoints. h2. Option2 state.backend.type doesn't have a default value, but StateBackendLoader#loadFromApplicationOrConfigOrDefaultInternal calls loadStateBackendFromConfig(config, classLoader, logger). When the return value is null, Flink uses hashmap as the default state backend. I checked all callers of StateBackendLoader#loadStateBackendFromConfig: if we change the default value of state.backend.type to hashmap, all of them still work well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration
Rui Fan created FLINK-33865: --- Summary: exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration Key: FLINK-33865 URL: https://issues.apache.org/jira/browse/FLINK-33865 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-12-17-17-56-59-138.png exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration. Reason: when exponential-delay.attempts-before-reset-backoff is set in the job Configuration instead of the cluster configuration, ExecutionConfig#configure calls RestartStrategies#parseConfiguration to create the RestartStrategyConfiguration. Then RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory creates the ExponentialDelayRestartBackoffTimeStrategyFactory from the RestartStrategyConfiguration. Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, so they don't support exponential-delay.attempts-before-reset-backoff. I misunderstood this during FLINK-32895: I thought RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the clusterConfiguration. !image-2023-12-17-17-56-59-138.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33814) Autoscaler Standalone control loop supports multiple threads
Rui Fan created FLINK-33814: --- Summary: Autoscaler Standalone control loop supports multiple threads Key: FLINK-33814 URL: https://issues.apache.org/jira/browse/FLINK-33814 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.8.0 When the job list has a lot of jobs, a single thread isn't enough. So support for multiple threads in the Autoscaler Standalone control loop is very useful for large-scale production. It's similar to kubernetes.operator.reconcile.parallelism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33813) Improve time formatting for easier reading inside the autoscaler
Rui Fan created FLINK-33813: --- Summary: Improve time formatting for easier reading inside the autoscaler Key: FLINK-33813 URL: https://issues.apache.org/jira/browse/FLINK-33813 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.8.0 The default format of Instant is UTC time. It's better to format it in the corresponding time zone, which is easier to read. !https://f.haiserve.com/download/8f2044829bf190f81cd03ddc5a749dc40b0102082d9e08215890070201702451618703ac?userid=146850=31fa35028392b8b45a2ff21fe8481f1d! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33801) Revert to last scaling when job becomes unhealthy after scaling
Rui Fan created FLINK-33801: --- Summary: Revert to last scaling when job becomes unhealthy after scaling Key: FLINK-33801 URL: https://issues.apache.org/jira/browse/FLINK-33801 Project: Flink Issue Type: New Feature Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Revert to the last scaling when the job becomes unhealthy after scaling, for example when memory is under pressure. In the first version, memory pressure is the only unhealthy case considered. We can cover more unhealthy cases in the future. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33752) When Duration is greater than or equal to 1 day, the display unit is ms.
Rui Fan created FLINK-33752: --- Summary: When Duration is greater than or equal to 1 day, the display unit is ms. Key: FLINK-33752 URL: https://issues.apache.org/jira/browse/FLINK-33752 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.18.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0, 1.18.1 Attachments: image-2023-12-05-19-44-17-161.png When the default value of a Duration is 24 hours or 1 day, the displayed unit is ms (86400000 ms). For example, the kubernetes operator doc has 3 options whose default values are shown as 86400000 ms. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/ !image-2023-12-05-19-44-17-161.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
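To make the FLINK-33752 display problem concrete: one day is 24 * 60 * 60 * 1000 = 86,400,000 ms, and a formatter that knows no unit larger than hours (or handles days incorrectly) can fall back to printing the raw millisecond count. The Java sketch below shows one possible way to pick the largest unit that divides the value exactly. It is a hypothetical helper, not Flink's own formatter, and the unit labels are assumptions.

```java
import java.time.Duration;

final class DurationFormatSketch {

    // Units from largest to smallest, each expressed in milliseconds.
    private static final long[] UNIT_MS = {86_400_000L, 3_600_000L, 60_000L, 1_000L};
    private static final String[] UNIT_LABEL = {"d", "h", "min", "s"};

    /** Formats the duration with the largest unit that divides it without a remainder. */
    static String formatWithLargestExactUnit(Duration duration) {
        long millis = duration.toMillis();
        if (millis != 0) {
            for (int i = 0; i < UNIT_MS.length; i++) {
                if (millis % UNIT_MS[i] == 0) {
                    return (millis / UNIT_MS[i]) + " " + UNIT_LABEL[i];
                }
            }
        }
        // Zero, or a value that is not a whole number of seconds.
        return millis + " ms";
    }

    public static void main(String[] args) {
        System.out.println(formatWithLargestExactUnit(Duration.ofHours(24)));
        System.out.println(formatWithLargestExactUnit(Duration.ofHours(36)));
        System.out.println(formatWithLargestExactUnit(Duration.ofMillis(1500)));
    }
}
```

Including the day unit in the table is the part that matters for this ticket: without it, 24 hours would still be shown in a smaller unit.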
[jira] [Created] (FLINK-33739) Document FLIP-364: Improve the exponential-delay restart-strategy
Rui Fan created FLINK-33739: --- Summary: Document FLIP-364: Improve the exponential-delay restart-strategy Key: FLINK-33739 URL: https://issues.apache.org/jira/browse/FLINK-33739 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33738) Make exponential-delay restart-strategy the default restart strategy
Rui Fan created FLINK-33738: --- Summary: Make exponential-delay restart-strategy the default restart strategy Key: FLINK-33738 URL: https://issues.apache.org/jira/browse/FLINK-33738 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33737) Merge multiple Exceptions into one attempt for exponential-delay restart-strategy
Rui Fan created FLINK-33737: --- Summary: Merge multiple Exceptions into one attempt for exponential-delay restart-strategy Key: FLINK-33737 URL: https://issues.apache.org/jira/browse/FLINK-33737 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33736) Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier
Rui Fan created FLINK-33736: --- Summary: Update default value of exponential-delay.max-backoff and exponential-delay.backoff-multiplier Key: FLINK-33736 URL: https://issues.apache.org/jira/browse/FLINK-33736 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 Update default value of exponential-delay.max-backoff from 5min to 1min. Update default value of exponential-delay.backoff-multiplier from 2.0 to 1.2. -- This message was sent by Atlassian Jira (v8.20.10#820010)
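To get a feel for what the new defaults in FLINK-33736 mean, the Java sketch below computes successive backoffs for a given multiplier and cap. It is a simplified model: jitter and backoff resets are ignored, the 1 second initial backoff used in main is an assumption, and the class and method names are hypothetical.

```java
final class BackoffDefaultsSketch {

    /** Next delay: the current delay times the multiplier, capped at maxMs. */
    static long nextBackoffMs(long currentMs, double multiplier, long maxMs) {
        return Math.min(Math.round(currentMs * multiplier), maxMs);
    }

    /** Number of consecutive failures needed before the delay reaches maxMs. */
    static int attemptsUntilCap(long initialMs, double multiplier, long maxMs) {
        if (multiplier <= 1.0 || initialMs <= 0) {
            throw new IllegalArgumentException("need multiplier > 1 and initialMs > 0");
        }
        int attempts = 0;
        long backoff = initialMs;
        while (backoff < maxMs) {
            backoff = nextBackoffMs(backoff, multiplier, maxMs);
            attempts++;
        }
        return attempts;
    }

    public static void main(String[] args) {
        long initialMs = 1_000L; // assumed initial backoff of 1 s
        System.out.println("old defaults (2.0, 5 min): "
                + attemptsUntilCap(initialMs, 2.0, 300_000L) + " steps to reach the cap");
        System.out.println("new defaults (1.2, 1 min): "
                + attemptsUntilCap(initialMs, 1.2, 60_000L) + " steps to reach the cap");
    }
}
```

The smaller multiplier makes the delay grow more gradually, and the lower cap bounds the worst-case wait between restarts at one minute instead of five.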
[jira] [Created] (FLINK-33735) FLIP-364: Improve the exponential-delay restart-strategy
Rui Fan created FLINK-33735: --- Summary: FLIP-364: Improve the exponential-delay restart-strategy Key: FLINK-33735 URL: https://issues.apache.org/jira/browse/FLINK-33735 Project: Flink Issue Type: New Feature Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 This is an umbrella Jira of [FLIP-364: Improve the exponential-delay restart-strategy.|https://cwiki.apache.org/confluence/x/uJqzDw] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33731) failover.flip1 package can be renamed to failover
Rui Fan created FLINK-33731: --- Summary: failover.flip1 package can be renamed to failover Key: FLINK-33731 URL: https://issues.apache.org/jira/browse/FLINK-33731 Project: Flink Issue Type: Technical Debt Components: Runtime / Coordination Affects Versions: 1.17.2, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 Currently, there is an org.apache.flink.runtime.executiongraph.failover.flip1 package. I propose renaming failover.flip1 to failover, in other words: removing the flip1. I have 2 reasons: * The naming of the package should be based on business semantics, not the FLIP number, and the code under the failover.flip1 package has also changed a lot since FLIP-1. * All code under the failover.flip1 package is Internal code instead of @Public code, so it can be renamed directly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33701) restart.time-tracking doc is wrong
Rui Fan created FLINK-33701: --- Summary: restart.time-tracking doc is wrong Key: FLINK-33701 URL: https://issues.apache.org/jira/browse/FLINK-33701 Project: Flink Issue Type: Technical Debt Components: Autoscaler Affects Versions: kubernetes-operator-1.8.0 Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.8.0 Attachments: image-2023-11-30-16-27-06-149.png The upper bound is restart.time-tracking.limit, not restart.time. !image-2023-11-30-16-27-06-149.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33693) The disable unaligned checkpoint isn't respected when the aligned checkpoint timeout is enabled
Rui Fan created FLINK-33693: --- Summary: The disable unaligned checkpoint isn't respected when the aligned checkpoint timeout is enabled Key: FLINK-33693 URL: https://issues.apache.org/jira/browse/FLINK-33693 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.17.2, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-11-29-20-41-17-038.png Disabling unaligned checkpoints isn't respected when the aligned checkpoint timeout is enabled. The bug is in org.apache.flink.streaming.runtime.io.RecordWriterOutput#broadcastEvent: we call withUnalignedUnsupported to turn the unaligned checkpoint barrier into a FORCED_ALIGNED barrier when the shuffle type cannot use unaligned checkpoints. However, the if has one condition: isPriorityEvent. When the aligned checkpoint timeout is enabled, Flink emits a timeoutable barrier, which isn't a PriorityEvent. But the timeoutable barrier should also go through withUnalignedUnsupported here. !image-2023-11-29-20-41-17-038.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33650) Checkpoint Storage doc is missing
Rui Fan created FLINK-33650: --- Summary: Checkpoint Storage doc is missing Key: FLINK-33650 URL: https://issues.apache.org/jira/browse/FLINK-33650 Project: Flink Issue Type: Technical Debt Components: Documentation Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-11-25-20-03-22-169.png, image-2023-11-25-20-04-34-670.png, image-2023-11-25-20-04-50-937.png The Checkpoint Storage and available-checkpoint-storage-options parts are missing from the checkpoints page [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#checkpoint-storage] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/state/checkpoints/ !image-2023-11-25-20-04-34-670.png|width=365,height=352! !image-2023-11-25-20-04-50-937.png|width=379,height=265! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33649) Exponential Delay doc is missing
Rui Fan created FLINK-33649: --- Summary: Exponential Delay doc is missing Key: FLINK-33649 URL: https://issues.apache.org/jira/browse/FLINK-33649 Project: Flink Issue Type: Technical Debt Components: Documentation Reporter: Rui Fan Assignee: Rui Fan The Exponential Delay doc is missing from 2 pages: * The Configuration page is missing the Exponential Delay related options ** [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#fault-tolerance] ** Both the EN and zh docs * The task-failure-recovery page is missing Exponential Delay ** [https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#task-failure-recovery] ** Only the zh version is affected -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33636) Implement JdbcAutoScalerEventHandler
Rui Fan created FLINK-33636: --- Summary: Implement JdbcAutoScalerEventHandler Key: FLINK-33636 URL: https://issues.apache.org/jira/browse/FLINK-33636 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33625) FLIP-390: Support System out and err to be redirected to LOG or discarded
Rui Fan created FLINK-33625: --- Summary: FLIP-390: Support System out and err to be redirected to LOG or discarded Key: FLINK-33625 URL: https://issues.apache.org/jira/browse/FLINK-33625 Project: Flink Issue Type: New Feature Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 See https://cwiki.apache.org/confluence/x/4guZE for details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33565) The concurrentExceptions doesn't work
Rui Fan created FLINK-33565: --- Summary: The concurrentExceptions doesn't work Key: FLINK-33565 URL: https://issues.apache.org/jira/browse/FLINK-33565 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.17.1, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan First of all, thanks to [~mapohl] for helping double-check in advance that this is indeed a bug. Displaying the exception history in the WebUI was introduced in FLINK-6042. h1. What are concurrentExceptions? When an execution fails due to an exception, the other executions in the same region are restarted as well, and the first exception is the rootException. If other restarted executions also report exceptions at this time, we want to collect these exceptions and display them to the user as concurrentExceptions. h2. What's this bug? The concurrentExceptions list is always empty in production, even if other executions report exceptions at very close times. h1. Why doesn't it work? If a job has an all-to-all shuffle, the job has only one region, and this region has a lot of executions. If one execution throws an exception: * JobMaster marks this execution as FAILED. * The rest of the executions in this region are marked as CANCELING. ** This call stack can be found at FLIP-364 [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover] When these executions throw exceptions as well, JobMaster moves them from CANCELING to CANCELED instead of FAILED. A CANCELED execution doesn't go through the FAILED logic, so its exception is ignored. Note: all reports are executed inside the JobMaster RPC thread, which is a single thread, so these reports are executed serially. As a result, only one execution is marked as FAILED, and the rest of the executions are marked as CANCELED later. h1. How to fix it? 
After an offline discussion with [~mapohl], we first need to discuss with the community whether we should keep concurrentExceptions. * If no, we can remove the related logic directly. * If yes, we discuss how to fix it later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
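The failure path described in FLINK-33565 can be illustrated with a small model. This is only a sketch under stated assumptions: the class, enum, and method names below are hypothetical stand-ins and not Flink's actual scheduler classes; the point is just to show why a serially processed second failure never reaches the list of concurrent exceptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the behaviour described above; not Flink's real classes.
final class ConcurrentExceptionsSketch {
    enum State { RUNNING, CANCELING, CANCELED, FAILED }

    static final class Execution {
        State state = State.RUNNING;
    }

    final List<Execution> region = new ArrayList<>();
    final List<Throwable> concurrentExceptions = new ArrayList<>();
    Throwable rootException;

    ConcurrentExceptionsSketch(int executions) {
        for (int i = 0; i < executions; i++) {
            region.add(new Execution());
        }
    }

    // Invoked one call at a time, mimicking the single-threaded JobMaster RPC thread.
    void reportFailure(int index, Throwable cause) {
        Execution execution = region.get(index);
        if (execution.state == State.RUNNING) {
            // First failure: becomes the root exception and cancels the rest of the region.
            execution.state = State.FAILED;
            rootException = cause;
            for (Execution other : region) {
                if (other != execution && other.state == State.RUNNING) {
                    other.state = State.CANCELING;
                }
            }
        } else if (execution.state == State.CANCELING) {
            // Later failure: the execution only moves to CANCELED, the failure
            // handling is skipped, and the cause is silently dropped.
            execution.state = State.CANCELED;
        }
    }
}
```

In this model, reporting a second failure right after the first leaves `concurrentExceptions` empty, which matches the symptom described in the ticket.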
[jira] [Created] (FLINK-33527) Clear all physical states after autoscaler is disabled
Rui Fan created FLINK-33527: --- Summary: Clear all physical states after autoscaler is disabled Key: FLINK-33527 URL: https://issues.apache.org/jira/browse/FLINK-33527 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan Fix For: kubernetes-operator-1.7.0 Currently, we only clear ParallelismOverrides after the autoscaler is disabled. We should clear CollectedMetrics and ScalingHistory as well: * CollectedMetrics can be cleared directly. * ScalingHistory can be cleared based on the trim logic (VERTEX_SCALING_HISTORY_AGE). -- This message was sent by Atlassian Jira (v8.20.10#820010)
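The age-based trimming mentioned for ScalingHistory could look roughly like the sketch below. It is an assumption-laden illustration: `ScalingHistoryTrimSketch`, `trim`, and the use of a `SortedMap` keyed by timestamp are hypothetical and are not taken from the autoscaler code; `maxAge` plays the role of VERTEX_SCALING_HISTORY_AGE.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helper, not the autoscaler's real API.
final class ScalingHistoryTrimSketch {
    // Returns a copy that keeps only the entries recorded within maxAge of now.
    static <V> SortedMap<Instant, V> trim(
            SortedMap<Instant, V> history, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        // tailMap keeps keys that are greater than or equal to the cutoff.
        return new TreeMap<>(history.tailMap(cutoff));
    }
}
```

With a 24 hour maximum age, an entry recorded 30 hours ago is dropped while an entry from 1 hour ago is kept.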
[jira] [Created] (FLINK-33452) A series of improvements of Autoscaler Standalone
Rui Fan created FLINK-33452: --- Summary: A series of improvements of Autoscaler Standalone Key: FLINK-33452 URL: https://issues.apache.org/jira/browse/FLINK-33452 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan This is an umbrella ticket. It aims to make a series of improvements to the Autoscaler Standalone to bring it into production. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33450) Implement JdbcAutoScalerStateStore
Rui Fan created FLINK-33450: --- Summary: Implement JdbcAutoScalerStateStore Key: FLINK-33450 URL: https://issues.apache.org/jira/browse/FLINK-33450 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33403) Bump flink version to 1.18.0 for flink-kubernetes-operator
Rui Fan created FLINK-33403: --- Summary: Bump flink version to 1.18.0 for flink-kubernetes-operator Key: FLINK-33403 URL: https://issues.apache.org/jira/browse/FLINK-33403 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33359) Kubernetes operator supports Java 17
Rui Fan created FLINK-33359: --- Summary: Kubernetes operator supports Java 17 Key: FLINK-33359 URL: https://issues.apache.org/jira/browse/FLINK-33359 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Rui Fan In the voting thread on the mailing list for flink-kubernetes-operator version 1.6.1, Thomas mentioned that the Kubernetes operator doesn't support Java 17. After an offline discussion with [~gyfora], we hope that Java 17 support for the Kubernetes operator can be treated as a critical ticket for 1.7.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33354) Reuse the TaskInformation for multiple slots
Rui Fan created FLINK-33354: --- Summary: Reuse the TaskInformation for multiple slots Key: FLINK-33354 URL: https://issues.apache.org/jira/browse/FLINK-33354 Project: Flink Issue Type: Sub-task Components: Runtime / Task Affects Versions: 1.17.1, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan The background is similar to FLINK-33315: a Hive table with a lot of data, where HiveSource#partitionBytes is 281MB. When slotPerTM = 4, one TM runs 4 HiveSources at the same time. How does the TaskExecutor submit a large task? # TaskExecutor#loadBigData reads all bytes from the file into a SerializedValue ** The SerializedValue holds a byte[] ** It costs heap memory ** It is greater than 281 MB, because it stores not only HiveSource#partitionBytes but also the other information in TaskInformation. # Generate the TaskInformation from the SerializedValue ** TaskExecutor#submitTask calls tdd.getSerializedTaskInformation().deserializeValue() ** tdd.getSerializedTaskInformation() is a SerializedValue ** It generates the TaskInformation ** TaskInformation includes the Configuration taskConfiguration ** The taskConfiguration includes StreamConfig#SERIALIZEDUDF Based on the above process, TM memory holds 2 big byte arrays for each task: * The SerializedValue * The TaskInformation When one TM runs 4 HiveSources at the same time, it holds 8 big byte arrays. In our production environment, this is also a situation that often leads to TM OOM. h2. Solution: This data is identical across the tasks because the PermanentBlobKey is the same. We can add a cache for it to reduce the memory and CPU cost. -- This message was sent by Atlassian Jira (v8.20.10#820010)
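The cache proposed in FLINK-33354 can be sketched as a map keyed by the blob key, so that tasks sharing the same key share one deserialized object. This is a minimal sketch, not the actual TaskExecutor change: `TaskInformationCacheSketch`, its generic parameters, and the `loader` function are all hypothetical, and a real cache would also need an eviction or reference-counting policy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: K stands in for the blob key, V for the deserialized task information.
final class TaskInformationCacheSketch<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    TaskInformationCacheSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    // Loads and deserializes at most once per key; later callers reuse the same instance.
    V get(K blobKey) {
        return cache.computeIfAbsent(blobKey, loader);
    }

    // Drops the cached value, for example once no task on this TM needs it anymore.
    void release(K blobKey) {
        cache.remove(blobKey);
    }
}
```

With 4 slots submitting tasks that share one key, the loader runs once instead of 4 times, which is the memory and CPU saving the ticket is after.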
[jira] [Created] (FLINK-33317) Add cleaning mechanism for StreamConfig#SERIALIZEDUDF
Rui Fan created FLINK-33317: --- Summary: Add cleaning mechanism for StreamConfig#SERIALIZEDUDF Key: FLINK-33317 URL: https://issues.apache.org/jira/browse/FLINK-33317 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Affects Versions: 1.17.0, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan See FLINK-33315 for details. This Jira focuses on avoiding unnecessary memory usage; it can optimize the memory cost of Replica_3 in FLINK-33315. Solution: * Define a threshold for the cleaning mechanism of StreamConfig#SERIALIZEDUDF * After getStreamOperatorFactory, StreamConfig#SERIALIZEDUDF can be cleaned if the value size is greater than the threshold * If it is still needed later, we can flush it to disk and load it from disk when needed (and clean it from memory immediately after use). -- This message was sent by Atlassian Jira (v8.20.10#820010)
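The threshold-plus-spill idea from FLINK-33317 can be sketched with plain Java file I/O. Everything here is an illustrative assumption: `SpillableBytesSketch`, its method names, and the temp-file naming are hypothetical and do not reflect how StreamConfig stores SERIALIZEDUDF.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical holder for a large serialized value; not part of Flink.
final class SpillableBytesSketch {
    private final long thresholdBytes;
    private byte[] inMemory;
    private Path spillFile;

    SpillableBytesSketch(byte[] value, long thresholdBytes) {
        this.inMemory = value;
        this.thresholdBytes = thresholdBytes;
    }

    // Call after the value has been consumed: values above the threshold move to disk.
    void releaseIfLarge() {
        if (inMemory == null || inMemory.length <= thresholdBytes) {
            return; // small values stay on the heap, the release strategy does not apply
        }
        try {
            spillFile = Files.createTempFile("serialized-udf", ".bin");
            spillFile.toFile().deleteOnExit();
            Files.write(spillFile, inMemory);
            inMemory = null; // the heap copy becomes eligible for garbage collection
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isInMemory() {
        return inMemory != null;
    }

    // Reads from disk on demand without caching, so the heap copy is short-lived.
    byte[] get() {
        if (inMemory != null) {
            return inMemory;
        }
        try {
            return Files.readAllBytes(spillFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design choice in this sketch is that `get()` never re-caches a spilled value, which mirrors the "clean it from memory immediately after use" point in the ticket.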
[jira] [Created] (FLINK-33316) Avoid unnecessary heavy getStreamOperatorFactory
Rui Fan created FLINK-33316: --- Summary: Avoid unnecessary heavy getStreamOperatorFactory Key: FLINK-33316 URL: https://issues.apache.org/jira/browse/FLINK-33316 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Affects Versions: 1.17.0, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan This Jira focuses on avoiding the unnecessary heavy getStreamOperatorFactory call; it can optimize the memory and CPU cost of Replica_2 in FLINK-33315. Solution: We can store the serializedUdfClassName in StreamConfig, and use getStreamOperatorFactoryClassName instead of the heavy getStreamOperatorFactory in OperatorChain#getOperatorRecordsOutCounter. -- This message was sent by Atlassian Jira (v8.20.10#820010)
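The idea in FLINK-33316 is to answer a type question from a stored class name instead of deserializing a large object. The sketch below shows that technique with stand-in types; `FactoryClassNameSketch`, `Config`, `SourceFactory`, and `SinkWriterFactory` are hypothetical and only mirror the roles of the Flink classes named in the ticket.

```java
// Hypothetical stand-ins; none of these are Flink's real classes.
final class FactoryClassNameSketch {
    interface OperatorFactory {}

    static final class SourceFactory implements OperatorFactory {}

    static final class SinkWriterFactory implements OperatorFactory {}

    // Keeps the (potentially huge) serialized factory together with its class name.
    static final class Config {
        final byte[] serializedFactory;
        final String factoryClassName;

        Config(byte[] serializedFactory, Class<? extends OperatorFactory> factoryClass) {
            this.serializedFactory = serializedFactory;
            this.factoryClassName = factoryClass.getName();
        }
    }

    // Checks the type from the class name alone; serializedFactory is never deserialized.
    static boolean isSinkWriter(Config config, ClassLoader userCodeClassLoader) {
        try {
            Class<?> cls = Class.forName(config.factoryClassName, false, userCodeClassLoader);
            return SinkWriterFactory.class.isAssignableFrom(cls);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load " + config.factoryClassName, e);
        }
    }
}
```

Loading the class with `initialize` set to `false` keeps the check cheap, and no temporary copy of the factory object is created.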
[jira] [Created] (FLINK-33315) Optimize memory usage of large StreamOperator
Rui Fan created FLINK-33315: --- Summary: Optimize memory usage of large StreamOperator Key: FLINK-33315 URL: https://issues.apache.org/jira/browse/FLINK-33315 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Affects Versions: 1.17.0, 1.18.0 Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-10-19-16-28-16-077.png Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TMs always fail with java.lang.OutOfMemoryError: Java heap space. Here is an example: a Hive table with a lot of data, where HiveSource#partitionBytes is 281MB. After analysis, the root cause is that the TM maintains the big object in 3 replicas: * Replica_1: SourceOperatorFactory (it's necessary for running the task) * Replica_2: A temporarily generated duplicate SourceOperatorFactory object. ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646]) ** When creating a successor operator to a SourceOperator, the call stack is: *** OperatorChain#createOperatorChain -> *** wrapOperatorIntoOutput -> *** getOperatorRecordsOutCounter -> *** operatorConfig.getStreamOperatorFactory(userCodeClassloader) ** It generates the SourceOperatorFactory temporarily, just to check whether it's a SinkWriterOperatorFactory * Replica_3: The value of StreamConfig#SERIALIZEDUDF ** It is used to generate the SourceOperatorFactory. ** Currently the value is always kept in heap memory. ** However, after generating the factory we can release it, or store it on disk if needed. *** We can define a threshold; when the value size is less than the threshold, the release strategy doesn't take effect. ** If so, we can save a lot of heap memory. These three replicas use about 800MB of memory. Please note that this is for just one subtask. 
Since each TM has 4 slots, it runs 4 HiveSources at the same time, so 12 replicas are kept in TM memory, which is about 3.3 GB. These large objects in the JVM cannot be recycled, causing the TM to OOM frequently. This JIRA is focused on optimizing Replica_2 and Replica_3. !image-2023-10-19-16-28-16-077.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
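The memory figures in FLINK-33315 can be reproduced with simple arithmetic. The sketch below assumes each replica is roughly the size of HiveSource#partitionBytes (281MB), which is an approximation made here for illustration; `ReplicaMemoryEstimate` and `estimateMb` are hypothetical names.

```java
// Back-of-the-envelope estimate; assumes every replica is about as large as the source object.
final class ReplicaMemoryEstimate {
    static long estimateMb(long objectMb, int replicasPerTask, int slotsPerTm) {
        return objectMb * replicasPerTask * slotsPerTm;
    }

    public static void main(String[] args) {
        // One subtask: 281 MB * 3 replicas = 843 MB, i.e. "about 800MB".
        System.out.println(estimateMb(281, 3, 1) + " MB per subtask");
        // One TM with 4 slots: 281 MB * 3 * 4 = 3372 MB, i.e. "about 3.3 GB".
        System.out.println(estimateMb(281, 3, 4) + " MB per TM");
    }
}
```

Removing Replica_2 and Replica_3, as the ticket proposes, would leave one replica per slot under the same assumption.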
[jira] [Created] (FLINK-33287) Using the flink shaded jackson for flink-autoscaler
Rui Fan created FLINK-33287: --- Summary: Using the flink shaded jackson for flink-autoscaler Key: FLINK-33287 URL: https://issues.apache.org/jira/browse/FLINK-33287 Project: Flink Issue Type: Sub-task Components: Autoscaler Reporter: Rui Fan Assignee: Rui Fan FLINK-33098 still uses the jackson version of {{flink-kubernetes-operator}} instead of the flink shaded version. I want to update it after {{flink-1.18}} is released. The reason is that the current autoscaler uses {{loaderOptions}} to limit the serialized size. * The shaded jackson version of {{flink-1.17}} is {{2.13.4-16.1}}, which doesn't support {{loaderOptions}}. * The shaded jackson version of {{flink-1.18}} is {{2.14.2-17.0}}, which supports {{loaderOptions}}. Details can be found in this comment[1]. [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1336571925 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33237) Optimize the data type of JobStatus#state from String to Enum
Rui Fan created FLINK-33237: --- Summary: Optimize the data type of JobStatus#state from String to Enum Key: FLINK-33237 URL: https://issues.apache.org/jira/browse/FLINK-33237 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Rui Fan Assignee: Rui Fan As discussed in comment[1], the type of {{org.apache.flink.kubernetes.operator.api.status.JobStatus#state}} should be changed from {{String}} to {{org.apache.flink.api.common.JobStatus}}. [1] https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354340358 -- This message was sent by Atlassian Jira (v8.20.10#820010)
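The benefit of moving a state field from String to an enum can be shown with a small sketch. The `JobState` enum below is a trimmed, hypothetical stand-in defined only for this example (it is not `org.apache.flink.api.common.JobStatus`), and `parse` is an assumed helper for converting legacy string values.

```java
import java.util.Locale;

// Hypothetical stand-in enum; the real change would use Flink's own JobStatus enum.
final class JobStateEnumSketch {
    enum JobState { CREATED, RUNNING, FAILED, CANCELED, FINISHED }

    // Converts a legacy string value; unknown or misspelled states fail fast
    // with IllegalArgumentException instead of flowing through the system.
    static JobState parse(String legacyState) {
        return JobState.valueOf(legacyState.trim().toUpperCase(Locale.ROOT));
    }

    // With an enum, comparisons are checked by the compiler rather than by string equality.
    static boolean isTerminal(JobState state) {
        return state == JobState.FAILED
                || state == JobState.CANCELED
                || state == JobState.FINISHED;
    }
}
```

A misspelled value such as "RUNING" is rejected at parse time, whereas a String field would silently accept it.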
[jira] [Created] (FLINK-33102) Document the standalone autoscaler
Rui Fan created FLINK-33102: --- Summary: Document the standalone autoscaler Key: FLINK-33102 URL: https://issues.apache.org/jira/browse/FLINK-33102 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33101) Add the integration test for standalone autoscaler
Rui Fan created FLINK-33101: --- Summary: Add the integration test for standalone autoscaler Key: FLINK-33101 URL: https://issues.apache.org/jira/browse/FLINK-33101 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Samrat Deb -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33100) Implement YarnJobFetcher for Standalone Autoscaler
Rui Fan created FLINK-33100: --- Summary: Implement YarnJobFetcher for Standalone Autoscaler Key: FLINK-33100 URL: https://issues.apache.org/jira/browse/FLINK-33100 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33099) Support the Standalone Autoscaler
Rui Fan created FLINK-33099: --- Summary: Support the Standalone Autoscaler Key: FLINK-33099 URL: https://issues.apache.org/jira/browse/FLINK-33099 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33098) Support kubernetes autoscaler using generic interface
Rui Fan created FLINK-33098: --- Summary: Support kubernetes autoscaler using generic interface Key: FLINK-33098 URL: https://issues.apache.org/jira/browse/FLINK-33098 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Rui Fan # Move all classes that aren't related to kubernetes to the flink-autoscaler module # Support kubernetes autoscaler using generic interface # Remove the flink-kubernetes-operator-autoscaler module # Remove the option prefix (kubernetes.operator.) from all options and update the doc (all old option names are marked with withDeprecatedKeys to ensure compatibility). -- This message was sent by Atlassian Jira (v8.20.10#820010)
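The compatibility behaviour that deprecated keys provide (item 4 above) can be sketched without any Flink dependency: look up the new key first and fall back to the old prefixed key. `DeprecatedKeySketch` and `lookup` are hypothetical, and the option names in the usage note are only illustrative; this is not Flink's ConfigOption implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical lookup helper illustrating fallback to deprecated option names.
final class DeprecatedKeySketch {
    static Optional<String> lookup(
            Map<String, String> conf, String key, List<String> deprecatedKeys) {
        // The current key always wins when both old and new names are set.
        if (conf.containsKey(key)) {
            return Optional.ofNullable(conf.get(key));
        }
        // Otherwise fall back to the first deprecated name that is present.
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return Optional.ofNullable(conf.get(oldKey));
            }
        }
        return Optional.empty();
    }
}
```

For example, a configuration that still sets an illustrative old name such as `kubernetes.operator.some.option` would keep working when the code asks for `some.option` with the old name listed as deprecated.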
[jira] [Created] (FLINK-33097) Initialize the generic autoscaler module and interfaces
Rui Fan created FLINK-33097: --- Summary: Initialize the generic autoscaler module and interfaces Key: FLINK-33097 URL: https://issues.apache.org/jira/browse/FLINK-33097 Project: Flink Issue Type: Sub-task Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33092) Improve the resource-stabilization-timeout mechanism when rescale a job for Adaptive Scheduler
Rui Fan created FLINK-33092: --- Summary: Improve the resource-stabilization-timeout mechanism when rescale a job for Adaptive Scheduler Key: FLINK-33092 URL: https://issues.apache.org/jira/browse/FLINK-33092 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-09-15-14-43-35-104.png !image-2023-09-15-14-43-35-104.png|width=776,height=548! h1. 1. Proposal The above is the state transition graph when rescaling a job in the Adaptive Scheduler. In brief, when we trigger a rescale, the job waits for _*resource-stabilization-timeout*_ in the WaitingForResources state when it has sufficient resources but doesn't have the desired resources. If the _*resource-stabilization-timeout mechanism*_ is moved into the Executing state, the rescale downtime will be significantly reduced. h1. 2. Why is the downtime long? Currently, when rescaling a job: * Executing transitions to Restarting. * Restarting cancels the job first. * Restarting transitions to WaitingForResources after the whole job is terminal. * When the job has sufficient resources but doesn't have the desired resources, WaitingForResources needs to wait for _*resource-stabilization-timeout*_. * WaitingForResources transitions to CreatingExecutionGraph after resource-stabilization-timeout. The problem is that the job isn't running during the resource-stabilization-timeout phase. h1. 3. How to reduce the downtime? We can move the _*resource-stabilization-timeout mechanism*_ into the Executing state when triggering a rescale. It means: * When the job has the desired resources, Executing can rescale directly. * When the job has sufficient resources but doesn't have the desired resources, we can rescale after _*resource-stabilization-timeout*_. * WaitingForResources will ignore the resource-stabilization-timeout after this improvement. 
The resource-stabilization-timeout then takes effect before the job is cancelled, so the rescale downtime will be significantly reduced. Note: the resource-stabilization-timeout still works in WaitingForResources when starting a job. The behaviour only changes when rescaling a job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
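The decision proposed for the Executing state in FLINK-33092 can be written down as a small pure function. This is a simplified sketch under assumptions: resources are modelled as plain slot counts, and `RescaleDecisionSketch`, `Decision`, and `decide` are hypothetical names rather than Adaptive Scheduler classes.

```java
import java.time.Duration;

// Hypothetical decision helper; the job keeps running while this is evaluated.
final class RescaleDecisionSketch {
    enum Decision { RESCALE_NOW, WAIT_FOR_STABILIZATION, KEEP_RUNNING }

    static Decision decide(
            int availableSlots,
            int sufficientSlots,
            int desiredSlots,
            Duration sinceLastResourceChange,
            Duration stabilizationTimeout) {
        if (availableSlots >= desiredSlots) {
            // Desired resources are present: rescale directly.
            return Decision.RESCALE_NOW;
        }
        if (availableSlots >= sufficientSlots) {
            // Sufficient but not desired: rescale only once the timeout has elapsed.
            return sinceLastResourceChange.compareTo(stabilizationTimeout) >= 0
                    ? Decision.RESCALE_NOW
                    : Decision.WAIT_FOR_STABILIZATION;
        }
        // Not even sufficient resources: nothing to do yet.
        return Decision.KEEP_RUNNING;
    }
}
```

Because the waiting happens while the job is still in Executing, the timeout no longer adds to the downtime between cancelling the old execution and starting the new one.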
[jira] [Created] (FLINK-33065) Optimize the exception message when the program plan could not be fetched
Rui Fan created FLINK-33065: --- Summary: Optimize the exception message when the program plan could not be fetched Key: FLINK-33065 URL: https://issues.apache.org/jira/browse/FLINK-33065 Project: Flink Issue Type: Improvement Reporter: Rui Fan Assignee: Rui Fan When the program plan could not be fetched, the root cause may be that the main method doesn't call `env.execute()`. We can optimize the message to help users find this root cause. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33042) Allow trigger flamegraph when task is initializing
Rui Fan created FLINK-33042: --- Summary: Allow trigger flamegraph when task is initializing Key: FLINK-33042 URL: https://issues.apache.org/jira/browse/FLINK-33042 Project: Flink Issue Type: Improvement Components: Runtime / REST, Runtime / Web Frontend Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-09-06-15-43-37-075.png Currently, the flamegraph can only be triggered when the task is running. After FLINK-17012 and FLINK-22215, Flink split the running state into running and initializing. We should allow triggering the flamegraph when the task is initializing, for example to troubleshoot a very slow initialization. Here is a stack example in which the task is rebuilding RocksDB after the parallelism is changed. !image-2023-09-06-15-43-37-075.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32965) Remove the deprecated MutableObjectMode
Rui Fan created FLINK-32965: --- Summary: Remove the deprecated MutableObjectMode Key: FLINK-32965 URL: https://issues.apache.org/jira/browse/FLINK-32965 Project: Flink Issue Type: Technical Debt Reporter: Rui Fan Assignee: Rui Fan FLINK-1285 updated the MutableObjectMode to ObjectReuse, and TaskConfig#getMutableObjectMode() has not been used for a long time, so it can be removed. Also, we should update the GroupReduceDriverTest: * Update _testAllReduceDriverAccumulatingImmutable()_ from `context.setMutableObjectMode(false);` to `context.getExecutionConfig().disableObjectReuse();`. * Update _testAllReduceDriverIncorrectlyAccumulatingMutable()_ to `context.getExecutionConfig().disableObjectReuse();`, and fix the assert. More details can be found at https://github.com/apache/flink/pull/23233#discussion_r1304595960 -- This message was sent by Atlassian Jira (v8.20.10#820010)