[jira] [Created] (FLINK-35555) Serializing List with null values throws NPE

2024-06-07 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-35555:
-

 Summary: Serializing List with null values throws NPE
 Key: FLINK-35555
 URL: https://issues.apache.org/jira/browse/FLINK-35555
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.20.0
Reporter: Zhanghao Chen
 Fix For: 1.20.0


FLINK-34123 introduced built-in serialization support for java.util.List, which 
relies on the existing {{ListSerializer}} implementation. However, {{ListSerializer}} 
does not allow null values, as it was originally designed for serializing 
{{ListState}} only, where null values are explicitly forbidden by the contract.

Directly adding a null marker to the existing serializer would break backward state 
compatibility, so we'll need to introduce a new List serializer and a 
corresponding TypeInformation that allow null values when serializing user 
objects.
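
A rough sketch of the null-marker approach (the class name and the element-serializer wiring below are illustrative, not the actual Flink implementation): write a boolean flag before each element so that null entries can be round-tripped, at the cost of a binary format that differs from the existing {{ListSerializer}}.

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Hypothetical sketch; "NullableListSerializer" is an illustrative name, not an existing class.
class NullableListSerializer<T> {

    private final TypeSerializer<T> elementSerializer;

    NullableListSerializer(TypeSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    void serialize(List<T> list, DataOutputView target) throws IOException {
        target.writeInt(list.size());
        for (T element : list) {
            // Write a null marker before each element instead of rejecting nulls.
            if (element == null) {
                target.writeBoolean(false);
            } else {
                target.writeBoolean(true);
                elementSerializer.serialize(element, target);
            }
        }
    }

    List<T> deserialize(DataInputView source) throws IOException {
        int size = source.readInt();
        List<T> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(source.readBoolean() ? elementSerializer.deserialize(source) : null);
        }
        return list;
    }
}
{code}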



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35458) Add serializer upgrade test for set serializer

2024-05-26 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-35458:
-

 Summary: Add serializer upgrade test for set serializer
 Key: FLINK-35458
 URL: https://issues.apache.org/jira/browse/FLINK-35458
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System
Affects Versions: 1.20.0
Reporter: Zhanghao Chen
 Fix For: 2.0.0


A new dedicated serializer for Sets was introduced in 
[FLINK-35068|https://issues.apache.org/jira/browse/FLINK-35068]. Since the 
serializer upgrade test requires at least one previous release that contains the 
set serializer (which does not exist yet), we'll add the upgrade test for the set 
serializer after the release of v1.20.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35145) Add timeout for cluster termination

2024-04-17 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-35145:
-

 Summary: Add timeout for cluster termination
 Key: FLINK-35145
 URL: https://issues.apache.org/jira/browse/FLINK-35145
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.20.0
Reporter: Zhanghao Chen
 Fix For: 1.20.0


Currently, cluster termination may block forever as there is no timeout for it. 
For example, for an Application cluster with ZK HA enabled, when the ZK cluster is 
down, the cluster will reach the termination status, but the termination process 
will block while trying to clean up HA data on ZK. A similar phenomenon can be 
observed when an HDFS/S3 outage occurs.

I propose adding a timeout for the cluster termination process in the 
ClusterEntrypoint#shutDownAsync method.
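
A minimal sketch of the idea, assuming the termination future is a plain CompletableFuture; the timeout value and the fallback handling below are illustrative assumptions, not the actual ClusterEntrypoint code.

{code:java}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: bound an otherwise potentially blocking termination future with a timeout,
// so a hanging ZK/HDFS/S3 cleanup cannot block cluster shutdown forever.
class ShutdownTimeoutSketch {

    static CompletableFuture<Void> withTerminationTimeout(
            CompletableFuture<Void> terminationFuture, Duration timeout) {
        return terminationFuture
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .exceptionally(
                        t -> {
                            // On timeout (or failure), log and proceed with termination
                            // instead of waiting indefinitely.
                            return null;
                        });
    }
}
{code}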



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35068) Introduce built-in serialization support for Set

2024-04-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-35068:
-

 Summary: Introduce built-in serialization support for Set
 Key: FLINK-35068
 URL: https://issues.apache.org/jira/browse/FLINK-35068
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System
Affects Versions: 1.20.0
Reporter: Zhanghao Chen
 Fix For: 1.20.0


Introduce built-in serialization support for {{Set}}, another common Java 
collection type. We'll need to add a new built-in serializer for it 
({{MultiSetTypeInformation}} utilizes {{MapSerializer}} underneath, but a dedicated 
serializer could be more efficient for the common {{Set}} case).
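
For illustration only (the class name is hypothetical and this is not the actual implementation), a dedicated Set serializer could write the element count followed by the elements directly, instead of going through {{MapSerializer}} with dummy values:

{code:java}
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Illustrative sketch of a dedicated Set serializer that avoids the per-entry dummy
// value written by a MapSerializer-based approach.
class SetSerializerSketch<T> {

    private final TypeSerializer<T> elementSerializer;

    SetSerializerSketch(TypeSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    void serialize(Set<T> set, DataOutputView target) throws IOException {
        target.writeInt(set.size());
        for (T element : set) {
            elementSerializer.serialize(element, target);
        }
    }

    Set<T> deserialize(DataInputView source) throws IOException {
        int size = source.readInt();
        Set<T> set = new HashSet<>(size);
        for (int i = 0; i < size; i++) {
            set.add(elementSerializer.deserialize(source));
        }
        return set;
    }
}
{code}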



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34354) Release Testing: Verify FLINK-34037 Improve Serialization Configuration and Usage in Flink

2024-02-04 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34354:
-

 Summary: Release Testing: Verify FLINK-34037 Improve Serialization 
Configuration and Usage in Flink
 Key: FLINK-34354
 URL: https://issues.apache.org/jira/browse/FLINK-34354
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System
Affects Versions: 1.19.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34328) Release Testing Instructions: Verify FLINK-34037 Improve Serialization Configuration And Usage In Flink

2024-01-31 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34328:
-

 Summary: Release Testing Instructions: Verify FLINK-34037 Improve 
Serialization Configuration And Usage In Flink
 Key: FLINK-34328
 URL: https://issues.apache.org/jira/browse/FLINK-34328
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34270) Update connector developer-facing doc

2024-01-29 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34270:
-

 Summary: Update connector developer-facing doc
 Key: FLINK-34270
 URL: https://issues.apache.org/jira/browse/FLINK-34270
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34239) Introduce a deep copy method of SerializerConfig for merging with Table configs in org.apache.flink.table.catalog.DataTypeFactoryImpl

2024-01-25 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34239:
-

 Summary: Introduce a deep copy method of SerializerConfig for 
merging with Table configs in 
org.apache.flink.table.catalog.DataTypeFactoryImpl 
 Key: FLINK-34239
 URL: https://issues.apache.org/jira/browse/FLINK-34239
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Affects Versions: 1.19.0
Reporter: Zhanghao Chen


*Problem*

Currently, 
org.apache.flink.table.catalog.DataTypeFactoryImpl#createSerializerExecutionConfig 
will create a deep copy of the SerializerConfig and merge the Table config into 
it. However, the deep copy is done by manually calling the getter and setter 
methods of SerializerConfig, which is prone to human error, e.g. forgetting to copy 
a newly added field of SerializerConfig.

*Proposal*

Introduce a deep copy method for SerializerConfig and replace the current implementation in 
org.apache.flink.table.catalog.DataTypeFactoryImpl#createSerializerExecutionConfig.
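
A hypothetical sketch of what such a copy method could look like; the field names below are placeholders, not the actual SerializerConfig fields.

{code:java}
import java.util.LinkedHashMap;

// Hypothetical sketch; the fields are illustrative only.
public class SerializerConfigSketch {

    private boolean forceKryo;
    private boolean forceAvro;
    private LinkedHashMap<Class<?>, Class<?>> registeredTypesWithSerializerClasses =
            new LinkedHashMap<>();

    /** Deep copy living next to the field declarations, so a newly added field is hard to miss. */
    public SerializerConfigSketch copy() {
        SerializerConfigSketch copy = new SerializerConfigSketch();
        copy.forceKryo = this.forceKryo;
        copy.forceAvro = this.forceAvro;
        // Copy mutable collections so the Table layer can merge its own settings
        // without mutating the original config.
        copy.registeredTypesWithSerializerClasses =
                new LinkedHashMap<>(this.registeredTypesWithSerializerClasses);
        return copy;
    }
}
{code}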



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34230) Update flink-docs README: add -Pskip-webui-build to the config doc generation command

2024-01-24 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34230:
-

 Summary: Update flink-docs README: add -Pskip-webui-build to the 
config doc generation command
 Key: FLINK-34230
 URL: https://issues.apache.org/jira/browse/FLINK-34230
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34217) Update Serialization-related doc with the new way of configuration

2024-01-23 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34217:
-

 Summary: Update Serialization-related doc with the new way of 
configuration
 Key: FLINK-34217
 URL: https://issues.apache.org/jira/browse/FLINK-34217
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34125) Flink 2.0: Remove deprecated serialization config methods and options

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34125:
-

 Summary: Flink 2.0: Remove deprecated serialization config methods 
and options
 Key: FLINK-34125
 URL: https://issues.apache.org/jira/browse/FLINK-34125
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34124) Flink 2.0: Disable Kryo by default

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34124:
-

 Summary: Flink 2.0: Disable Kryo by default
 Key: FLINK-34124
 URL: https://issues.apache.org/jira/browse/FLINK-34124
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34123) Introduce built-in serializers for common composite data types

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34123:
-

 Summary: Introduce built-in serializers for common composite data 
types
 Key: FLINK-34123
 URL: https://issues.apache.org/jira/browse/FLINK-34123
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34122) Deprecate old serialization config methods and options

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34122:
-

 Summary: Deprecate old serialization config methods and options
 Key: FLINK-34122
 URL: https://issues.apache.org/jira/browse/FLINK-34122
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34121) Introduce pipeline.force-kryo-avro to control whether to force registration of Avro serializer with Kryo

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34121:
-

 Summary: Introduce pipeline.force-kryo-avro to control whether to 
force registration of Avro serializer with Kryo
 Key: FLINK-34121
 URL: https://issues.apache.org/jira/browse/FLINK-34121
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34120) Introduce unified serialization config option for all Kryo, POJO and customized serializers

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34120:
-

 Summary: Introduce unified serialization config option for all 
Kryo, POJO and customized serializers
 Key: FLINK-34120
 URL: https://issues.apache.org/jira/browse/FLINK-34120
 Project: Flink
  Issue Type: New Feature
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34103) AsyncIO example failed to run as DataGen Connector is not bundled

2024-01-15 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34103:
-

 Summary: AsyncIO example failed to run as DataGen Connector is not 
bundled
 Key: FLINK-34103
 URL: https://issues.apache.org/jira/browse/FLINK-34103
 Project: Flink
  Issue Type: Bug
  Components: Examples
Affects Versions: 1.18.0
Reporter: Zhanghao Chen


From the comments of [FLINK-32821|https://issues.apache.org/jira/browse/FLINK-32821]:

root@73186f600374:/opt/flink# bin/flink run /volume/flink-examples-streaming-1.18.0-AsyncIO.jar
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/DataGeneratorSource
    at org.apache.flink.streaming.examples.async.AsyncIOExample.main(AsyncIOExample.java:82)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.datagen.source.DataGeneratorSource
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 15 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling

2024-01-03 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33977:
-

 Summary: Adaptive scheduler may not minimize the number of TMs 
during downscaling
 Key: FLINK-33977
 URL: https://issues.apache.org/jira/browse/FLINK-33977
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.18.0
Reporter: Zhanghao Chen


The Adaptive Scheduler uses a SlotAssigner to assign free slots to slot sharing 
groups. Currently, there are two implementations of SlotAssigner available: the 
DefaultSlotAssigner, which treats all slots and slot sharing groups equally, and 
the StateLocalitySlotAssigner, which assigns slots based on the number of local 
key groups to utilize local state recovery. The scheduler uses the 
DefaultSlotAssigner when no key group assignment info is available and the 
StateLocalitySlotAssigner otherwise.

However, neither SlotAssigner aims to minimize the number of TMs, which may 
produce suboptimal slot assignments in Application Mode. For example, when a job 
with 8 slot sharing groups and 2 TMs (4 slots each) is downscaled through the 
FLIP-291 API to 4 slot sharing groups, the cluster may still have 2 TMs, one with 
1 free slot and the other with 3 free slots. For end users, this implies an 
ineffective downscaling, as the total cluster resources are not reduced.

We should take minimizing the number of TMs into consideration as well. A possible 
approach is to enhance the StateLocalitySlotAssigner: when the number of free 
slots exceeds the need, sort all TMs by a score summed from the allocation scores 
of all slots on them, remove slots from the excess TMs with the lowest scores, and 
proceed with the remaining slot assignment.
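
A rough sketch of the proposed pre-filtering step; the maps below are simplified stand-ins for the real SlotAssigner data structures (free slots grouped by TM, and per-slot allocation scores).

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MinimizeTaskManagersSketch {

    static List<String> pickSlots(
            Map<String, List<String>> freeSlotsByTaskManager, // tmId -> slotIds
            Map<String, Double> slotScores,                   // slotId -> allocation score
            int slotsNeeded) {

        // Sort TMs by the summed allocation score of their free slots, highest first.
        List<List<String>> rankedTmSlots =
                freeSlotsByTaskManager.values().stream()
                        .sorted(
                                Comparator.comparingDouble(
                                                (List<String> slots) ->
                                                        slots.stream()
                                                                .mapToDouble(s -> slotScores.getOrDefault(s, 0.0))
                                                                .sum())
                                        .reversed())
                        .collect(Collectors.toList());

        // Take slots from the highest-scoring TMs only; the remaining (lowest-scoring)
        // TMs contribute nothing and can be released, minimizing the number of TMs kept.
        List<String> picked = new ArrayList<>();
        for (List<String> tmSlots : rankedTmSlots) {
            if (picked.size() >= slotsNeeded) {
                break;
            }
            picked.addAll(tmSlots);
        }
        return picked.size() > slotsNeeded
                ? new ArrayList<>(picked.subList(0, slotsNeeded))
                : picked;
    }
}
{code}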



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change

2024-01-01 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33962:
-

 Summary: Chaining-agnostic OperatorID generation for improved 
state compatibility on parallelism change
 Key: FLINK-33962
 URL: https://issues.apache.org/jira/browse/FLINK-33962
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Zhanghao Chen


*Background*

Flink restores operator state from snapshots based on matching the operatorIDs. 
Since Flink 1.2, {{StreamGraphHasherV2}} has been used for operatorID generation when 
no user-set uid exists. The generated OperatorID is deterministic with respect 
to:
 * node-local properties (the traversal ID in the BFS over the DAG)
 * chained output nodes
 * input node hashes

*Problem*

The chaining behavior affects state compatibility, as the generation of the 
OperatorID of an operator depends on its chained output nodes. For example, a 
simple source->sink DAG with source and sink chained together is state 
incompatible with an otherwise identical DAG with source and sink unchained 
(either because the parallelisms of the two ops are changed to be unequal or 
because chaining is disabled). This greatly limits the flexibility to perform 
chain-breaking/joining for performance tuning.

*Proposal*

Introduce a {{StreamGraphHasherV3}} that is agnostic to the chaining behavior 
of operators, which effectively just removes L227-235 of 
[StreamGraphHasherV2.java|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
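
For illustration, a simplified sketch of the idea (not the actual hasher code): the node hash is derived only from the node-local traversal id and the input hashes, with no contribution from chained output nodes, so breaking or joining chains keeps the generated id stable.

{code:java}
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.util.List;

class ChainAgnosticHashSketch {

    static byte[] generateNodeHash(int traversalId, List<byte[]> inputHashes) {
        HashFunction hashFunction = Hashing.murmur3_128(0);
        Hasher hasher = hashFunction.newHasher();
        hasher.putInt(traversalId); // node-local property only; nothing about chained outputs
        byte[] hash = hasher.hash().asBytes();
        for (byte[] inputHash : inputHashes) {
            // Mix in upstream hashes so the id remains sensitive to the topology.
            for (int i = 0; i < hash.length; i++) {
                hash[i] = (byte) (hash[i] * 37 + inputHash[i]);
            }
        }
        return hash;
    }
}
{code}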
 

This will not hurt the determinism of the ID generation across job 
submissions as long as the stream graph topology doesn't change, and since new 
versions of Flink have already adopted pure operator-level state recovery, this 
will not break state recovery across job submissions as long as both submissions 
use the same hasher.

This will, however, break cross-version state compatibility. So we can 
introduce a new option to enable using HasherV3 in v1.19 and consider making it 
the default hasher in v2.0.

Looking forward to suggestions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2023-12-25 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33940:
-

 Summary: Update the auto-derivation rule of max parallelism 
for enlarged upscaling space
 Key: FLINK-33940
 URL: https://issues.apache.org/jira/browse/FLINK-33940
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Zhanghao Chen


*Background*

The choice of the max parallelism of a stateful operator is important, as it 
limits the upper bound of the operator's parallelism, while it can also add 
extra overhead when set too large. Currently, the max parallelism of an 
operator is either fixed to a value specified via the API core / pipeline option or 
auto-derived with the following rule:

`min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)`

*Problem*

Recently, the elasticity of Flink jobs has become more and more valued by 
users. The current auto-derivation rule was introduced a long time ago 
and only allows the operator parallelism to be roughly doubled, which is not 
desirable for elasticity. Setting a max parallelism manually may not be desirable 
either: users may not have sufficient expertise to select a good 
max-parallelism value.

*Proposal*

Update the auto-derivation rule of max parallelism to derive a larger max 
parallelism for a better out-of-the-box elasticity experience. A candidate is as 
follows:

`min(max(roundUpToPowerOfTwo(operatorParallelism * 5), 1024), 32767)`
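
For comparison, a small sketch of the current rule and the proposed candidate; the rounding helper mirrors the power-of-two rounding used by the existing derivation, and the constants come from the formulas above.

{code:java}
class MaxParallelismDerivationSketch {

    static final int CURRENT_MIN_DEFAULT = 128;
    static final int PROPOSED_MIN_DEFAULT = 1024;
    static final int UPPER_BOUND = 32767;

    static int currentRule(int operatorParallelism) {
        // operatorParallelism * 1.5, rounded up to a power of two, clamped to [128, 32767]
        return Math.min(
                Math.max(
                        roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2),
                        CURRENT_MIN_DEFAULT),
                UPPER_BOUND);
    }

    static int proposedRule(int operatorParallelism) {
        // operatorParallelism * 5, rounded up to a power of two, clamped to [1024, 32767]
        return Math.min(
                Math.max(roundUpToPowerOfTwo(operatorParallelism * 5), PROPOSED_MIN_DEFAULT),
                UPPER_BOUND);
    }

    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }
}
{code}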

Looking forward to your opinions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33891) Remove the obsolete SingleJobGraphStore

2023-12-19 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33891:
-

 Summary: Remove the obsolete SingleJobGraphStore
 Key: FLINK-33891
 URL: https://issues.apache.org/jira/browse/FLINK-33891
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Reporter: Zhanghao Chen


SingleJobGraphStore was introduced a long time ago in FLIP-6. It is only used 
in a test case in 
DefaultDispatcherRunnerITCase#leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader. 
We can replace it with TestingJobGraphStore there and then safely remove the class. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33682) Reuse source operator input records/bytes metrics for SourceOperatorStreamTask

2023-11-28 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33682:
-

 Summary: Reuse source operator input records/bytes metrics for 
SourceOperatorStreamTask
 Key: FLINK-33682
 URL: https://issues.apache.org/jira/browse/FLINK-33682
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Metrics
Reporter: Zhanghao Chen


For SourceOperatorStreamTask, the source operator is the head operator that takes 
input. We can directly reuse the source operator input records/bytes metrics for it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33681) Display source/sink numRecordsIn/Out & numBytesIn/Out on UI

2023-11-28 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33681:
-

 Summary: Display source/sink numRecordsIn/Out & numBytesIn/Out on 
UI
 Key: FLINK-33681
 URL: https://issues.apache.org/jira/browse/FLINK-33681
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Zhanghao Chen
 Attachments: image-2023-11-29-13-26-15-176.png

Currently, the numRecordsIn & numBytesIn metrics for sources and the 
numRecordsOut & numBytesOut metrics for sinks are always 0 on the Flink web 
dashboard.

[FLINK-11576|https://issues.apache.org/jira/browse/FLINK-11576] brings us these 
metrics on the operator level, but it does not integrate them on the task 
level. On the other hand, the summary metrics on the job overview page are based 
on the task-level I/O metrics. As a result, even though new connectors 
supporting FLIP-33 metrics will report operator-level I/O metrics, we still 
cannot see the metrics on the dashboard.

This ticket serves as an umbrella issue to integrate standard source/sink I/O 
metrics with the corresponding task I/O metrics. 

!image-2023-11-29-13-26-15-176.png|width=590,height=252!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33265) Support source parallelism setting for Kafka connector

2023-10-12 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33265:
-

 Summary: Support source parallelism setting for Kafka connector
 Key: FLINK-33265
 URL: https://issues.apache.org/jira/browse/FLINK-33265
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33264) Support source parallelism setting for DataGen connector

2023-10-12 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33264:
-

 Summary: Support source parallelism setting for DataGen connector
 Key: FLINK-33264
 URL: https://issues.apache.org/jira/browse/FLINK-33264
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33263) Implement ParallelismProvider for sources in Blink planner

2023-10-12 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33263:
-

 Summary: Implement ParallelismProvider for sources in Blink planner
 Key: FLINK-33263
 URL: https://issues.apache.org/jira/browse/FLINK-33263
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33262) Extend source provider interfaces with the new parallelism provider interface

2023-10-12 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33262:
-

 Summary: Extend source provider interfaces with the new 
parallelism provider interface
 Key: FLINK-33262
 URL: https://issues.apache.org/jira/browse/FLINK-33262
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33261) FLIP-367: Support Setting Parallelism for Table/SQL Sources

2023-10-12 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33261:
-

 Summary: FLIP-367: Support Setting Parallelism for Table/SQL 
Sources
 Key: FLINK-33261
 URL: https://issues.apache.org/jira/browse/FLINK-33261
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Zhanghao Chen


Umbrella issue for [FLIP-367: Support Setting Parallelism for Table/SQL Sources 
- Apache Flink - Apache Software 
Foundation|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33240) Generate docs for deprecated options as well

2023-10-11 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33240:
-

 Summary: Generate docs for deprecated options as well
 Key: FLINK-33240
 URL: https://issues.apache.org/jira/browse/FLINK-33240
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Zhanghao Chen
 Fix For: 1.19.0


Currently, Flink will skip doc generation for deprecated options (see 
{{ConfigOptionsDocGenerator#shouldBeDocumented}}). As a result, the 
deprecated options can no longer be found in the new version of the Flink documentation. 
This might confuse users upgrading from an older version of Flink: they have 
to either carefully read the release notes or check the source code for 
upgrade guidance on deprecated options. I suggest generating docs for 
deprecated options as well, and we should scan through the code to make sure 
that proper upgrade guidance is provided for them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33236) Remove the unused high-availability.zookeeper.path.running-registry option

2023-10-10 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33236:
-

 Summary: Remove the unused 
high-availability.zookeeper.path.running-registry option
 Key: FLINK-33236
 URL: https://issues.apache.org/jira/browse/FLINK-33236
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0


The running registry subcomponent of Flink HA was removed in 
[FLINK-25430|https://issues.apache.org/jira/browse/FLINK-25430], and the 
"high-availability.zookeeper.path.running-registry" option has been of no use since 
then. We should remove the option and regenerate the config doc to remove the 
relevant descriptions and avoid user confusion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33221) Add config options for administrator JVM options

2023-10-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33221:
-

 Summary: Add config options for administrator JVM options
 Key: FLINK-33221
 URL: https://issues.apache.org/jira/browse/FLINK-33221
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Zhanghao Chen


We encounter issues similar to those described in SPARK-23472. Users may need to add JVM 
options to their Flink applications (e.g. to tune GC options). They typically 
use the {{env.java.opts.x}} series of options to do so. We also have a set of 
administrator JVM options to apply by default, e.g. to enable GC logs, tune GC 
options, etc. Both use cases need to set the same series of options and 
will clobber one another.

In the past, we generated and prepended the administrator JVM options in the 
Java code that generates the start command for the JM/TM. However, this has 
proven difficult to maintain.

Therefore, I propose adding a set of default JVM options for administrator 
use that is prepended to the user-set extra JVM options. We can keep the existing 
{{env.java.opts.x}} series as user-set extra JVM options and add a new set of 
{{env.java.opts.x.default}} options for administrator use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33205) Replace Akka with Pekko in the description of "pekko.ssl.enabled"

2023-10-07 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33205:
-

 Summary: Replace Akka with Pekko in the description of 
"pekko.ssl.enabled"
 Key: FLINK-33205
 URL: https://issues.apache.org/jira/browse/FLINK-33205
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33204) Add description for missing options in the all jobmanager/taskmanager options section in document

2023-10-07 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33204:
-

 Summary: Add description for missing options in the all 
jobmanager/taskmanager options section in document
 Key: FLINK-33204
 URL: https://issues.apache.org/jira/browse/FLINK-33204
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Configuration
Affects Versions: 1.17.0, 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0


There are 4 options which are excluded from the all jobmanager/taskmanager 
options section in the configuration document:
 # taskmanager.bind-host
 # taskmanager.rpc.bind-port
 # jobmanager.bind-host
 # jobmanager.rpc.bind-port

We should add them to the document under the all jobmanager/taskmanager 
options section for completeness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33166) Support setting root logger level by config

2023-09-28 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33166:
-

 Summary: Support setting root logger level by config
 Key: FLINK-33166
 URL: https://issues.apache.org/jira/browse/FLINK-33166
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Zhanghao Chen


Users currently cannot change the logging level via config and have to modify the 
cumbersome logger configuration file manually. We'd better provide a shortcut 
and support setting the root logger level by config.

There are already a number of configs for logger settings, like 
{{env.log.dir}} for the logging dir and {{env.log.max}} for the max number of old log 
files to keep. We can name the new config {{env.log.level}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33148) Update Web UI to adopt the new "endpoint" field in REST API

2023-09-24 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33148:
-

 Summary: Update Web UI to adopt the new "endpoint" field in REST 
API
 Key: FLINK-33148
 URL: https://issues.apache.org/jira/browse/FLINK-33148
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33147) Introduce a new "endpoint" field in REST API to represent TaskManager endpoint (host + port) and deprecate the "host" field

2023-09-24 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33147:
-

 Summary: Introduce a new "endpoint" field in REST API to represent 
TaskManager endpoint (host + port) and deprecate the "host" field
 Key: FLINK-33147
 URL: https://issues.apache.org/jira/browse/FLINK-33147
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / REST
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33146) FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-24 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33146:
-

 Summary: FLIP-363: Unify the Representation of TaskManager 
Location in REST API and Web UI
 Key: FLINK-33146
 URL: https://issues.apache.org/jira/browse/FLINK-33146
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST, Runtime / Web Frontend
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0


Umbrella ticket for [FLIP-363: Unify the Representation of TaskManager Location 
in REST API and Web 
UI|https://cwiki.apache.org/confluence/display/FLINK/FLIP-363%3A+Unify+the+Representation+of+TaskManager+Location+in+REST+API+and+Web+UI].
This is a continuation of FLINK-25371 (Include data port as part of the host 
info for the subtask detail panel on the Web UI).

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33123) Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for autoscaler and adaptive scheduler

2023-09-20 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33123:
-

 Summary: Wrong dynamic replacement of partitioner from FORWARD to 
REBALANCE for autoscaler and adaptive scheduler
 Key: FLINK-33123
 URL: https://issues.apache.org/jira/browse/FLINK-33123
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Runtime / Coordination
Affects Versions: 1.17.0, 1.18.0
Reporter: Zhanghao Chen


*Background*

https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is 
wrong when the parallelism is changed for a vertex with a FORWARD edge, which 
is used by both the autoscaler and adaptive scheduler where one can change the 
vertex parallelism dynamically. A fix was applied to dynamically replace the 
partitioner from FORWARD to REBALANCE on task deployment in {{StreamTask}}: 

{code:java}
private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
        Environment environment, NonChainedOutput streamOutput, int outputIndex) {
    if (streamOutput.getPartitioner() instanceof ForwardPartitioner
            && environment.getWriter(outputIndex).getNumberOfSubpartitions()
                    != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
        LOG.debug(
                "Replacing forward partitioner with rebalance for {}",
                environment.getTaskInfo().getTaskNameWithSubtasks());
        streamOutput.setPartitioner(new RebalancePartitioner<>());
    }
}
{code}

*Problem*

Unfortunately, the fix is still buggy in two aspects:
 # The connections between upstream and downstream tasks are determined by the 
distribution type of the partitioner when the execution graph is generated on the JM 
side. When the edge is FORWARD, the distribution type is POINTWISE, and Flink 
will try to evenly distribute subpartitions to all downstream tasks. If one 
wants to change it to REBALANCE, the distribution type has to be changed to 
ALL_TO_ALL to make all-to-all connections between upstream and downstream 
tasks. However, the fix did not change the distribution type, so the 
network connections are set up in the wrong way.
 # The FORWARD partitioner is kept only if 
environment.getWriter(outputIndex).getNumberOfSubpartitions() equals the 
task parallelism. However, the number of subpartitions here equals the 
number of downstream tasks of this particular task, which is also determined by 
the distribution type of the partitioner when the execution graph is generated on the 
JM side. Only when ceil(downstream task parallelism / upstream task parallelism) = 
upstream task parallelism does the number of subpartitions equal the task 
parallelism. In fact, for a normal job with a FORWARD edge and no 
autoscaling action at all, you will find that the partitioner is changed to REBALANCE 
internally, as the number of subpartitions always equals 1 in this case.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32983) Support setting env.java.opts.all & env.java.opts.cli configs via dynamic properties on the CLI side

2023-08-28 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32983:
-

 Summary: Support setting env.java.opts.all & env.java.opts.cli 
configs via dynamic properties on the CLI side
 Key: FLINK-32983
 URL: https://issues.apache.org/jira/browse/FLINK-32983
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client, Deployment / Scripts
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0


*Problem*

The following configs are supposed to be supported:
|h5. env.java.opts.all|(none)|String|Java options to start the JVM of all Flink processes with.|
|h5. env.java.opts.client|(none)|String|Java options to start the JVM of the Flink Client with.|

However, the two configs only take effect on the client side when they are set 
in the flink-conf file. In other words, values set via -D or -yD on the CLI 
will not take effect, which is counter-intuitive and makes configuration less 
flexible.

 

*Proposal*

Add logic in config.sh to parse configs set via -D or -yD and give them higher 
precedence over the values set in flink-conf.yaml for env.java.opts.all & 
env.java.opts.client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32980) Support env.java.opts.all & env.java.opts.cli config for starting Session clusters

2023-08-28 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32980:
-

 Summary: Support env.java.opts.all & env.java.opts.cli config for 
starting Session clusters
 Key: FLINK-32980
 URL: https://issues.apache.org/jira/browse/FLINK-32980
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client, Deployment / Scripts
Affects Versions: 1.18.0
Reporter: Zhanghao Chen
 Fix For: 1.19.0


*Problem*

The following configs are supposed to be supported:
|h5. env.java.opts.all|(none)|String|Java options to start the JVM of all Flink processes with.|
|h5. env.java.opts.client|(none)|String|Java options to start the JVM of the Flink Client with.|

However, the two configs do not take effect for starting Flink session clusters 
using kubernetes-session.sh and yarn-session.sh. This can lead to problems in 
complex production environments. For example, in my company, some nodes are IPv6-only, 
and the connection between the Flink client and the K8s/YARN control plane goes through 
a domain name backed by an IPv4/v6 dual stack. The JVM arg 
-Djava.net.preferIPv6Addresses=true needs to be set so that Java connects to 
IPv6 addresses in favor of IPv4 ones; otherwise the K8s/YARN control plane is 
inaccessible.

 

*Proposal*

The fix is straightforward: simply apply the following changes to the session 
scripts:

{code:bash}
# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS xxx
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32872) Add option to control the default partitioner when the parallelism of upstream and downstream operator does not match

2023-08-15 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32872:
-

 Summary: Add option to control the default partitioner when the 
parallelism of upstream and downstream operator does not match
 Key: FLINK-32872
 URL: https://issues.apache.org/jira/browse/FLINK-32872
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Configuration
Affects Versions: 1.17.0
Reporter: Zhanghao Chen


*Problem*

Currently, when the no partitioner is specified, FORWARD partitioner is used if 
the parallelism of upstream and downstream operator matches, REBALANCE 
partitioner used otherwise. However, this behavior is not configurable and can 
be undesirable in certain cases:
 #  REBALANCE partitioner will create an all-to-all connection between upstream 
and downstream operators and consume a lot of extra CPU and memory resources 
when the parallelism is high in pipelining mode and RESCALE partitioner is 
desirable in this case.
 # For Flink SQL jobs, users cannot specify the partitioner directly so far. 
And for DataStream jobs, users may not want to explicitly set the partitioner 
everywhere.

*Proposal*

Add an option to control the default partitioner when the parallelism of the 
upstream and downstream operators does not match. The option could be named 
"pipeline.default-partitioner-with-unmatched-parallelism", with REBALANCE as the 
default value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32868) Document the need to backport FLINK-30213 for using autoscaler with older version Flinks

2023-08-14 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32868:
-

 Summary: Document the need to backport FLINK-30213 for using 
autoscaler with older version Flinks
 Key: FLINK-32868
 URL: https://issues.apache.org/jira/browse/FLINK-32868
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Zhanghao Chen


The current Autoscaler doc states the job requirements as follows:

Job requirements:
 * The autoscaler currently only works with the latest [Flink 
1.17|https://hub.docker.com/_/flink] or after backporting the following fixes 
to your 1.15/1.16 Flink image
 ** [Job vertex parallelism 
overrides|https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9]
 (must have)
 ** [Support timespan for busyTime 
metrics|https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35]
 (good to have)

However, https://issues.apache.org/jira/browse/FLINK-30213 is also crucial and 
needs to be backported to 1.15/1.16 to enable autoscaling. We should add it to 
the doc as well and mark it as must-have.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32822) Add connector option to control whether to enable auto-commit of offsets when checkpoints is enabled

2023-08-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32822:
-

 Summary: Add connector option to control whether to enable 
auto-commit of offsets when checkpoints is enabled
 Key: FLINK-32822
 URL: https://issues.apache.org/jira/browse/FLINK-32822
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Zhanghao Chen


When checkpointing is enabled, the Flink Kafka connector commits the current 
consuming offsets only when checkpoints are *completed*, although the Kafka source does 
*NOT* rely on committed offsets for fault tolerance. When the checkpoint 
interval is long, the lag curve behaves in a zig-zag way: the lag keeps 
increasing and then suddenly drops when a checkpoint completes. This has led to some 
confusion for users, as in 
[https://stackoverflow.com/questions/76419633/flink-kafka-source-commit-offset-to-error-offset-suddenly-increase-or-decrease]
, and may also affect external monitoring for setting up alarms (you'll have to 
use a high threshold due to the non-realtime commit of offsets) and 
autoscaling (the algorithm needs to pay extra effort to distinguish 
whether the backlog is actually growing or merely appears to be because the 
checkpoint has not completed yet).

Therefore, I think it is worthwhile to add an option to enable auto-commit of 
offsets when checkpointing is enabled. For the DataStream API, this means adding a 
configuration method. For the Table API, this means adding a new connector option 
which wires to the DataStream API configuration underneath.
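
A purely hypothetical sketch of what the DataStream-side configuration could look like; the {{commit.offsets.periodically}} property below does not exist yet and is only meant to illustrate the proposal.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

class PeriodicCommitSketch {

    static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Hypothetical option: commit offsets on a timer even when checkpointing is
                // enabled, so the consumer-lag curve no longer follows the checkpoint interval.
                .setProperty("commit.offsets.periodically", "true")
                .build();
    }
}
{code}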

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32821) Streaming examples failed to execute due to error in packaging

2023-08-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32821:
-

 Summary: Streaming examples failed to execute due to error in 
packaging
 Key: FLINK-32821
 URL: https://issues.apache.org/jira/browse/FLINK-32821
 Project: Flink
  Issue Type: Improvement
  Components: Examples
Affects Versions: 1.18.0
Reporter: Zhanghao Chen


5 out of the 7 streaming examples failed to run:
 * Iteration & SessionWindowing & SocketWindowWordCount & WindowJoin failed to 
run due to java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/examples/utils/ParameterTool
 * TopSpeedWindowing failed to run due to: Caused by: 
java.lang.ClassNotFoundException: 
org.apache.flink.connector.datagen.source.GeneratorFunction

The NoClassDefFoundError with ParameterTool was introduced by FLINK-32558 
(Properly deprecate DataSet API), and we'd better resolve 
FLINK-32820 (ParameterTool is mistakenly marked as deprecated) 
first before we come to a fix for this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32820) ParameterTool is mistakenly marked as deprecated

2023-08-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32820:
-

 Summary: ParameterTool is mistakenly marked as deprecated
 Key: FLINK-32820
 URL: https://issues.apache.org/jira/browse/FLINK-32820
 Project: Flink
  Issue Type: Improvement
  Components: API / DataSet, API / DataStream
Affects Versions: 1.18.0
Reporter: Zhanghao Chen


ParameterTool and AbstractParameterTool in package flink-java are mistakenly 
marked as deprecated in FLINK-32558 (Properly deprecate DataSet API). They are 
widely used for handling application parameters and are also listed in the Flink 
user doc: 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/. 
Also, they are not directly related to the DataSet API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32127) Source busy time is inaccurate in many cases

2023-05-18 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32127:
-

 Summary: Source busy time is inaccurate in many cases
 Key: FLINK-32127
 URL: https://issues.apache.org/jira/browse/FLINK-32127
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Zhanghao Chen


We found that source busy time is inaccurate in many cases. The reason is that 
sources are usually multi-threaded (Kafka and RocketMQ, for example): there is a 
fetcher thread fetching data from the data source and a consumer thread 
deserializing data, with a blocking queue in between. A source is considered
 # *idle* if the consumer is blocked by fetching data from the queue
 # *backpressured* if the consumer is blocked by writing data to downstream 
operators
 # *busy* otherwise

However, this means that if the bottleneck is on the fetcher side, the consumer 
will often be blocked fetching data from the queue and the source idle time 
will be high, even though the source is in fact busy and consumes a lot of CPU. In some of our 
jobs, the source max busy time is only ~600 ms while the source is actually reaching 
its limit.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32124) Add option to enable partition alignment for sources

2023-05-18 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32124:
-

 Summary: Add option to enable partition alignment for sources
 Key: FLINK-32124
 URL: https://issues.apache.org/jira/browse/FLINK-32124
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Zhanghao Chen


Currently, the autoscaler does not consider balancing partitions among source tasks. 
In our production environment, partition skew has proven to be a severe problem for 
many jobs. Especially in a job topology with only forward or rescale shuffles, 
partition skew on the source side can further lead to data imbalance in later 
operators.

We should add an option to enable partition alignment for sources, but keep it 
disabled by default, as it has a side effect: the partition count usually has only a 
limited number of factors, and enforcing alignment will greatly limit our scaling 
choices. Also, if data among partitions is imbalanced in the first place, 
partition alignment won't help either (though this is not a common case inside our 
company).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32037) The edge on Web UI is wrong after parallelism changes via parallelism overrides or AdaptiveScheduler

2023-05-08 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-32037:
-

 Summary: The edge on Web UI is wrong after parallelism changes via 
parallelism overrides or AdaptiveScheduler 
 Key: FLINK-32037
 URL: https://issues.apache.org/jira/browse/FLINK-32037
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / REST
Affects Versions: 1.17.0
Reporter: Zhanghao Chen


*Background*

After FLINK-30213, in case of parallelism changes to the JobGraph, as done via 
the AdaptiveScheduler or through providing JobVertexId overrides in 
PipelineOptions#PARALLELISM_OVERRIDES, when the consumer parallelism doesn't 
match the local parallelism, the original ForwardPartitioner will be replaced 
with the RebalancePartitioner.

*Problem*

Although the actual partitioner changes underneath, the ship strategy shown on 
the Web UI is still FORWARD.

This is because the fix is applied when the StreamTask is initialized, and the job 
graph is not touched. The Web UI uses the JSON plan generated from the job graph 
for display, and the ship strategy is obtained via JobEdge#getShipStrategyName.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31991) Update Autoscaler doc to reflect the changes brought by the new source scaling logic

2023-05-03 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31991:
-

 Summary: Update Autoscaler doc to reflect the changes brought by 
the new source scaling logic
 Key: FLINK-31991
 URL: https://issues.apache.org/jira/browse/FLINK-31991
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Zhanghao Chen
 Attachments: image-2023-05-03-20-16-33-704.png

The current statements on job requirements are outdated:

??- All sources must use the new Source API (most common connectors already 
do)??
??- Source scaling requires sources to expose the standardized connector 
metrics for accessing backlog information (source scaling can be disabled)??

The Autoscaler doc needs to be updated to reflect the changes brought by the 
new source scaling logic ([FLINK-31326|https://issues.apache.org/jira/browse/FLINK-31326]: 
Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31936) Support setting scale up max factor

2023-04-25 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31936:
-

 Summary: Support setting scale up max factor
 Key: FLINK-31936
 URL: https://issues.apache.org/jira/browse/FLINK-31936
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Zhanghao Chen


Currently, only the scale-down max factor can be configured. We should 
add a config for the scale-up max factor as well. In many cases, a job's 
performance won't improve after scaling up due to external bottlenecks. 
Although ineffective scale-up detection will block further scaling, it already 
hurts if we scale up too much in a single step, which may even burn out 
external services.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31827) Incorrect estimation of the target data rate of a vertex when only a subset of its upstream vertex's output is consumed

2023-04-17 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31827:
-

 Summary: Incorrect estimation of the target data rate of a vertex 
when only a subset of its upstream vertex's output is consumed
 Key: FLINK-31827
 URL: https://issues.apache.org/jira/browse/FLINK-31827
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler
Reporter: Zhanghao Chen
 Attachments: image-2023-04-17-23-37-35-280.png

Currently, the target data rate of a vertex = SUM(target data rate * 
input/output ratio) over all of its upstream vertices. This assumes that all 
output records of an upstream vertex are consumed by the downstream vertex. 
However, this does not always hold. Consider the following job plan generated by 
a Flink SQL job. The middle vertex contains multiple chained Calc(select xx) 
operators, each connecting to a separate downstream sink task. As a result, 
each sink task only consumes a portion of the middle vertex's output.

To fix it, we need operator-level edge info to infer the upstream-downstream 
relationship, as well as operator-level output metrics. The metrics part is easy, 
but AFAIK there's no way to get the operator-level edge info from the Flink 
REST API yet.

!image-2023-04-17-23-37-35-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31826) Incorrect estimation of the target data rate of a vertex when only a subset of its upstream vertex's output is consumed

2023-04-17 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31826:
-

 Summary: Incorrect estimation of the target data rate of a vertex 
when only a subset of its upstream vertex's output is consumed
 Key: FLINK-31826
 URL: https://issues.apache.org/jira/browse/FLINK-31826
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: Zhanghao Chen
 Attachments: LHL7VKOG4B.jpg

Currently, a vertex's target data rate = the sum of each upstream vertex's 
target data rate * input/output ratio. This assumes that all of the upstream 
vertex's output goes into the current vertex. However, this does not always hold. 
Consider the following job plan generated by a Flink SQL job. The vertex in the 
middle has multiple Calc(select xx) operators chained, each connecting to a 
separate downstream task. The total num_rec_out_rate of the middle vertex = 
the SUM of the num_rec_in_rate of its downstream tasks.

To fix this problem, we need operator-level output metrics and edge info. The 
operator-level metrics part is easy, but AFAIK there's no way to get the 
operator-level edge info from the current Flink REST APIs.

!LHL7VKOG4B.jpg!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31769) Add percentiles to aggregated metrics

2023-04-11 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31769:
-

 Summary: Add percentiles to aggregated metrics
 Key: FLINK-31769
 URL: https://issues.apache.org/jira/browse/FLINK-31769
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Runtime / Metrics
Reporter: Zhanghao Chen
 Attachments: image-2023-04-11-15-11-51-471.png

*Background*

Currently only min/avg/max aggregations of metrics are exposed via the REST API. The Flink 
Autoscaler relies on these aggregated metrics to make predictions, and the type 
of aggregation plays an important role. FLINK-30652 (Use max busytime instead of 
average to compute true processing rate) suggests that 
using the max aggregator instead of avg for busy time handles data skew more 
robustly. However, we found that for large-scale jobs, max aggregation 
may be too sensitive. As a result, the true processing rate is underestimated 
and fluctuates severely.

The graph below shows the true processing rate estimated with different 
aggregators for a real production data transmission job with a parallelism of 
750.

!image-2023-04-11-15-11-51-471.png!

*Proposal*

Add percentiles (p50, p90, p99) to the aggregated metrics. Apache Commons Math can 
be used for computing them.

A follow-up would be making the Flink autoscaler make use of the new aggregators.
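
For illustration, a small sketch of the percentile computation with Apache Commons Math; the input array is a stand-in for the per-subtask metric samples.

{code:java}
import org.apache.commons.math3.stat.descriptive.rank.Percentile;

class PercentileAggregationSketch {

    /** Returns {p50, p90, p99} of the given per-subtask samples. */
    static double[] aggregate(double[] perSubtaskSamples) {
        Percentile percentile = new Percentile();
        percentile.setData(perSubtaskSamples);
        return new double[] {
            percentile.evaluate(50.0),
            percentile.evaluate(90.0),
            percentile.evaluate(99.0)
        };
    }
}
{code}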

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31358) Display consumed split/partition/queue info on Web UI

2023-03-07 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31358:
-

 Summary: Display consumed split/partition/queue info on Web UI
 Key: FLINK-31358
 URL: https://issues.apache.org/jira/browse/FLINK-31358
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Zhanghao Chen


Many data sources have the concept of a "split", which is a partition of the 
whole data (e.g. partition in Kafka, queue in RocketMQ), and each Flink source 
task is allocated a subset of splits to consume. When a job is lagging on 
only a few splits, being able to view which source task consumes which split on 
the Web UI would help determine whether it is a data source issue or a Flink 
issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31245) Adaptive scheduler does not reset the state of GlobalAggregateManager when rescaling

2023-02-27 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-31245:
-

 Summary: Adaptive scheduler does not reset the state of 
GlobalAggregateManager when rescaling
 Key: FLINK-31245
 URL: https://issues.apache.org/jira/browse/FLINK-31245
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.1
Reporter: Zhanghao Chen
 Fix For: 1.18.0


*Problem*

GlobalAggregateManager is used to share state amongst parallel tasks in a job 
and thus coordinate their execution. It maintains state (the _accumulators_ 
field in JobMaster) in JM memory. The accumulator content is defined in user 
code; in my company, for example, a user stores the task parallelism in the 
accumulator, assuming that it never changes. However, this assumption is broken 
when using the adaptive scheduler.
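
For context, a rough sketch of the usage pattern (the access path via 
StreamingRuntimeContext and the aggregate function are assumptions for 
illustration, not the exact user code):

{code:java}
// Sketch of user code storing the job parallelism in a global aggregate.
// The accumulator lives in JM memory and is not reset when the adaptive
// scheduler rescales the job, so the stored parallelism becomes stale.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public class ParallelismRecorder extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        GlobalAggregateManager aggregateManager =
                ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();

        // Record the parallelism observed at startup (the assumption that breaks on rescaling).
        aggregateManager.updateGlobalAggregate(
                "task-parallelism",
                getRuntimeContext().getNumberOfParallelSubtasks(),
                new MaxAggregate());
    }

    @Override
    public String map(String value) {
        return value;
    }

    private static class MaxAggregate implements AggregateFunction<Integer, Integer, Integer> {
        @Override public Integer createAccumulator() { return 0; }
        @Override public Integer add(Integer value, Integer acc) { return Math.max(value, acc); }
        @Override public Integer getResult(Integer acc) { return acc; }
        @Override public Integer merge(Integer a, Integer b) { return Math.max(a, b); }
    }
}
{code}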

*Possible Solutions*
 # Mark GlobalAggregateManager as deprecated. It seems that the operator 
coordinator can completely replace GlobalAggregateManager and is a more elegant 
solution, so it would be fine to deprecate GlobalAggregateManager and leave this 
issue as is. If that's the case, we can open another ticket for doing that.
 # If we decide to continue supporting GlobalAggregateManager, then we need to 
reset its state when rescaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30829) Make the backpressure tab sortable by backpressure level

2023-01-30 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-30829:
-

 Summary: Make the backpressure tab sortable by backpressure level
 Key: FLINK-30829
 URL: https://issues.apache.org/jira/browse/FLINK-30829
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.17
Reporter: Zhanghao Chen


[FLINK-29998|https://issues.apache.org/jira/browse/FLINK-29998] enables users to 
sort the backpressure tab to see which task is busiest. Another common scenario 
for backpressure analysis is finding which tasks are backpressured, so we should 
also support sorting the backpressure tab by backpressure level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30513) HA storage dir leaks on cluster termination

2022-12-27 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-30513:
-

 Summary: HA storage dir leaks on cluster termination 
 Key: FLINK-30513
 URL: https://issues.apache.org/jira/browse/FLINK-30513
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.16.0, 1.15.0
Reporter: Zhanghao Chen
 Attachments: image-2022-12-27-21-32-17-510.png

*Problem*

We found that the HA storage dir leaks on cluster termination for a Flink job 
with HA enabled. The following picture shows the HA storage dir (here on HDFS) 
of the cluster czh-flink-test-offline (application mode) after cancelling the 
job with flink cancel. We are left with an empty dir, and too many empty dirs 
will greatly hurt the stability of the HDFS NameNode!  
!image-2022-12-27-21-32-17-510.png|width=582,height=158!

Furthermore, in case the user chooses to retain the checkpoints on job 
termination, the completedCheckpoint files will be leaked as well. Note that we 
no longer need the completedCheckpoint files, as retained checkpoints are 
recovered directly from the checkpoint data dir.

*Root Cause*

When AbstractHaServices#closeAndCleanupAllData() runs, the blob store is cleaned 
up, but the HA storage dir is not.

*Proposal*

Clean up the HA storage dir after cleaning up the blob store in 
AbstractHaServices#closeAndCleanupAllData().
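
A minimal sketch of what that cleanup could look like (illustrative only; the 
real change would live inside AbstractHaServices, and the helper name here is 
made up):

{code:java}
// Illustrative sketch: recursively delete the per-cluster HA storage directory
// (e.g. hdfs:///flink/ha/<cluster-id>) after the blob store has been cleaned up.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class HaStorageDirCleanup {

    static void cleanupHaStorageDir(Configuration config, String clusterId) throws Exception {
        Path haStorageDir =
                new Path(config.getString(HighAvailabilityOptions.HA_STORAGE_PATH), clusterId);
        FileSystem fs = haStorageDir.getFileSystem();
        if (fs.exists(haStorageDir)) {
            fs.delete(haStorageDir, true); // also removes leftover completedCheckpoint files
        }
    }
}
{code}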



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30101) YARN client should

2022-11-20 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-30101:
-

 Summary: YARN client should 
 Key: FLINK-30101
 URL: https://issues.apache.org/jira/browse/FLINK-30101
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Affects Versions: 1.16.0
Reporter: Zhanghao Chen
 Fix For: 1.17.0


*Problem*

Currently, the procedure of retrieving a client for a Flink on YARN cluster is as 
follows (in the YarnClusterDescriptor#retrieve method):
 # Get the application report from YARN
 # Set rest.address & rest.port using the info from the application report
 # Create a new RestClusterClient using the updated configuration, which will use 
the client HA services to fetch rest.address & rest.port if HA is enabled

Here, the usage of client HA in step 3 is redundant, as we have already got 
rest.address & rest.port from the YARN application report. When ZK HA is enabled, 
it takes ~1.5 s to initialize the client HA services and fetch the rest IP & 
port. 

1.5 s can mean a lot for latency-sensitive client operations. In my company, we 
use the Flink client to submit short-running session jobs where e2e latency is 
critical. The job submission time is around 10 s on average, so 1.5 s would mean 
a 15% time saving. 

*Proposal*

When retrieving a client for a Flink on YARN cluster, use 
StandaloneClientHAServices to create the RestClusterClient instead, as we have 
already fetched rest.address & rest.port from the YARN application report. This 
is also what we do in KubernetesClusterDescriptor.
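
A rough sketch of the proposed retrieval path, mirroring the Kubernetes 
descriptor (constructor shapes differ between Flink versions, so treat this as 
illustrative only; restAddress/restPort are assumed to come from the application 
report):

{code:java}
// Illustrative sketch inside YarnClusterDescriptor#retrieve: reuse the rest
// endpoint obtained from the YARN application report instead of initializing
// ZK-based client HA services.
String webMonitorAddress = "http://" + restAddress + ":" + restPort;

return new RestClusterClient<>(
        flinkConfiguration,
        applicationId,
        (effectiveConfiguration, fatalErrorHandler) ->
                new StandaloneClientHAServices(webMonitorAddress));
{code}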



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29615) MetricStore does not remove metrics of nonexistent subtasks when adaptive scheduler lowers job parallelism

2022-10-13 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-29615:
-

 Summary: MetricStore does not remove metrics of nonexistent 
subtasks when adaptive scheduler lowers job parallelism
 Key: FLINK-29615
 URL: https://issues.apache.org/jira/browse/FLINK-29615
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics, Runtime / REST
Affects Versions: 1.15.0, 1.16.0
Reporter: Zhanghao Chen
 Fix For: 1.17.0


*Problem*

MetricStore does not remove the metrics of nonexistent subtasks when the adaptive 
scheduler lowers job parallelism, so users will see metrics of nonexistent 
subtasks on the Web UI (e.g. the task backpressure page) or in REST API 
responses.

 

*Proposed Solution*

Thanks to [FLINK-29132] SubtaskMetricStore causes memory leak - ASF JIRA 
(apache.org) & [FLINK-28588] Enhance REST API for Speculative Execution - ASF 
JIRA (apache.org), Flink now updates the current execution attempts when 
updating metrics. Since the active subtask info is included in the current 
execution attempt info, we can use it to retain only the active subtasks.
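
A simplified sketch of the retention idea (the map layout is hypothetical and 
much simpler than the real MetricStore):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SubtaskMetricRetention {

    // Keep only the metrics of subtasks that still exist according to the
    // current execution attempts reported for the task.
    static void retainActiveSubtasks(
            Map<Integer, Map<String, String>> metricsBySubtaskIndex,
            Set<Integer> activeSubtaskIndices) {
        metricsBySubtaskIndex.keySet().retainAll(activeSubtaskIndices);
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> metrics = new HashMap<>();
        metrics.put(0, new HashMap<>());
        metrics.put(1, new HashMap<>());
        metrics.put(2, new HashMap<>()); // leftover after parallelism was lowered to 2
        retainActiveSubtasks(metrics, Set.of(0, 1));
        System.out.println(metrics.keySet()); // prints [0, 1]
    }
}
{code}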

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28973) Extending /jars/:jarid/plan API to support setting Flink configs

2022-08-15 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-28973:
-

 Summary: Extending /jars/:jarid/plan API to support setting Flink 
configs
 Key: FLINK-28973
 URL: https://issues.apache.org/jira/browse/FLINK-28973
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27060) Extending /jars/:jarid/run API to

2022-04-05 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-27060:
-

 Summary: Extending /jars/:jarid/run API to 
 Key: FLINK-27060
 URL: https://issues.apache.org/jira/browse/FLINK-27060
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Zhanghao Chen


*Background*

In certain scenarios, users want to submit jobs via the Flink REST API instead of 
the more heavy-weight Flink CLI, for example from a lightweight data processing 
workflow system that integrates with Flink.

Currently, the /jars/:jarid/run API 
([https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run])
 only supports a few selected Flink config options listed in the doc 
(parallelism, savepoint path, and allow non-restored state), which is 
insufficient for practical use.

*Proposed Changes*

Extend the /jars/:jarid/run API with an additional request body parameter 
"configs", which is a map of Flink configuration option-value pairs set by 
users.
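
For illustration, a request against the extended API could look like this (the 
"configs" field is the proposed addition and does not exist yet; the URL, jar id, 
and option values are placeholders):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RunJarWithConfigs {
    public static void main(String[] args) throws Exception {
        // Existing parameters plus the proposed free-form "configs" map.
        String body = "{"
                + "\"entryClass\": \"com.example.MyJob\","
                + "\"parallelism\": 4,"
                + "\"configs\": {"
                + "  \"taskmanager.memory.process.size\": \"2g\","
                + "  \"state.checkpoints.dir\": \"hdfs:///flink/checkpoints\""
                + "}"
                + "}";

        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://jobmanager:8081/jars/a1b2c3_job.jar/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}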

For backward compatibility, we can retain the existing body parameters like 
"allowNonRestoredState"; when there are conflicting configurations, the values 
set explicitly via the existing body parameters take precedence over the values 
set in "configs".



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-27056) "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value

2022-04-04 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-27056:
-

 Summary: "pipeline.time-characteristic" should be deprecated and 
have EVENT_TIME as default value
 Key: FLINK-27056
 URL: https://issues.apache.org/jira/browse/FLINK-27056
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.14.4, 1.13.6, 1.12.7, 1.15.0
Reporter: Zhanghao Chen
 Fix For: 1.16.0


*Background*
 # pipeline.time-characteristic is the configuration option used to control the 
time characteristic for all created streams, and has the default value 
_PROCESSING_TIME_ at the time of writing. However, the configuration option 
won't take effect unless it is explicitly set by the user, as it is read into the 
code via configuration.getOptional(xx).ifPresent(xx).
 # The default value of _TIME_CHARACTERISTIC_ has been changed from 
_PROCESSING_TIME_ to _EVENT_TIME_ in [FLINK-19317] Make EventTime the default 
StreamTimeCharacteristic - ASF JIRA (apache.org)
 # _TIME_CHARACTERISTIC_ and the relevant operations that set or use it have 
been deprecated in [FLINK-19318] Deprecate timeWindow() operations in 
DataStream API - ASF JIRA (apache.org) and [FLINK-19319] Deprecate 
StreamExecutionEnvironment.setStreamTimeCharacteristic() and TimeCharacteristic 
- ASF JIRA (apache.org)

*Proposed Change*
 # pipeline.time-characteristic should be deprecated, just like the other 
_TIME_CHARACTERISTIC_ related operations, as we no longer want users to set it.
 # pipeline.time-characteristic should have the default value 
{_}EVENT_TIME{_} to reflect the actual default value in the system and avoid 
misleading users.

Additionally, I think all configuration options that only take effect when 
explicitly set by the user (i.e. those read into the system via 
configuration.getOptional(xx).ifPresent(xx)) should have no default values.
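
A condensed sketch of the read pattern referred to above (class and method names 
are taken from memory and may differ slightly by Flink version):

{code:java}
// Because the option is read with getOptional(...).ifPresent(...), its declared
// default is never applied: when the user does not set pipeline.time-characteristic,
// the environment keeps its own built-in default (EVENT_TIME since FLINK-19317),
// while the documented default (PROCESSING_TIME) never takes effect.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamPipelineOptions;

public class TimeCharacteristicConfig {
    static void configure(Configuration configuration, StreamExecutionEnvironment env) {
        configuration
                .getOptional(StreamPipelineOptions.TIME_CHARACTERISTIC)
                .ifPresent(env::setStreamTimeCharacteristic);
    }
}
{code}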



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25372) Add thread dump feature for jobmanager

2021-12-18 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-25372:
-

 Summary: Add thread dump feature for jobmanager
 Key: FLINK-25372
 URL: https://issues.apache.org/jira/browse/FLINK-25372
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / REST, Runtime / Web Frontend
Reporter: Zhanghao Chen


Add a thread dump feature for the JobManager, in addition to the previous work on 
the TM side ([FLINK-14816|https://issues.apache.org/jira/browse/FLINK-14816] Add 
thread dump feature for taskmanager). In OLAP scenarios, it is helpful to get JM 
thread dump info for debugging.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25371) Include port as part of the host info for subtask detail panel on Web UI

2021-12-18 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-25371:
-

 Summary: Include port as part of the host info for subtask detail 
panel on Web UI
 Key: FLINK-25371
 URL: https://issues.apache.org/jira/browse/FLINK-25371
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST, Runtime / Web Frontend
Reporter: Zhanghao Chen
 Attachments: image-2021-12-18-17-57-13-674.png, 
image-2021-12-18-18-11-59-143.png

*Problem*

Currently, the subtask detail panel on the Web UI only displays the hostname of 
the TaskManager on which the subtask is running. The data port of the TM is not 
displayed, which makes it inconvenient to identify whether two subtasks are 
running on the same TM.

!image-2021-12-18-17-57-13-674.png!

*Proposal*

Include port as part of the host info for subtask detail panel on Web UI.

!image-2021-12-18-18-11-59-143.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24223) Client should throw exception to warn users when the configurations set by program options conflict with those set by -D

2021-09-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-24223:
-

 Summary: Client should throw exception to warn users when the 
configurations set by program options conflict with those set by -D
 Key: FLINK-24223
 URL: https://issues.apache.org/jira/browse/FLINK-24223
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.13.0, 1.12.0, 1.11.0
Reporter: Zhanghao Chen


h2. Problem

Currently, program options (e.g. -d, -p) have higher precedence than 
configuration options set by -D or -yD on the client side. This may confuse 
users, especially for program options without args. For example, if a user sets 
-Dexecution.attached=false without setting -d (they may not even be aware that 
this option exists), they will find that the configuration value does not take 
effect.
h2. Proposal

The client should throw an exception to warn users when the configurations set 
by program options conflict with those set by -D.
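
A hypothetical sketch of such a check (not the actual CliFrontend code; the 
conflict detection could of course live elsewhere):

{code:java}
import java.util.Map;
import java.util.Objects;

public class ConfigConflictCheck {

    // Throw instead of silently letting the program option win when the same key
    // is also set via -D/-yD with a different value.
    static void checkNoConflicts(
            Map<String, String> fromProgramOptions, Map<String, String> fromDynamicProperties) {
        for (Map.Entry<String, String> entry : fromProgramOptions.entrySet()) {
            String dynamicValue = fromDynamicProperties.get(entry.getKey());
            if (dynamicValue != null && !Objects.equals(dynamicValue, entry.getValue())) {
                throw new IllegalArgumentException(
                        "Conflicting values for '" + entry.getKey() + "': program option sets '"
                                + entry.getValue() + "' but -D sets '" + dynamicValue + "'");
            }
        }
    }
}
{code}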



--
This message was sent by Atlassian Jira
(v8.3.4#803005)