[jira] [Created] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Nicolas Perrin (Jira)
Nicolas Perrin created FLINK-35210:
--

 Summary: Give the option to set automatically the parallelism of 
the KafkaSource to the number of kafka partitions
 Key: FLINK-35210
 URL: https://issues.apache.org/jira/browse/FLINK-35210
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Nicolas Perrin


Currently, the parallelism of Flink's `KafkaSource` operator has to be chosen 
manually, which can lead to highly skewed tasks if the developer does not 
pick a value that fits the topic.

To avoid this issue, I propose to:
- dynamically retrieve the number of partitions of the topic using 
`KafkaConsumer.partitionsFor(topic).size()`,
- set the parallelism of the stream built from the source based on this value.

 This way there won't be any idle tasks.
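
To illustrate why a mismatch between parallelism and partition count causes 
skew or idle tasks, here is a minimal sketch in plain Java. It does not use 
the Flink or Kafka APIs: the class `PartitionSpread` and its methods are 
hypothetical, and the modulo assignment of partitions to subtasks is a 
simplifying assumption, not necessarily the exact assignment the connector 
performs.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of Flink or Kafka. */
final class PartitionSpread {

    /**
     * Counts how many partitions each subtask would read, assuming
     * partition p goes to subtask (p % parallelism). Assumed scheme.
     */
    static int[] partitionsPerSubtask(int partitionCount, int parallelism) {
        int[] load = new int[parallelism];
        for (int p = 0; p < partitionCount; p++) {
            load[p % parallelism]++;
        }
        return load;
    }

    /** Number of subtasks that receive no partition at all. */
    static long idleSubtasks(int partitionCount, int parallelism) {
        return Arrays.stream(partitionsPerSubtask(partitionCount, parallelism))
                .filter(n -> n == 0)
                .count();
    }

    public static void main(String[] args) {
        // 6 partitions, parallelism 4: two subtasks read twice as much.
        System.out.println(Arrays.toString(partitionsPerSubtask(6, 4)));
        // 6 partitions, parallelism 8: two subtasks stay idle.
        System.out.println(idleSubtasks(6, 8));
        // Parallelism equal to the partition count: one partition each.
        System.out.println(Arrays.toString(partitionsPerSubtask(6, 6)));
    }
}
```

Under this assumption, setting the parallelism to the value returned by 
`partitionsFor(topic).size()` gives every subtask exactly one partition.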



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35209) Add a DeserializationSchema decorator that counts deserialize errors

2024-04-22 Thread Nicolas Perrin (Jira)
Nicolas Perrin created FLINK-35209:
--

 Summary: Add a DeserializationSchema decorator that counts 
deserialize errors
 Key: FLINK-35209
 URL: https://issues.apache.org/jira/browse/FLINK-35209
 Project: Flink
  Issue Type: New Feature
  Components: API / Core
Reporter: Nicolas Perrin


I would like to propose a PR that implements a decorator for 
`DeserializationSchema`.

The decorator wraps a `DeserializationSchema` object. Its purpose is to catch 
any errors that occur when deserializing messages. A flag on the decorator 
decides whether it fails or not in this case. If it silences the error, it 
counts it in a `flink.metrics.Counter` so the user can monitor the silent 
errors. This PR is ready to be created.
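
The behaviour described above could look roughly like the following sketch. 
It is not the proposed PR: to stay self-contained it uses a hypothetical 
`Deserializer` interface as a stand-in for `DeserializationSchema` and an 
`AtomicLong` as a stand-in for the metrics counter. Returning `null` for a 
silenced error is also an assumption made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for DeserializationSchema (one method only). */
interface Deserializer<T> {
    T deserialize(byte[] message) throws Exception;
}

/** Decorator that delegates and either rethrows or counts failures. */
final class CountingDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private final boolean failOnError;
    // Stand-in for a metrics counter; a real job would register one instead.
    private final AtomicLong errorCount = new AtomicLong();

    CountingDeserializer(Deserializer<T> inner, boolean failOnError) {
        this.inner = inner;
        this.failOnError = failOnError;
    }

    @Override
    public T deserialize(byte[] message) throws Exception {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            if (failOnError) {
                throw e; // strict mode: propagate the original error
            }
            errorCount.incrementAndGet(); // lenient mode: count the failure
            return null; // assumption: null marks a skipped record
        }
    }

    long errors() {
        return errorCount.get();
    }

    public static void main(String[] args) throws Exception {
        Deserializer<Integer> parseInt =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        CountingDeserializer<Integer> lenient =
                new CountingDeserializer<>(parseInt, false);
        System.out.println(lenient.deserialize("42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(lenient.deserialize("oops".getBytes(StandardCharsets.UTF_8)));
        System.out.println("silent errors: " + lenient.errors());
    }
}
```

Keeping the flag on the decorator rather than on the wrapped schema means 
any existing schema can be made strict or lenient without being modified.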

This decorator could be improved by adding a sink that receives all the 
messages that caused deserialization errors.


