[jira] [Created] (FLINK-35555) Serializing List with null values throws NPE
Zhanghao Chen created FLINK-35555: - Summary: Serializing List with null values throws NPE Key: FLINK-35555 URL: https://issues.apache.org/jira/browse/FLINK-35555 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.20.0 Reporter: Zhanghao Chen Fix For: 1.20.0 FLINK-34123 introduced built-in serialization support for java.util.List, which relies on the existing {{ListSerializer}} impl. However, {{ListSerializer}} does not allow null values, as it was originally designed for serializing {{ListState}} only, where null values are explicitly forbidden by the contract. Directly adding a null marker to allow null values would break backwards state compatibility, so we'll need to introduce a new List serializer and a corresponding TypeInformation that allows null values for serializing user objects. -- This message was sent by Atlassian Jira (v8.20.10#820010)
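The null-marker encoding the ticket alludes to can be sketched in plain Java. The class name and the exact wire layout below are illustrative, not Flink's actual serializer API: each element is preceded by a one-byte marker, and the payload is written only for non-null elements.

```java
import java.io.*;
import java.util.*;

// Illustrative sketch of a null-tolerant list format: a length prefix, then
// for each element a boolean null marker followed by the payload only when
// the element is non-null. Not Flink's actual ListSerializer layout.
public class NullableListSketch {
    public static byte[] serialize(List<String> list) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(list.size());
            for (String element : list) {
                out.writeBoolean(element != null); // null marker
                if (element != null) {
                    out.writeUTF(element);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static List<String> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            List<String> list = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                list.add(in.readBoolean() ? in.readUTF() : null);
            }
            return list;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because this on-wire layout differs from the existing one, state written with the old format cannot be read back by the new serializer, which is exactly why the ticket proposes a new serializer plus TypeInformation instead of changing {{ListSerializer}} in place.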
[jira] [Created] (FLINK-35458) Add serializer upgrade test for set serializer
Zhanghao Chen created FLINK-35458: - Summary: Add serializer upgrade test for set serializer Key: FLINK-35458 URL: https://issues.apache.org/jira/browse/FLINK-35458 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System Affects Versions: 1.20.0 Reporter: Zhanghao Chen Fix For: 2.0.0 A new dedicated serializer for Sets is introduced in [FLINK-35068|https://issues.apache.org/jira/browse/FLINK-35068]. Since the serializer upgrade test requires at least one previous release containing the set serializer (which does not exist yet), we'll add the upgrade test for the set serializer after the release of v1.20. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35145) Add timeout for cluster termination
Zhanghao Chen created FLINK-35145: - Summary: Add timeout for cluster termination Key: FLINK-35145 URL: https://issues.apache.org/jira/browse/FLINK-35145 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.20.0 Reporter: Zhanghao Chen Fix For: 1.20.0 Currently, cluster termination may block forever, as there is no timeout on the process. For example, for an Application cluster with ZK HA enabled, when the ZK cluster is down, the cluster will reach terminated status, but the termination process will block while trying to clean up HA data on ZK. A similar phenomenon can be observed when an HDFS/S3 outage occurs. I propose adding a timeout for the cluster termination process in the ClusterEntrypoint#shutDownAsync method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
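The proposed timeout can be sketched with the JDK's CompletableFuture.orTimeout (Java 9+). Everything here is a simplified stand-in: cleanupHaData() simulates the HA-cleanup call that blocks during a ZK/HDFS/S3 outage, and the method and return values are illustrative, not Flink's actual shutdown API.

```java
import java.util.concurrent.*;

// Sketch: bounding a potentially blocking cleanup step with a timeout so the
// termination path falls through instead of hanging forever.
public class ShutdownTimeoutSketch {
    static CompletableFuture<Void> cleanupHaData() {
        return new CompletableFuture<>(); // never completes: simulates a ZK outage
    }

    public static String shutDownAsync(long timeoutMillis) {
        try {
            // orTimeout completes the future exceptionally if it has not
            // finished within the deadline.
            cleanupHaData().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return "CLEAN_SHUTDOWN";
        } catch (CompletionException e) {
            // A TimeoutException surfaces here; proceed with best-effort termination.
            return "FORCED_SHUTDOWN";
        }
    }
}
```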
[jira] [Created] (FLINK-35068) Introduce built-in serialization support for Set
Zhanghao Chen created FLINK-35068: - Summary: Introduce built-in serialization support for Set Key: FLINK-35068 URL: https://issues.apache.org/jira/browse/FLINK-35068 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System Affects Versions: 1.20.0 Reporter: Zhanghao Chen Fix For: 1.20.0 Introduce built-in serialization support for {{Set}}, another common Java collection type. We'll need to add a new built-in serializer for it ({{MultiSetTypeInformation}} uses {{MapSerializer}} underneath; a dedicated serializer could be more efficient for common {{Set}}s). -- This message was sent by Atlassian Jira (v8.20.10#820010)
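A dedicated Set encoding can be sketched as size + elements, with no per-entry value bytes of the kind a map-backed encoding would spend. The class and layout below are illustrative only, not Flink's actual serializer format.

```java
import java.io.*;
import java.util.*;

// Sketch of a dedicated Set encoding: a size prefix followed by the elements
// themselves, versus a map-based encoding that would also write a value per key.
public class SetSerializerSketch {
    public static byte[] serialize(Set<String> set) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(set.size());
            for (String element : set) {
                out.writeUTF(element); // element only; no per-entry value
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Set<String> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            Set<String> set = new LinkedHashSet<>(size);
            for (int i = 0; i < size; i++) {
                set.add(in.readUTF());
            }
            return set;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```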
[jira] [Created] (FLINK-34354) Release Testing: Verify FLINK-34037 Improve Serialization Configuration and Usage in Flink
Zhanghao Chen created FLINK-34354: - Summary: Release Testing: Verify FLINK-34037 Improve Serialization Configuration and Usage in Flink Key: FLINK-34354 URL: https://issues.apache.org/jira/browse/FLINK-34354 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System Affects Versions: 1.19.0 Reporter: Zhanghao Chen Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34328) Release Testing Instructions: Verify FLINK-34037 Improve Serialization Configuration And Usage In Flink
Zhanghao Chen created FLINK-34328: - Summary: Release Testing Instructions: Verify FLINK-34037 Improve Serialization Configuration And Usage In Flink Key: FLINK-34328 URL: https://issues.apache.org/jira/browse/FLINK-34328 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System, Runtime / Configuration Affects Versions: 1.19.0 Reporter: Zhanghao Chen Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34270) Update connector developer-facing doc
Zhanghao Chen created FLINK-34270: - Summary: Update connector developer-facing doc Key: FLINK-34270 URL: https://issues.apache.org/jira/browse/FLINK-34270 Project: Flink Issue Type: Sub-task Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34239) Introduce a deep copy method of SerializerConfig for merging with Table configs in org.apache.flink.table.catalog.DataTypeFactoryImpl
Zhanghao Chen created FLINK-34239: - Summary: Introduce a deep copy method of SerializerConfig for merging with Table configs in org.apache.flink.table.catalog.DataTypeFactoryImpl Key: FLINK-34239 URL: https://issues.apache.org/jira/browse/FLINK-34239 Project: Flink Issue Type: Sub-task Components: API / Core Affects Versions: 1.19.0 Reporter: Zhanghao Chen *Problem* Currently, org.apache.flink.table.catalog.DataTypeFactoryImpl#createSerializerExecutionConfig creates a deep copy of the SerializerConfig and merges the Table config into it. However, the deep copy is done by manually calling the getter and setter methods of SerializerConfig, which is prone to human error, e.g. forgetting to copy a newly added field of SerializerConfig. *Proposal* Introduce a deep copy method for SerializerConfig and replace the current implementation in org.apache.flink.table.catalog.DataTypeFactoryImpl#createSerializerExecutionConfig. -- This message was sent by Atlassian Jira (v8.20.10#820010)
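The maintenance hazard can be shown with a minimal sketch; "Config" below is an illustrative stand-in for SerializerConfig, not the real class. An external deep copy via getters/setters silently misses fields added later, whereas a copy() method on the class itself lives next to the field declarations and is updated together with them.

```java
// Sketch of the proposed pattern: the config class owns its own deep copy,
// so adding a field and copying it happen in the same file.
public class Config {
    private boolean forceKryo;
    private String registeredType;

    public void setForceKryo(boolean v) { forceKryo = v; }
    public boolean isForceKryo() { return forceKryo; }
    public void setRegisteredType(String t) { registeredType = t; }
    public String getRegisteredType() { return registeredType; }

    // Proposed: a copy() next to the fields, instead of an external
    // getter/setter transcription in DataTypeFactoryImpl.
    public Config copy() {
        Config c = new Config();
        c.forceKryo = this.forceKryo;
        c.registeredType = this.registeredType;
        return c;
    }
}
```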
[jira] [Created] (FLINK-34230) Update flink-docs README: add -Pskip-webui-build to the config doc generation command
Zhanghao Chen created FLINK-34230: - Summary: Update flink-docs README: add -Pskip-webui-build to the config doc generation command Key: FLINK-34230 URL: https://issues.apache.org/jira/browse/FLINK-34230 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34217) Update Serialization-related doc with the new way of configuration
Zhanghao Chen created FLINK-34217: - Summary: Update Serialization-related doc with the new way of configuration Key: FLINK-34217 URL: https://issues.apache.org/jira/browse/FLINK-34217 Project: Flink Issue Type: Sub-task Components: Documentation Affects Versions: 1.19.0 Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34125) Flink 2.0: Remove deprecated serialization config methods and options
Zhanghao Chen created FLINK-34125: - Summary: Flink 2.0: Remove deprecated serialization config methods and options Key: FLINK-34125 URL: https://issues.apache.org/jira/browse/FLINK-34125 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System, Runtime / Configuration Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34124) Flink 2.0: Disable Kryo by default
Zhanghao Chen created FLINK-34124: - Summary: Flink 2.0: Disable Kryo by default Key: FLINK-34124 URL: https://issues.apache.org/jira/browse/FLINK-34124 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System, Runtime / Configuration Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34123) Introduce built-in serializers for common composited data types
Zhanghao Chen created FLINK-34123: - Summary: Introduce built-in serializers for common composited data types Key: FLINK-34123 URL: https://issues.apache.org/jira/browse/FLINK-34123 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34122) Deprecate old serialization config methods and options
Zhanghao Chen created FLINK-34122: - Summary: Deprecate old serialization config methods and options Key: FLINK-34122 URL: https://issues.apache.org/jira/browse/FLINK-34122 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System, Runtime / Configuration Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34121) Introduce pipeline.force-kryo-avro to control whether to force registration of Avro serializer with Kryo
Zhanghao Chen created FLINK-34121: - Summary: Introduce pipeline.force-kryo-avro to control whether to force registration of Avro serializer with Kryo Key: FLINK-34121 URL: https://issues.apache.org/jira/browse/FLINK-34121 Project: Flink Issue Type: Sub-task Components: API / Type Serialization System, Runtime / Configuration Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34120) Introduce unified serialization config option for all Kryo, POJO and customized serializers
Zhanghao Chen created FLINK-34120: - Summary: Introduce unified serialization config option for all Kryo, POJO and customized serializers Key: FLINK-34120 URL: https://issues.apache.org/jira/browse/FLINK-34120 Project: Flink Issue Type: New Feature Components: API / Type Serialization System, Runtime / Configuration Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34103) AsyncIO example failed to run as DataGen Connector is not bundled
Zhanghao Chen created FLINK-34103: - Summary: AsyncIO example failed to run as DataGen Connector is not bundled Key: FLINK-34103 URL: https://issues.apache.org/jira/browse/FLINK-34103 Project: Flink Issue Type: Bug Components: Examples Affects Versions: 1.18.0 Reporter: Zhanghao Chen From the comments of [FLINK-32821|https://issues.apache.org/jira/browse/FLINK-32821]:
root@73186f600374:/opt/flink# bin/flink run /volume/flink-examples-streaming-1.18.0-AsyncIO.jar
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/DataGeneratorSource
    at org.apache.flink.streaming.examples.async.AsyncIOExample.main(AsyncIOExample.java:82)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.datagen.source.DataGeneratorSource
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 15 more
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling
Zhanghao Chen created FLINK-33977: - Summary: Adaptive scheduler may not minimize the number of TMs during downscaling Key: FLINK-33977 URL: https://issues.apache.org/jira/browse/FLINK-33977 Project: Flink Issue Type: Improvement Affects Versions: 1.18.0 Reporter: Zhanghao Chen Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing groups. Currently, there are two implementations of SlotAssigner available: the DefaultSlotAssigner, which treats all slots and slot sharing groups equally, and the StateLocalitySlotAssigner, which assigns slots based on the number of local key groups to utilize local state recovery. The scheduler uses the DefaultSlotAssigner when no key group assignment info is available and the StateLocalitySlotAssigner otherwise. However, neither SlotAssigner aims to minimize the number of TMs, which may produce suboptimal slot assignments under Application Mode. For example, when a job with 8 slot sharing groups and 2 TMs (each with 4 slots) is downscaled through the FLIP-291 API to 4 slot sharing groups, the cluster may still have 2 TMs, one with 1 free slot and the other with 3 free slots. For end users, this implies an ineffective downscaling, as the total cluster resources are not reduced. We should take minimizing the number of TMs into consideration as well. A possible approach is to enhance the StateLocalitySlotAssigner: when the number of free slots exceeds the need, sort all the TMs by a score summed from the allocation scores of all slots on each, remove slots from the excess TMs with the lowest scores, and proceed with the remaining slot assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
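The last step of the proposal can be sketched in isolation. The types and the integer "allocation scores" below are illustrative stand-ins, not the actual SlotAssigner API: score each TM by summing its slots' scores, then release whole TMs starting from the lowest score until only the needed number remains.

```java
import java.util.*;

// Sketch of the proposed enhancement: when more TMs are held than needed,
// rank TMs by the sum of their slots' allocation scores and release the
// lowest-scoring ones wholesale, so remaining work concentrates on fewer TMs.
public class TmDownscaleSketch {
    // tmSlotScores: TM id -> per-slot allocation scores on that TM.
    public static List<String> tmsToRelease(Map<String, List<Integer>> tmSlotScores, int tmsNeeded) {
        List<String> sorted = new ArrayList<>(tmSlotScores.keySet());
        sorted.sort(Comparator.comparingInt(
                tm -> tmSlotScores.get(tm).stream().mapToInt(Integer::intValue).sum()));
        // Release the lowest-scoring TMs until only tmsNeeded remain.
        return sorted.subList(0, Math.max(0, sorted.size() - tmsNeeded));
    }
}
```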
[jira] [Created] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change
Zhanghao Chen created FLINK-33962: - Summary: Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change Key: FLINK-33962 URL: https://issues.apache.org/jira/browse/FLINK-33962 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Zhanghao Chen *Background* Flink restores operator state from snapshots based on matching operatorIDs. Since Flink 1.2, {{StreamGraphHasherV2}} is used for OperatorID generation when no user-set uid exists. The generated OperatorID is deterministic with respect to:
* node-local properties (the traverse ID in the BFS for the DAG)
* chained output nodes
* input node hashes
*Problem* The chaining behavior affects state compatibility, as the generation of an operator's OperatorID depends on its chained output nodes. For example, a simple source->sink DAG with source and sink chained together is state-incompatible with an otherwise identical DAG with source and sink unchained (either because the parallelisms of the two ops are changed to be unequal or because chaining is disabled). This greatly limits the flexibility to perform chain-breaking/joining for performance tuning. *Proposal* Introduce a {{StreamGraphHasherV3}} that is agnostic to the chaining behavior of operators, which effectively just removes L227-235 of [StreamGraphHasherV2.java|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java]. This will not hurt the determinism of the ID generation across job submissions as long as the stream graph topology doesn't change, and since new versions of Flink have already adopted pure operator-level state recovery, this will not break state recovery across job submissions as long as both submissions use the same hasher. It will, however, break cross-version state compatibility.
So we can introduce a new option to enable using HasherV3 in v1.19 and consider making it the default hasher in v2.0. Looking forward to suggestions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
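The chaining-agnostic idea can be sketched as follows. The hash inputs (a BFS traversal id plus input hashes) and the MD5 choice are illustrative, not StreamGraphHasherV2's exact scheme: the point is only that chained output nodes are deliberately left out of the digest, so breaking or joining a chain cannot change the resulting id.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;

// Sketch of a chaining-agnostic operator hash: derived from node-local
// properties (the traversal id) and input hashes only. Chained outputs are
// intentionally excluded from the digest.
public class ChainAgnosticHashSketch {
    public static String operatorHash(int traverseId, List<String> inputHashes) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(Integer.toString(traverseId).getBytes(StandardCharsets.UTF_8));
            for (String inputHash : inputHashes) {
                md.update(inputHash.getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Since nothing about downstream chaining enters the digest, two submissions of the same topology produce the same ids regardless of how operators end up chained.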
[jira] [Created] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
Zhanghao Chen created FLINK-33940: - Summary: Update the auto-derivation rule of max parallelism for enlarged upscaling space Key: FLINK-33940 URL: https://issues.apache.org/jira/browse/FLINK-33940 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Zhanghao Chen *Background* The choice of the max parallelism of a stateful operator is important, as it limits the upper bound of the operator's parallelism, while setting it too large adds extra overhead. Currently, the max parallelism of an operator is either fixed to a value specified via the API core / pipeline option or auto-derived with the following rule: `min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)` *Problem* Recently, the elasticity of Flink jobs has become more and more valued by users. The current auto-derived max parallelism was introduced a long time ago and only allows the operator parallelism to be roughly doubled, which is not desirable for elasticity. Setting a max parallelism manually may not be desirable either: users may not have sufficient expertise to select a good max-parallelism value. *Proposal* Update the auto-derivation rule of max parallelism to derive a larger max parallelism for a better out-of-the-box elasticity experience. A candidate is as follows: `min(max(roundUpToPowerOfTwo(operatorParallelism * 5), 1024), 32767)` Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
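The two rules quoted above can be put side by side in a small sketch. The method names are illustrative; the formulas are the ones stated in the ticket.

```java
// Current vs. proposed max-parallelism derivation, as quoted in the ticket.
public class MaxParallelismSketch {
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // Current rule: min(max(roundUpToPowerOfTwo(parallelism * 1.5), 128), 32767)
    public static int currentRule(int parallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo((int) Math.ceil(parallelism * 1.5)), 128), 32767);
    }

    // Proposed rule: min(max(roundUpToPowerOfTwo(parallelism * 5), 1024), 32767)
    public static int proposedRule(int parallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo(parallelism * 5), 1024), 32767);
    }
}
```

For example, at operator parallelism 100 the current rule yields 256 (scarcely above a doubling), while the proposed rule yields 1024, which is the enlarged upscaling space the ticket argues for.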
[jira] [Created] (FLINK-33891) Remove the obsolete SingleJobGraphStore
Zhanghao Chen created FLINK-33891: - Summary: Remove the obsolete SingleJobGraphStore Key: FLINK-33891 URL: https://issues.apache.org/jira/browse/FLINK-33891 Project: Flink Issue Type: Technical Debt Components: Runtime / Coordination Reporter: Zhanghao Chen SingleJobGraphStore was introduced a long time ago in FLIP-6. It is only used in a test case in DefaultDispatcherRunnerITCase#leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader. We can replace it with TestingJobGraphStore there and then safely remove the class. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33682) Reuse source operator input records/bytes metrics for SourceOperatorStreamTask
Zhanghao Chen created FLINK-33682: - Summary: Reuse source operator input records/bytes metrics for SourceOperatorStreamTask Key: FLINK-33682 URL: https://issues.apache.org/jira/browse/FLINK-33682 Project: Flink Issue Type: Sub-task Components: Runtime / Metrics Reporter: Zhanghao Chen For SourceOperatorStreamTask, the source operator is the head operator that takes input. We can directly reuse the source operator's input records/bytes metrics for it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33681) Display source/sink numRecordsIn/Out & numBytesIn/Out on UI
Zhanghao Chen created FLINK-33681: - Summary: Display source/sink numRecordsIn/Out & numBytesIn/Out on UI Key: FLINK-33681 URL: https://issues.apache.org/jira/browse/FLINK-33681 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: Zhanghao Chen Attachments: image-2023-11-29-13-26-15-176.png Currently, the numRecordsIn & numBytesIn metrics for sources and the numRecordsOut & numBytesOut metrics for sinks are always 0 on the Flink web dashboard. [FLINK-11576|https://issues.apache.org/jira/browse/FLINK-11576] brings us these metrics at the operator level, but it does not integrate them at the task level. On the other hand, the summary metrics on the job overview page are based on the task-level I/O metrics. As a result, even though new connectors supporting FLIP-33 metrics report operator-level I/O metrics, we still cannot see them on the dashboard. This ticket serves as an umbrella issue to integrate standard source/sink I/O metrics with the corresponding task I/O metrics. !image-2023-11-29-13-26-15-176.png|width=590,height=252! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33265) Support source parallelism setting for Kafka connector
Zhanghao Chen created FLINK-33265: - Summary: Support source parallelism setting for Kafka connector Key: FLINK-33265 URL: https://issues.apache.org/jira/browse/FLINK-33265 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33264) Support source parallelism setting for DataGen connector
Zhanghao Chen created FLINK-33264: - Summary: Support source parallelism setting for DataGen connector Key: FLINK-33264 URL: https://issues.apache.org/jira/browse/FLINK-33264 Project: Flink Issue Type: Sub-task Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33263) Implement ParallelismProvider for sources in Blink planner
Zhanghao Chen created FLINK-33263: - Summary: Implement ParallelismProvider for sources in Blink planner Key: FLINK-33263 URL: https://issues.apache.org/jira/browse/FLINK-33263 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33262) Extend source provider interfaces with the new parallelism provider interface
Zhanghao Chen created FLINK-33262: - Summary: Extend source provider interfaces with the new parallelism provider interface Key: FLINK-33262 URL: https://issues.apache.org/jira/browse/FLINK-33262 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33261) FLIP-367: Support Setting Parallelism for Table/SQL Sources
Zhanghao Chen created FLINK-33261: - Summary: FLIP-367: Support Setting Parallelism for Table/SQL Sources Key: FLINK-33261 URL: https://issues.apache.org/jira/browse/FLINK-33261 Project: Flink Issue Type: New Feature Components: Table SQL / API Reporter: Zhanghao Chen Umbrella issue for [FLIP-367: Support Setting Parallelism for Table/SQL Sources - Apache Flink - Apache Software Foundation|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33240) Generate docs for deprecated options as well
Zhanghao Chen created FLINK-33240: - Summary: Generate docs for deprecated options as well Key: FLINK-33240 URL: https://issues.apache.org/jira/browse/FLINK-33240 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Zhanghao Chen Fix For: 1.19.0 Currently, Flink skips doc generation for deprecated options (see {{ConfigOptionsDocGenerator#shouldBeDocumented}}). As a result, deprecated options can no longer be found in the documentation of the new Flink version. This might confuse users upgrading from an older version of Flink, who then have to either carefully read the release notes or check the source code for upgrade guidance on deprecated options. I suggest generating docs for deprecated options as well, and we should scan through the code to make sure that proper upgrade guidance is provided for the deprecated options. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option
Zhanghao Chen created FLINK-33236: - Summary: Remove the unused high-availability.zookeeper.path.running-registry option Key: FLINK-33236 URL: https://issues.apache.org/jira/browse/FLINK-33236 Project: Flink Issue Type: Technical Debt Components: Runtime / Configuration Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 The running registry subcomponent of Flink HA was removed in [FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the "high-availability.zookeeper.path.running-registry" option is of no use after that. We should remove the option and regenerate the config docs to remove the relevant descriptions and avoid user confusion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33221) Add config options for administrator JVM options
Zhanghao Chen created FLINK-33221: - Summary: Add config options for administrator JVM options Key: FLINK-33221 URL: https://issues.apache.org/jira/browse/FLINK-33221 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Zhanghao Chen We encounter issues similar to those described in SPARK-23472. Users may need to add JVM options to their Flink applications (e.g. to tune GC options). They typically use the {{env.java.opts.x}} series of options to do so. We also have a set of administrator JVM options to apply by default, e.g. to enable GC logs or tune GC options. Both use cases need to set the same series of options and will clobber one another. In the past, we generated and prepended the administrator JVM options in the Java code that generates the start command for the JM/TM. However, this has proven difficult to maintain. Therefore, I propose to also add a set of default JVM options for administrator use that is prepended to the user-set extra JVM options. We can mark the existing {{env.java.opts.x}} series as user-set extra JVM options and add a new {{env.java.opts.x.default}} series for administrator use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
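The proposed layering can be sketched as simple string concatenation; the method below is illustrative, not Flink's startup script logic. Administrator defaults go first so the user-set extra options appear later on the JVM command line, and for a repeated flag the JVM generally honors the last occurrence, letting user options win on conflicts.

```java
// Sketch: administrator default JVM options are prepended, user-set extra
// options come last and therefore take precedence on conflicting flags.
public class JvmOptsSketch {
    public static String effectiveOpts(String adminDefaults, String userExtra) {
        return (adminDefaults.trim() + " " + userExtra.trim()).trim();
    }
}
```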
[jira] [Created] (FLINK-33205) Replace Akka with Pekko in the description of "pekko.ssl.enabled"
Zhanghao Chen created FLINK-33205: - Summary: Replace Akka with Pekko in the description of "pekko.ssl.enabled" Key: FLINK-33205 URL: https://issues.apache.org/jira/browse/FLINK-33205 Project: Flink Issue Type: Technical Debt Components: Runtime / Configuration Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33204) Add description for missing options in the all jobmanager/taskmanager options section in document
Zhanghao Chen created FLINK-33204: - Summary: Add description for missing options in the all jobmanager/taskmanager options section in document Key: FLINK-33204 URL: https://issues.apache.org/jira/browse/FLINK-33204 Project: Flink Issue Type: Technical Debt Components: Runtime / Configuration Affects Versions: 1.17.0, 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 There are 4 options which are excluded from the all jobmanager/taskmanager options section in the configuration document:
# taskmanager.bind-host
# taskmanager.rpc.bind-port
# jobmanager.bind-host
# jobmanager.rpc.bind-port
We should add them to the document under the all jobmanager/taskmanager options section for completeness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33166) Support setting root logger level by config
Zhanghao Chen created FLINK-33166: - Summary: Support setting root logger level by config Key: FLINK-33166 URL: https://issues.apache.org/jira/browse/FLINK-33166 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Zhanghao Chen Users currently cannot change the logging level by config and have to manually modify the cumbersome logger configuration file. We'd better provide a shortcut and support setting the root logger level by config. There are already a number of configs for logger settings, like {{env.log.dir}} for the logging dir and {{env.log.max}} for the max number of old log files to keep. We can name the new config {{env.log.level}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33148) Update Web UI to adopt the new "endpoint" field in REST API
Zhanghao Chen created FLINK-33148: - Summary: Update Web UI to adopt the new "endpoint" field in REST API Key: FLINK-33148 URL: https://issues.apache.org/jira/browse/FLINK-33148 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33147) Introduce a new "endpoint" field in REST API to represent TaskManager endpoint (host + port) and deprecate the "host" field
Zhanghao Chen created FLINK-33147: - Summary: Introduce a new "endpoint" field in REST API to represent TaskManager endpoint (host + port) and deprecate the "host" field Key: FLINK-33147 URL: https://issues.apache.org/jira/browse/FLINK-33147 Project: Flink Issue Type: Sub-task Components: Runtime / REST Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33146) FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI
Zhanghao Chen created FLINK-33146: - Summary: FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI Key: FLINK-33146 URL: https://issues.apache.org/jira/browse/FLINK-33146 Project: Flink Issue Type: Improvement Components: Runtime / REST, Runtime / Web Frontend Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 Umbrella ticket for [FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. This is a continuation of [FLINK-25371|https://issues.apache.org/jira/browse/FLINK-25371] (include data port as part of the host info for the subtask detail panel on the Web UI). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33123) Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for autoscaler and adaptive scheduler
Zhanghao Chen created FLINK-33123: - Summary: Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for autoscaler and adaptive scheduler and Key: FLINK-33123 URL: https://issues.apache.org/jira/browse/FLINK-33123 Project: Flink Issue Type: Bug Components: Autoscaler, Runtime / Coordination Affects Versions: 1.17.0, 1.18.0 Reporter: Zhanghao Chen *Background* https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is wrong when the parallelism is changed for a vertex with a FORWARD edge. This affects both the autoscaler and the adaptive scheduler, where vertex parallelism can be changed dynamically. A fix was applied that dynamically replaces the FORWARD partitioner with REBALANCE on task deployment in {{StreamTask}}:

private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
        Environment environment, NonChainedOutput streamOutput, int outputIndex) {
    if (streamOutput.getPartitioner() instanceof ForwardPartitioner
            && environment.getWriter(outputIndex).getNumberOfSubpartitions()
                    != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
        LOG.debug(
                "Replacing forward partitioner with rebalance for {}",
                environment.getTaskInfo().getTaskNameWithSubtasks());
        streamOutput.setPartitioner(new RebalancePartitioner<>());
    }
}

*Problem* Unfortunately, the fix is still buggy in two aspects: # The connections between upstream and downstream tasks are determined by the distribution type of the partitioner when the execution graph is generated on the JM side. When the edge is FORWARD, the distribution type is POINTWISE, and Flink tries to evenly distribute subpartitions among all downstream tasks. To change the edge to REBALANCE, the distribution type has to be changed to ALL_TO_ALL so that all-to-all connections are made between upstream and downstream tasks. However, the fix did not change the distribution type, so the network connections are set up in the wrong way. # The FORWARD partitioner is kept only when environment.getWriter(outputIndex).getNumberOfSubpartitions() equals the task parallelism. However, the number of subpartitions here equals the number of downstream tasks of this particular task, which is also determined by the distribution type of the partitioner when the execution graph is generated on the JM side. Only when ceil(downstream task parallelism / upstream task parallelism) = upstream task parallelism will the number of subpartitions equal the task parallelism. In fact, for a normal job with a FORWARD edge and no autoscaling action at all, the partitioner is changed to REBALANCE internally, as the number of subpartitions always equals 1 in this case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
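The second problem can be illustrated with a minimal sketch (plain Java, not Flink's actual code; the helper names are made up): under POINTWISE distribution each upstream task writes only its own share of subpartitions, and that share is what the replacement check ends up comparing against the full task parallelism.

```java
// Minimal sketch (NOT Flink's implementation) of why the replacement
// check misfires: under POINTWISE distribution each upstream task only
// writes subpartitions for its own share of downstream tasks.
public class ForwardCheckSketch {

    // Even POINTWISE share: how many downstream consumers (= subpartitions)
    // upstream task `index` gets when downstream >= upstream parallelism.
    static int subpartitions(int upstream, int downstream, int index) {
        int base = downstream / upstream;
        return index < downstream % upstream ? base + 1 : base;
    }

    // The check described in the ticket: replace FORWARD with REBALANCE
    // whenever the subpartition count differs from the task parallelism.
    static boolean wouldReplaceForward(int subpartitions, int parallelism) {
        return subpartitions != parallelism;
    }

    public static void main(String[] args) {
        // Plain FORWARD edge, parallelism 4 -> 4: every task writes exactly
        // one subpartition, yet 1 != 4, so the partitioner gets replaced
        // even though nothing was rescaled.
        System.out.println(wouldReplaceForward(subpartitions(4, 4, 0), 4));
    }
}
```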
[jira] [Created] (FLINK-32983) Support setting env.java.opts.all & env.java.opts.cli configs via dynamic properties on the CLI side
Zhanghao Chen created FLINK-32983: - Summary: Support setting env.java.opts.all & env.java.opts.cli configs via dynamic properties on the CLI side Key: FLINK-32983 URL: https://issues.apache.org/jira/browse/FLINK-32983 Project: Flink Issue Type: Improvement Components: Command Line Client, Deployment / Scripts Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 *Problem* The following configs are supposed to be supported: |env.java.opts.all|(none)|String|Java options to start the JVM of all Flink processes with.| |env.java.opts.client|(none)|String|Java options to start the JVM of the Flink Client with.| However, the two configs only take effect on the client side when they are set in the flink-conf file. In other words, values set via -D or -yD on the CLI will not take effect, which is counter-intuitive and makes configuration less flexible. *Proposal* Add logic to parse configs set via -D or -yD in config.sh and give them higher precedence than the values from flink-conf.yaml for env.java.opts.all & env.java.opts.client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
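The proposed precedence rule can be sketched as follows (a hypothetical helper, not the actual config.sh logic, which is a shell script):

```java
// Sketch (hypothetical helper names) of the proposed precedence: a value
// passed via -D/-yD on the CLI overrides the same key from flink-conf.yaml.
public class JvmOptsPrecedence {

    // Effective value for one option key: the CLI dynamic property wins
    // when present; otherwise fall back to the flink-conf.yaml value.
    static String resolve(String flinkConfValue, String cliDynamicValue) {
        return cliDynamicValue != null ? cliDynamicValue : flinkConfValue;
    }

    public static void main(String[] args) {
        // flink-conf.yaml sets env.java.opts.client, the CLI overrides it:
        System.out.println(resolve("-Xmx1g", "-Xmx2g"));
        // No -D given, so the flink-conf.yaml value applies:
        System.out.println(resolve("-Xmx1g", null));
    }
}
```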
[jira] [Created] (FLINK-32980) Support env.java.opts.all & env.java.opts.cli config for starting Session clusters
Zhanghao Chen created FLINK-32980: - Summary: Support env.java.opts.all & env.java.opts.cli config for starting Session clusters Key: FLINK-32980 URL: https://issues.apache.org/jira/browse/FLINK-32980 Project: Flink Issue Type: Improvement Components: Command Line Client, Deployment / Scripts Affects Versions: 1.18.0 Reporter: Zhanghao Chen Fix For: 1.19.0 *Problem* The following configs are supposed to be supported: |env.java.opts.all|(none)|String|Java options to start the JVM of all Flink processes with.| |env.java.opts.client|(none)|String|Java options to start the JVM of the Flink Client with.| However, the two configs do not take effect when starting Flink session clusters via kubernetes-session.sh and yarn-session.sh. This can lead to problems in complex production environments. For example, in my company some nodes are IPv6-only, and the connection between the Flink client and the K8s/YARN control plane goes through a domain name backed by an IPv4/IPv6 dual stack. The JVM arg -Djava.net.preferIPv6Addresses=true needs to be set so that Java prefers IPv6 addresses over IPv4 ones; otherwise the K8s/YARN control plane is inaccessible. *Proposal* The fix is straightforward: apply the following changes to the session scripts:

# Add client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS xxx

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32872) Add option to control the default partitioner when the parallelism of upstream and downstream operator does not match
Zhanghao Chen created FLINK-32872: - Summary: Add option to control the default partitioner when the parallelism of upstream and downstream operator does not match Key: FLINK-32872 URL: https://issues.apache.org/jira/browse/FLINK-32872 Project: Flink Issue Type: New Feature Components: Runtime / Configuration Affects Versions: 1.17.0 Reporter: Zhanghao Chen *Problem* Currently, when no partitioner is specified, the FORWARD partitioner is used if the parallelism of the upstream and downstream operator matches, and the REBALANCE partitioner is used otherwise. However, this behavior is not configurable and can be undesirable in certain cases: # The REBALANCE partitioner creates an all-to-all connection between upstream and downstream operators and consumes a lot of extra CPU and memory resources when the parallelism is high in pipelined mode; the RESCALE partitioner is preferable in this case. # Flink SQL users cannot specify the partitioner directly so far, and DataStream users may not want to explicitly set the partitioner everywhere. *Proposal* Add an option to control the default partitioner when the parallelism of the upstream and downstream operator does not match. The option could be named "pipeline.default-partitioner-with-unmatched-parallelism" with REBALANCE as the default value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
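A sketch of the proposed selection logic (the option name in the ticket is a proposal, not an existing Flink key; the strings below merely stand in for the partitioner choice):

```java
// Sketch of the proposed default-partitioner selection. The configurable
// default only applies when parallelisms differ; matching parallelisms
// keep FORWARD, as today.
public class DefaultPartitionerChoice {

    static String choose(int upstreamParallelism, int downstreamParallelism,
                         String defaultWhenUnmatched) {
        return upstreamParallelism == downstreamParallelism
                ? "FORWARD" : defaultWhenUnmatched;
    }

    public static void main(String[] args) {
        // Today's hard-wired behavior corresponds to REBALANCE:
        System.out.println(choose(4, 8, "REBALANCE"));
        // With the proposed option set to RESCALE, high-parallelism
        // pipelines avoid the all-to-all connection:
        System.out.println(choose(4, 8, "RESCALE"));
    }
}
```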
[jira] [Created] (FLINK-32868) Document the need to backport FLINK-30213 for using autoscaler with older version Flinks
Zhanghao Chen created FLINK-32868: - Summary: Document the need to backport FLINK-30213 for using autoscaler with older version Flinks Key: FLINK-32868 URL: https://issues.apache.org/jira/browse/FLINK-32868 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Zhanghao Chen The current autoscaler doc states the following job requirements: * The autoscaler currently only works with the latest [Flink 1.17|https://hub.docker.com/_/flink] or after backporting the following fixes to your 1.15/1.16 Flink image ** [Job vertex parallelism overrides|https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9] (must have) ** [Support timespan for busyTime metrics|https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35] (good to have) However, https://issues.apache.org/jira/browse/FLINK-30213 is also crucial and needs to be backported to 1.15/1.16 to enable autoscaling. We should add it to the doc as well and mark it as must have. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32822) Add connector option to control whether to enable auto-commit of offsets when checkpointing is enabled
Zhanghao Chen created FLINK-32822: - Summary: Add connector option to control whether to enable auto-commit of offsets when checkpointing is enabled Key: FLINK-32822 URL: https://issues.apache.org/jira/browse/FLINK-32822 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Zhanghao Chen When checkpointing is enabled, the Flink Kafka connector commits the current consuming offset when checkpoints are *completed*, although the Kafka source does *NOT* rely on committed offsets for fault tolerance. When the checkpoint interval is long, the lag curve behaves in a zig-zag way: the lag keeps increasing and then suddenly drops on a completed checkpoint. This has led to some confusion for users, as in [https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease], and may also affect external monitoring for setting up alarms (you'll have to use a high threshold due to the non-realtime commit of offsets) and autoscaling (the algorithm needs extra effort to distinguish whether the backlog is actually growing or the checkpoint just hasn't completed yet). Therefore, I think it is worthwhile to add an option to enable auto-commit of offsets when checkpointing is enabled. For the DataStream API, this means adding a configuration method. For the Table API, this means adding a new connector option that wires to the DataStream API configuration underneath. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32821) Streaming examples failed to execute due to error in packaging
Zhanghao Chen created FLINK-32821: - Summary: Streaming examples failed to execute due to error in packaging Key: FLINK-32821 URL: https://issues.apache.org/jira/browse/FLINK-32821 Project: Flink Issue Type: Improvement Components: Examples Affects Versions: 1.18.0 Reporter: Zhanghao Chen 5 out of the 7 streaming examples fail to run: * Iteration, SessionWindowing, SocketWindowWordCount, and WindowJoin fail with java.lang.NoClassDefFoundError: org/apache/flink/streaming/examples/utils/ParameterTool * TopSpeedWindowing fails with: Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.datagen.source.GeneratorFunction The NoClassDefFoundError on ParameterTool was introduced by [FLINK-32558] (Properly deprecate DataSet API), and we'd better resolve [FLINK-32820] (ParameterTool is mistakenly marked as deprecated) first before we come to a fix for this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32820) ParameterTool is mistakenly marked as deprecated
Zhanghao Chen created FLINK-32820: - Summary: ParameterTool is mistakenly marked as deprecated Key: FLINK-32820 URL: https://issues.apache.org/jira/browse/FLINK-32820 Project: Flink Issue Type: Improvement Components: API / DataSet, API / DataStream Affects Versions: 1.18.0 Reporter: Zhanghao Chen ParameterTool and AbstractParameterTool in package flink-java are mistakenly marked as deprecated in [FLINK-32558] (Properly deprecate DataSet API). They are widely used for handling application parameters and are also listed in the Flink user doc: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/. Also, they are not directly related to the DataSet API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32127) Source busy time is inaccurate in many cases
Zhanghao Chen created FLINK-32127: - Summary: Source busy time is inaccurate in many cases Key: FLINK-32127 URL: https://issues.apache.org/jira/browse/FLINK-32127 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Zhanghao Chen We found that source busy time is inaccurate in many cases. The reason is that sources are usually multi-threaded (Kafka and RocketMQ, for example): a fetcher thread fetches data from the data source, and a consumer thread deserializes it, with a blocking queue in between. A source is considered # *idle* if the consumer is blocked fetching data from the queue # *backpressured* if the consumer is blocked writing data to downstream operators # *busy* otherwise However, this means that if the bottleneck is on the fetcher side, the consumer will often be blocked fetching data from the queue, so the reported source idle time will be high, even though the source is in fact busy and consuming a lot of CPU. In some of our jobs, the source max busy time is only ~600 ms while the source is actually reaching its limit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
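Under the classification above, the reported busy time is simply what remains of each second after the idle and backpressured portions. This minimal sketch (not Flink's actual implementation) shows how a fetcher-bound source ends up looking nearly idle:

```java
// Minimal sketch (not Flink's actual code) of the per-second busy-time
// derivation described above: busy = 1000 - idle - backpressured.
public class SourceBusyTime {

    static double busyMsPerSecond(double idleMs, double backpressuredMs) {
        return 1000.0 - idleMs - backpressuredMs;
    }

    public static void main(String[] args) {
        // Fetcher-bound source: the consumer waits on the queue for 900 ms
        // of every second, so reported busy time is only 100 ms even though
        // the fetcher thread is saturated and burning CPU.
        System.out.println(busyMsPerSecond(900, 0));
    }
}
```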
[jira] [Created] (FLINK-32124) Add option to enable partition alignment for sources
Zhanghao Chen created FLINK-32124: - Summary: Add option to enable partition alignment for sources Key: FLINK-32124 URL: https://issues.apache.org/jira/browse/FLINK-32124 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Zhanghao Chen Currently, the autoscaler does not consider balancing partitions among source tasks. In our production environment, partition skew has proven to be a severe problem for many jobs. Especially in a job topology with only forward or rescale shuffles, partition skew on the source side can further lead to data imbalance in later operators. We should add an option to enable partition alignment for sources, but keep it disabled by default, as it has a side effect: the partition count usually has only a limited number of factors, and enabling alignment would greatly limit our scaling choices. Also, if data among partitions is imbalanced in the first place, partition alignment won't help either (this is not a common case inside our company, though). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32037) The edge on Web UI is wrong after parallelism changes via parallelism overrides or AdaptiveScheduler
Zhanghao Chen created FLINK-32037: - Summary: The edge on Web UI is wrong after parallelism changes via parallelism overrides or AdaptiveScheduler Key: FLINK-32037 URL: https://issues.apache.org/jira/browse/FLINK-32037 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / REST Affects Versions: 1.17.0 Reporter: Zhanghao Chen *Background* After FLINK-30213, in case of parallelism changes to the JobGraph, as done via the AdaptiveScheduler or through providing JobVertexId overrides in PipelineOptions#PARALLELISM_OVERRIDES, when the consumer parallelism doesn't match the local parallelism, the original ForwardPartitioner is replaced with the RebalancePartitioner. *Problem* Although the actual partitioner changes underneath, the ship strategy shown on the Web UI is still FORWARD. This is because the fix applies when the StreamTask is initialized, and the job graph is not touched. The Web UI uses the JSON plan generated from the job graph for display, and the ship strategy is obtained from JobEdge#getShipStrategyName. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31991) Update Autoscaler doc to reflect the changes brought by the new source scaling logic
Zhanghao Chen created FLINK-31991: - Summary: Update Autoscaler doc to reflect the changes brought by the new source scaling logic Key: FLINK-31991 URL: https://issues.apache.org/jira/browse/FLINK-31991 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Zhanghao Chen Attachments: image-2023-05-03-20-16-33-704.png The current statements on job requirements are outdated: ??- All sources must use the new Source API (most common connectors already do)?? ??- Source scaling requires sources to expose the standardized connector metrics for accessing backlog information (source scaling can be disabled)?? The Autoscaler doc needs to be updated to reflect the changes brought by the new source scaling logic ([FLINK-31326|https://issues.apache.org/jira/browse/FLINK-31326]: Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31936) Support setting scale up max factor
Zhanghao Chen created FLINK-31936: - Summary: Support setting scale up max factor Key: FLINK-31936 URL: https://issues.apache.org/jira/browse/FLINK-31936 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Zhanghao Chen Currently, only the scale-down max factor can be configured. We should add a config for the scale-up max factor as well. In many cases, a job's performance won't improve after scaling up due to external bottlenecks. Although detecting an ineffective scale-up can block further scaling, the damage is already done if we scale up too much in a single step, which may even burn out external services. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31827) Incorrect estimation of the target data rate of a vertex when only a subset of its upstream vertex's output is consumed
Zhanghao Chen created FLINK-31827: - Summary: Incorrect estimation of the target data rate of a vertex when only a subset of its upstream vertex's output is consumed Key: FLINK-31827 URL: https://issues.apache.org/jira/browse/FLINK-31827 Project: Flink Issue Type: Bug Components: Autoscaler Reporter: Zhanghao Chen Attachments: image-2023-04-17-23-37-35-280.png Currently, the target data rate of a vertex = SUM(target data rate * input/output ratio) over all of its upstream vertices. This assumes that all output records of an upstream vertex are consumed by the downstream vertex. However, this does not always hold. Consider the following job plan generated by a Flink SQL job. The middle vertex contains multiple chained Calc(select xx) operators, each connecting to a separate downstream sink task. As a result, each sink task only consumes a portion of the middle vertex's output. To fix this, we need operator-level edge info to infer the upstream-downstream relationship, as well as operator-level output metrics. The metrics part is easy, but AFAIK there's no way to get the operator-level edge info from the Flink REST API yet. !image-2023-04-17-23-37-35-280.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
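The current estimate described above can be sketched as follows (illustrative code, not the autoscaler's actual implementation). Note how the full upstream output rate gets attributed to a downstream vertex even when that vertex only consumes part of the output:

```java
// Sketch of the estimate the ticket describes: a vertex's target rate is
// the sum over its upstream vertices of (upstream target rate * its
// output/input ratio), implicitly assuming the downstream vertex consumes
// ALL upstream output, which breaks for the fan-out Calc case above.
public class TargetRateEstimate {

    static double targetRate(double[] upstreamTargetRates, double[] outPerInRatios) {
        double sum = 0.0;
        for (int i = 0; i < upstreamTargetRates.length; i++) {
            sum += upstreamTargetRates[i] * outPerInRatios[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        // One upstream vertex targeting 1000 rec/s and emitting 2 records
        // per input: the estimate assigns the full 2000 rec/s to EACH
        // downstream sink, even if each sink only consumes half of it.
        System.out.println(targetRate(new double[] {1000}, new double[] {2}));
    }
}
```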
[jira] [Created] (FLINK-31826) Incorrect estimation of the target data rate of a vertex when only a subset of its upstream vertex's output is consumed
Zhanghao Chen created FLINK-31826: - Summary: Incorrect estimation of the target data rate of a vertex when only a subset of its upstream vertex's output is consumed Key: FLINK-31826 URL: https://issues.apache.org/jira/browse/FLINK-31826 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: Zhanghao Chen Attachments: LHL7VKOG4B.jpg Currently, a vertex's target data rate = the sum of its upstream vertices' target data rate * input/output ratio. This assumes that all of an upstream vertex's output goes into the current vertex. However, this does not always hold. Consider the following job plan generated by a Flink SQL job. The vertex in the middle has multiple Calc(select xx) operators chained, each connecting to a separate downstream task. The total num_rec_out_rate of the middle vertex = SUM of the num_rec_in_rate of its downstream tasks. To fix this problem, we need operator-level output metrics and edge info. The operator-level metrics part is easy, but AFAIK there's no way to get the operator-level edge info from the current Flink REST APIs. !LHL7VKOG4B.jpg! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31769) Add percentiles to aggregated metrics
Zhanghao Chen created FLINK-31769: - Summary: Add percentiles to aggregated metrics Key: FLINK-31769 URL: https://issues.apache.org/jira/browse/FLINK-31769 Project: Flink Issue Type: Improvement Components: Autoscaler, Runtime / Metrics Reporter: Zhanghao Chen Attachments: image-2023-04-11-15-11-51-471.png *Background* Currently, only the min/avg/max of metrics are exposed via the REST API. The Flink autoscaler relies on these aggregated metrics to make predictions, and the type of aggregation plays an important role. [FLINK-30652] (Use max busytime instead of average to compute true processing rate) suggests that using the max aggregator instead of avg for busy time handles data skew more robustly. However, we found that for large-scale jobs, max aggregation may be too sensitive. As a result, the true processing rate is underestimated with severe turbulence. The graph below shows the true processing rate estimated with different aggregators for a real production data transmission job with a parallelism of 750. !image-2023-04-11-15-11-51-471.png! *Proposal* Add percentiles (p50, p90, p99) to the aggregated metrics. Apache Commons Math can be used to compute them. A follow-up would be making the Flink autoscaler use the new aggregators. -- This message was sent by Atlassian Jira (v8.20.10#820010)
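For illustration, here is a nearest-rank percentile sketch in plain Java. Apache Commons Math's Percentile class, which the ticket suggests, uses interpolation by default and may return slightly different values:

```java
import java.util.Arrays;

// Nearest-rank p-th percentile: the smallest value such that at least
// p percent of the data is less than or equal to it.
public class PercentileSketch {

    static double percentile(double[] values, double p) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
        return sorted[Math.max(rank, 1) - 1];
    }

    public static void main(String[] args) {
        // Per-subtask busy times: p90 ignores the single outlier that a
        // max aggregator would latch onto.
        double[] busyTimeMs = {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000};
        System.out.println(percentile(busyTimeMs, 50));
        System.out.println(percentile(busyTimeMs, 90));
        System.out.println(percentile(busyTimeMs, 99));
    }
}
```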
[jira] [Created] (FLINK-31358) Display consumed split/partition/queue info on Web UI
Zhanghao Chen created FLINK-31358: - Summary: Display consumed split/partition/queue info on Web UI Key: FLINK-31358 URL: https://issues.apache.org/jira/browse/FLINK-31358 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: Zhanghao Chen Many data sources have the concept of a "split", a partition of the whole data (e.g. a partition in Kafka, a queue in RocketMQ), and each Flink source task is allocated a subset of splits to consume. When a job is lagging on only a few splits, being able to see on the Web UI which source task consumes which split would help users determine whether it is a data source issue or a Flink issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31245) Adaptive scheduler does not reset the state of GlobalAggregateManager when rescaling
Zhanghao Chen created FLINK-31245: - Summary: Adaptive scheduler does not reset the state of GlobalAggregateManager when rescaling Key: FLINK-31245 URL: https://issues.apache.org/jira/browse/FLINK-31245 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.16.1 Reporter: Zhanghao Chen Fix For: 1.18.0 *Problem* GlobalAggregateManager is used to share state amongst parallel tasks in a job and thus coordinate their execution. It maintains a state (the _accumulators_ field in JobMaster) in JM memory. The accumulator state content is defined in user code; in my company, a user stores task parallelism in the accumulator, assuming task parallelism never changes. However, this assumption is broken when using the adaptive scheduler. *Possible Solutions* # Mark GlobalAggregateManager as deprecated. It seems that the operator coordinator can completely replace GlobalAggregateManager and is a more elegant solution. Therefore, it is fine to deprecate GlobalAggregateManager and leave this issue as is. If that's the case, we can open another ticket for doing that. # If we decide to continue supporting GlobalAggregateManager, then we need to reset the state when rescaling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30829) Support sorting the backpressure tab by backpressure level
Zhanghao Chen created FLINK-30829: - Summary: Support sorting the backpressure tab by backpressure level Key: FLINK-30829 URL: https://issues.apache.org/jira/browse/FLINK-30829 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Affects Versions: 1.17 Reporter: Zhanghao Chen [FLINK-29998|https://issues.apache.org/jira/browse/FLINK-29998] enables users to sort the backpressure tab to see which task is the busiest. Another common scenario for backpressure analysis is finding which tasks are backpressured. We should support sorting the backpressure tab by backpressure level as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30513) HA storage dir leaks on cluster termination
Zhanghao Chen created FLINK-30513: - Summary: HA storage dir leaks on cluster termination Key: FLINK-30513 URL: https://issues.apache.org/jira/browse/FLINK-30513 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.16.0, 1.15.0 Reporter: Zhanghao Chen Attachments: image-2022-12-27-21-32-17-510.png *Problem* We found that the HA storage dir leaks on cluster termination for a Flink job with HA enabled. The following picture shows the HA storage dir (here on HDFS) of the cluster czh-flink-test-offline (application mode) after cancelling the job with flink cancel. We are left with an empty dir, and too many empty dirs will greatly hurt the stability of the HDFS NameNode! !image-2022-12-27-21-32-17-510.png|width=582,height=158! Furthermore, if the user chooses to retain checkpoints on job termination, the completedCheckpoints files are leaked as well. Note that we no longer need the completedCheckpoints files, as retained CPs are recovered directly from the CP data dir. *Root Cause* AbstractHaServices#closeAndCleanupAllData() cleans up the blob store but does not clean up the HA storage dir. *Proposal* Clean up the HA storage dir after cleaning up the blob store in AbstractHaServices#closeAndCleanupAllData(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30101) YARN client should
Zhanghao Chen created FLINK-30101: - Summary: YARN client should Key: FLINK-30101 URL: https://issues.apache.org/jira/browse/FLINK-30101 Project: Flink Issue Type: Improvement Components: Client / Job Submission Affects Versions: 1.16.0 Reporter: Zhanghao Chen Fix For: 1.17.0 *Problem* Currently, the procedure of retrieving a Flink on YARN cluster client is as follows (in the YarnClusterDescriptor#retrieve method): # Get the application report from YARN # Set rest.address & rest.port using the info from the application report # Create a new RestClusterClient using the updated configuration, which will use the client HA service to fetch rest.address & rest.port if HA is enabled Here, the usage of client HA in step 3 is redundant, as we've already got rest.address & rest.port from the YARN application report. When ZK HA is enabled, it takes ~1.5 s to initialize the client HA services and fetch the rest IP & port. 1.5 s can mean a lot for latency-sensitive client operations. In my company, we use the Flink client to submit short-running session jobs, and e2e latency is critical. Job submission takes around 10 s on average, so 1.5 s would mean a 15% time saving. *Proposal* When retrieving a Flink on YARN cluster client, use StandaloneClientHAServices to create the RestClusterClient instead, as we have already fetched rest.address & rest.port from the YARN application report. This is also what is done in KubernetesClusterDescriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29615) MetricStore does not remove metrics of nonexistent subtasks when adaptive scheduler lowers job parallelism
Zhanghao Chen created FLINK-29615: - Summary: MetricStore does not remove metrics of nonexistent subtasks when adaptive scheduler lowers job parallelism Key: FLINK-29615 URL: https://issues.apache.org/jira/browse/FLINK-29615 Project: Flink Issue Type: Bug Components: Runtime / Metrics, Runtime / REST Affects Versions: 1.15.0, 1.16.0 Reporter: Zhanghao Chen Fix For: 1.17.0 *Problem* MetricStore does not remove metrics of nonexistent subtasks when the adaptive scheduler lowers job parallelism, so users will see metrics of nonexistent subtasks on the Web UI (e.g. the task backpressure page) or in REST API responses. *Proposed Solution* Thanks to [FLINK-29132] (SubtaskMetricStore causes memory leak) & [FLINK-28588] (Enhance REST API for Speculative Execution), Flink now updates the current execution attempts when updating metrics. Since the active subtask info is included in the current execution attempt info, we can use it to retain only the active subtasks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28973) Extending /jars/:jarid/plan API to support setting Flink configs
Zhanghao Chen created FLINK-28973: - Summary: Extending /jars/:jarid/plan API to support setting Flink configs Key: FLINK-28973 URL: https://issues.apache.org/jira/browse/FLINK-28973 Project: Flink Issue Type: Improvement Components: Runtime / REST Reporter: Zhanghao Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27060) Extending /jars/:jarid/run API to
Zhanghao Chen created FLINK-27060: - Summary: Extending /jars/:jarid/run API to Key: FLINK-27060 URL: https://issues.apache.org/jira/browse/FLINK-27060 Project: Flink Issue Type: Improvement Components: Runtime / REST Reporter: Zhanghao Chen *Background* In certain scenarios, users want to submit jobs via the Flink REST API instead of the Flink CLI, which is more heavyweight: for example, a lightweight data processing workflow system that integrates with Flink. Currently, the /jars/:jarid/run API (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run) only supports a few selected Flink config options listed in the doc (parallelism, savepoint path, and allow non-restored state), which is insufficient for practical use. *Proposed Changes* Extend the /jars/:jarid/run API with an additional request body parameter "configs", a map of Flink configuration option-value pairs set by users. For backward compatibility, we can retain the existing body parameters like "allowNonRestoredState", and when there are conflicting configurations, let the values set explicitly with the existing body parameters take precedence over the values set via configs. -- This message was sent by Atlassian Jira (v8.20.1#820001)
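Under this proposal, a run request body might look like the following (illustrative values; "configs" is the new parameter, while "entryClass", "parallelism", "allowNonRestoredState", and "savepointPath" are existing fields of the run API):

```json
{
  "entryClass": "com.example.MyJob",
  "parallelism": 4,
  "allowNonRestoredState": true,
  "savepointPath": "/flink/savepoints/savepoint-abc",
  "configs": {
    "pipeline.name": "my-job-via-rest",
    "execution.checkpointing.interval": "60s"
  }
}
```

If a key inside "configs" conflicted with an explicit top-level field such as "allowNonRestoredState", the top-level field would win under the proposed precedence.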
[jira] [Created] (FLINK-27056) "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value
Zhanghao Chen created FLINK-27056: - Summary: "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value Key: FLINK-27056 URL: https://issues.apache.org/jira/browse/FLINK-27056 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.14.4, 1.13.6, 1.12.7, 1.15.0 Reporter: Zhanghao Chen Fix For: 1.16.0 *Background* # pipeline.time-characteristic is the configuration option used to control the time characteristic for all created streams, and it has the default value _PROCESSING_TIME_ at the time of writing. However, the option won't take effect unless it is explicitly set by the user, as it is read into the code via configuration.getOptional(xx).ifPresent(xx). # The default value of _TIME_CHARACTERISTIC_ was changed from _PROCESSING_TIME_ to _EVENT_TIME_ in [FLINK-19317] (Make EventTime the default StreamTimeCharacteristic). # _TIME_CHARACTERISTIC_ and the relevant operations that set or use it were deprecated in [FLINK-19318] (Deprecate timeWindow() operations in DataStream API) and [FLINK-19319] (Deprecate StreamExecutionEnvironment.setStreamTimeCharacteristic() and TimeCharacteristic). *Proposed Change* # pipeline.time-characteristic should be deprecated, just like the other _TIME_CHARACTERISTIC_-related operations, as we no longer want users to set it. # pipeline.time-characteristic should have the default value _EVENT_TIME_ to reflect the actual default behavior of the system and avoid misleading users. Additionally, I think configuration options that only take effect when explicitly set by the user (i.e. those read into the system via configuration.getOptional(xx).ifPresent(xx)) should have no default values. -- This message was sent by Atlassian Jira (v8.20.1#820001)
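The pitfall can be demonstrated with a small sketch (hypothetical names, not Flink's actual classes) of the getOptional(...).ifPresent(...) pattern, where the declared default is never consulted:

```java
import java.util.Optional;

// Sketch of the pattern the ticket describes: when an option is consumed
// via getOptional(...).ifPresent(...), the option's declared default value
// is never read, so a documented default of PROCESSING_TIME is dead code
// while the runtime's hard-coded behavior (EVENT_TIME) silently applies.
public class IfPresentPitfall {

    static String effective(Optional<String> userSetValue, String declaredDefault) {
        // Hard-coded runtime behavior; declaredDefault is intentionally
        // never consulted, mirroring configuration.getOptional(xx).ifPresent(xx).
        String[] applied = {"EVENT_TIME"};
        userSetValue.ifPresent(v -> applied[0] = v);
        return applied[0];
    }

    public static void main(String[] args) {
        // The declared default PROCESSING_TIME is never applied:
        System.out.println(effective(Optional.empty(), "PROCESSING_TIME"));
        // Only an explicit user setting changes the behavior:
        System.out.println(effective(Optional.of("PROCESSING_TIME"), "PROCESSING_TIME"));
    }
}
```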
[jira] [Created] (FLINK-25372) Add thread dump feature for jobmanager
Zhanghao Chen created FLINK-25372: - Summary: Add thread dump feature for jobmanager Key: FLINK-25372 URL: https://issues.apache.org/jira/browse/FLINK-25372 Project: Flink Issue Type: Sub-task Components: Runtime / REST, Runtime / Web Frontend Reporter: Zhanghao Chen Add a thread dump feature for the jobmanager, in addition to the previous work on the TM side: [FLINK-14816|https://issues.apache.org/jira/browse/FLINK-14816] (Add thread dump feature for taskmanager). In OLAP scenarios, it is helpful to get JM thread dump info for debugging. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25371) Include port as part of the host info for subtask detail panel on Web UI
Zhanghao Chen created FLINK-25371: - Summary: Include port as part of the host info for subtask detail panel on Web UI Key: FLINK-25371 URL: https://issues.apache.org/jira/browse/FLINK-25371 Project: Flink Issue Type: Improvement Components: Runtime / REST, Runtime / Web Frontend Reporter: Zhanghao Chen Attachments: image-2021-12-18-17-57-13-674.png, image-2021-12-18-18-11-59-143.png *Problem* Currently, the subtask detail panel on the Web UI only displays the hostname of the TaskManager on which the subtask is running. The data port of the TM is not displayed, and its absence makes it inconvenient to identify whether two subtasks are running on the same TM. !image-2021-12-18-17-57-13-674.png! *Proposal* Include the port as part of the host info in the subtask detail panel on the Web UI. !image-2021-12-18-18-11-59-143.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24223) Client should throw exception to warn users when the configurations set by program options conflict with those set by -D
Zhanghao Chen created FLINK-24223: - Summary: Client should throw exception to warn users when the configurations set by program options conflict with those set by -D Key: FLINK-24223 URL: https://issues.apache.org/jira/browse/FLINK-24223 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.13.0, 1.12.0, 1.11.0 Reporter: Zhanghao Chen h2. Problem Currently, program options (e.g. -d, -p) have higher precedence than configuration options set by -D or -yD on the client side. This may cause confusion for users, especially for program options without args. For example, if a user sets -Dexecution.attached=false without setting -d (they may not be aware of the existence of this option), they will find that the configuration value does not take effect. h2. Proposal The client should throw an exception to warn users when the configurations set by program options conflict with those set by -D. -- This message was sent by Atlassian Jira (v8.3.4#803005)