[jira] [Created] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions
Nicolas Perrin created FLINK-35210:
--

Summary: Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions
Key: FLINK-35210
URL: https://issues.apache.org/jira/browse/FLINK-35210
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Reporter: Nicolas Perrin

Currently the parallelism of Flink's `KafkaSource` operator must be chosen manually, which can lead to highly skewed tasks if the developer neglects this step. To avoid this issue, I propose to:
- retrieve the number of partitions of the topic dynamically using `KafkaConsumer.partitionsFor(topic).size()`,
- set the parallelism of the stream built from the source based on this value.

This way there won't be any idle tasks.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
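The proposed behaviour could be sketched as below. This is only an illustration, not the Flink implementation: the partition list is passed in directly (in a real job it would come from `KafkaConsumer.partitionsFor(topic)`), the `maxParallelism` cap is an added assumption to keep the derived value within cluster limits, and `parallelismFor` is a hypothetical helper name, so the sketch compiles with the JDK alone.

```java
import java.util.List;

/**
 * Sketch: derive source parallelism from the topic's partition count,
 * so that no KafkaSource subtask is left idle.
 */
public class KafkaParallelism {

    /**
     * One subtask per partition, bounded by a cluster-imposed cap
     * (the cap is an assumption of this sketch, not part of the proposal).
     */
    static int parallelismFor(List<?> partitions, int maxParallelism) {
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalArgumentException("topic has no partitions");
        }
        return Math.min(partitions.size(), maxParallelism);
    }
}
```

The stream built from the source would then pass this value to `setParallelism(...)`, so the number of subtasks matches the number of partitions and no subtask sits idle.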
[jira] [Created] (FLINK-35209) Add a DeserializationSchema decorator that counts deserialize errors
Nicolas Perrin created FLINK-35209:
--

Summary: Add a DeserializationSchema decorator that counts deserialize errors
Key: FLINK-35209
URL: https://issues.apache.org/jira/browse/FLINK-35209
Project: Flink
Issue Type: New Feature
Components: API / Core
Reporter: Nicolas Perrin

I would like to propose a PR that implements a decorator for `DeserializationSchema`. The decorator wraps a `DeserializationSchema` object and catches any errors that occur while deserializing messages. A flag decides whether to fail the job in this case or not. If the errors are silenced, they are counted in a `flink.metrics.Counter` so the user can monitor them. This PR is ready to be created.

The decorator could later be improved with a sink that receives all the messages causing deserialization errors.
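A minimal sketch of the decorator idea follows. To keep it compilable with the JDK alone, `java.util.function.Function` stands in for Flink's `DeserializationSchema` and an `AtomicLong` stands in for `flink.metrics.Counter`; the class name `CountingDeserializer` and its method names are hypothetical, and the real PR would wrap the actual Flink interfaces.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Sketch of a counting decorator around a deserialization function.
 * Function<byte[], T> stands in for DeserializationSchema<T>, and
 * AtomicLong for flink.metrics.Counter, so this compiles standalone.
 */
public class CountingDeserializer<T> {
    private final Function<byte[], T> inner;   // the decorated schema
    private final boolean failOnError;         // rethrow, or silence and count?
    private final AtomicLong errorCounter = new AtomicLong();

    public CountingDeserializer(Function<byte[], T> inner, boolean failOnError) {
        this.inner = inner;
        this.failOnError = failOnError;
    }

    /** Returns null for a silenced error so the bad record is skipped. */
    public T deserialize(byte[] message) {
        try {
            return inner.apply(message);
        } catch (RuntimeException e) {
            if (failOnError) {
                throw e;                       // keep the current fail-fast behaviour
            }
            errorCounter.incrementAndGet();    // silent error: count it for monitoring
            return null;
        }
    }

    public long errorCount() {
        return errorCounter.get();
    }
}
```

With `failOnError = false`, malformed records are dropped and only show up in the counter, which a user could expose through Flink's metric reporters.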