Hi devs,
I’d like to join this discussion. CC: Qingsheng
As discussed above, partitions that appear after the first discovery should be
consumed from the EARLIEST offset.

However, when the KafkaSourceEnumerator restarts after a job failure, it cannot
tell whether an unassigned partition was first-discovered or new, because
the snapshot state currently contains only the assignedPartitions
collection (the assigned partitions). We can solve this by adding an
unAssignedInitialPartitions collection to the snapshot state, which holds
the partitions found by the first discovery that have not yet been
assigned. We could also merge these two collections into a single
collection by adding a status to each item.
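A minimal sketch of the single-collection variant, assuming a hypothetical PartitionStatus enum and EnumStateSketch class (these are not existing Flink classes). Plain "topic-partition" strings stand in for Kafka's TopicPartition to keep the example dependency-free:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical status attached to each partition in the enumerator snapshot.
enum PartitionStatus {
    ASSIGNED,           // already handed to a KafkaSourceReader
    UNASSIGNED_INITIAL  // found by the first discovery, not yet assigned
}

// Hypothetical snapshot state: one map replaces the two separate collections.
class EnumStateSketch {
    private final Map<String, PartitionStatus> partitions = new HashMap<>();

    // Records a first-discovered partition; never downgrades an ASSIGNED entry.
    void markInitialUnassigned(String tp) {
        partitions.putIfAbsent(tp, PartitionStatus.UNASSIGNED_INITIAL);
    }

    void markAssigned(String tp) {
        partitions.put(tp, PartitionStatus.ASSIGNED);
    }

    // Rebuilds either of the two original collections from the single map.
    Set<String> withStatus(PartitionStatus status) {
        return partitions.entrySet().stream()
                .filter(e -> e.getValue() == status)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    // After restore, a rediscovered partition that is in neither state
    // must have appeared after the first discovery, i.e. it is new.
    boolean isNewPartition(String tp) {
        return !partitions.containsKey(tp);
    }
}
```

On restore, UNASSIGNED_INITIAL entries would keep the user's configured starting offsets, while anything isNewPartition reports as new would start from EARLIEST.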

Besides, there is another problem, which often occurs in pattern subscription
mode: distinguishing between the following two cases:

   1. Case1: The first partition discovery is too slow: a checkpoint
   completes before it finishes, and then the job is restarted. In this case,
   the restored unAssignedInitialPartitions is an empty collection, which means
   no discovery has happened yet. The next discovery should be treated as the
   first discovery.
   2. Case2: The first partition discovery returns nothing, and new
   partitions only show up after several more discoveries. If a restart
   occurs during this period, the restored *unAssignedInitialPartitions* is
   also an empty collection, but here it means the discovery was empty.
   The next discovery should therefore be treated as a discovery of new
   partitions.

We can solve this problem by adding a boolean value (*firstDiscoveryDone*)
to the snapshot state, which records whether the first discovery has been
done.
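A minimal sketch of how a *firstDiscoveryDone* flag separates Case1 from Case2, assuming hypothetical DiscoveryStateSketch and StartingOffset types (not Flink classes). CONFIGURED_INITIAL stands for whatever starting offsets the user configured; partitions first seen in any later round start from EARLIEST:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical starting-offset policy for a newly seen partition.
enum StartingOffset { CONFIGURED_INITIAL, EARLIEST }

// Hypothetical enumerator bookkeeping restored from the snapshot.
class DiscoveryStateSketch {
    boolean firstDiscoveryDone;                          // persisted in the snapshot
    final Set<String> knownPartitions = new HashSet<>(); // assigned + unassigned initial

    DiscoveryStateSketch(boolean firstDiscoveryDone, Set<String> restoredPartitions) {
        this.firstDiscoveryDone = firstDiscoveryDone;
        this.knownPartitions.addAll(restoredPartitions);
    }

    // Handles one discovery round and returns the starting-offset policy
    // for every partition that was not known before this round.
    Map<String, StartingOffset> onDiscoveryRound(Set<String> discovered) {
        StartingOffset policy = firstDiscoveryDone
                ? StartingOffset.EARLIEST             // later rounds: new partitions
                : StartingOffset.CONFIGURED_INITIAL;  // first round: user's settings
        Map<String, StartingOffset> result = new HashMap<>();
        for (String tp : discovered) {
            if (knownPartitions.add(tp)) {
                result.put(tp, policy);
            }
        }
        // Set even when the round found nothing (Case2), so an empty restored
        // collection is not mistaken for "never discovered" (Case1).
        firstDiscoveryDone = true;
        return result;
    }
}
```

Both cases restore an empty partition set; only the flag tells them apart.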

There are also two rejected alternatives:

   1. Change the KafkaSourceEnumerator's snapshotState method to a blocking
   one, which returns only after the first-discovered partitions have been
   successfully assigned to the KafkaSourceReader. The advantage of this
   approach is that the snapshot state's fields need not change. However, if
   the first-discovered partitions are not assigned before checkpointing, the
   SourceCoordinator's event-loop thread will block; since partition
   assignment also has to run on the event-loop thread, the thread will
   deadlock on itself.
   2. An alternative to the *firstDiscoveryDone* variable: if we make the
   first discovery synchronous, Case1 can never happen. When the event-loop
   thread starts, it first adds a discovery event to the blocking queue, so by
   the time it executes the checkpoint event, the partitions have already been
   discovered. However, if partition discovery is very time-consuming, the
   SourceCoordinator cannot process other events, such as reader
   registration, while it waits. That is wasteful.
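The self-deadlock in the first rejected alternative can be reproduced in miniature. Here a single-threaded ExecutorService stands in for the SourceCoordinator's event-loop thread (an illustrative assumption, not Flink's coordinator code): the "checkpoint" task blocks on a future that only an "assignment" task queued behind it could complete. A timeout is added so the demo returns instead of hanging:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: not Flink code.
class EventLoopDeadlockSketch {

    // Returns true if the blocking "snapshotState" gave up because the
    // assignment it waited for never got a chance to run.
    static boolean blockingSnapshotTimesOut(long waitMillis) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> assigned = new CompletableFuture<>();
            // The checkpoint event occupies the only event-loop thread.
            CompletableFuture<Boolean> checkpoint = CompletableFuture.supplyAsync(() -> {
                // The assignment event is queued behind the running task.
                eventLoop.execute(() -> assigned.complete(null));
                try {
                    assigned.get(waitMillis, TimeUnit.MILLISECONDS);
                    return false; // assignment finished: no deadlock
                } catch (TimeoutException e) {
                    return true;  // without the timeout this would wait forever
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, eventLoop);
            return checkpoint.join();
        } finally {
            eventLoop.shutdownNow(); // drops the never-run assignment task
        }
    }
}
```

Running the wait on the same thread that must execute the assignment is what makes the wait unbounded, regardless of how fast discovery is.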

Best regards,
Hongshun

On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> Hi devs,
>
> I’d like to start a discussion about enabling the dynamic partition
> discovery feature by default in Kafka source. Dynamic partition discovery
> [1] is a useful feature in Kafka source, especially when the consumed
> Kafka topic scales out or the source subscribes to multiple Kafka topics
> with a pattern. With this feature enabled, users don’t have to restart the
> Flink job to consume messages in new partitions. Currently, dynamic
> partition discovery is disabled by default and users have to explicitly
> specify the discovery interval in order to turn it on.
>
> # Breaking changes
>
> For Kafka table source:
>
> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> default.
> - As we need to provide a way for users to disable the feature,
> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> the discovery. Before this proposal, “0” meant enabling partition
> discovery with interval = 0, which is a bit senseless in practice.
> Unfortunately, we can't use negative values because the type of this
> option is Duration.
>
> For KafkaSource (DataStream API)
>
> - Dynamic partition discovery in Kafka source will be enabled by default,
> with discovery interval set to 30 seconds.
> - To align with the table source, only a positive value for the option
> “partition.discovery.interval.ms” can be used to specify the discovery
> interval. Both negative values and zero will be interpreted as disabling
> the feature.
>
> # Overhead of partition discovery
>
> Partition discovery is performed by the KafkaSourceEnumerator, which
> asynchronously fetches topic metadata from the Kafka cluster and checks
> whether there are any new topics or partitions. This shouldn’t introduce
> performance issues on the Flink side.
>
> On the Kafka broker side, partition discovery sends a MetadataRequest to
> the Kafka broker to fetch topic metadata. Considering that the Kafka broker
> has its own metadata cache and the default request frequency is relatively
> low (once every 30 seconds), this is not a heavy operation and the broker's
> performance shouldn’t be affected much. It would also be great to get some
> input from Kafka experts.
>
> Looking forward to your feedback!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/#dynamic-partition-discovery
>
> Best regards,
> Qingsheng
>
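The interval semantics proposed in the quoted message can be summarized as a small decision function. This is a sketch only: DiscoveryIntervalSketch and its methods are hypothetical, a null argument stands for "option not set", and the actual option parsing in Flink may differ:

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper; not Flink's option-parsing code.
final class DiscoveryIntervalSketch {
    // Proposed default for both the table source and the DataStream KafkaSource.
    static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(30);

    private DiscoveryIntervalSketch() {}

    // Table source, "scan.topic-partition-discovery.interval" (a Duration):
    // unset -> 30 s default; zero -> discovery disabled (empty result).
    static Optional<Duration> tableSourceInterval(Duration configured) {
        if (configured == null) {
            return Optional.of(DEFAULT_INTERVAL);
        }
        if (configured.isZero()) {
            return Optional.empty();
        }
        return Optional.of(configured);
    }

    // DataStream KafkaSource, "partition.discovery.interval.ms" (milliseconds):
    // unset -> 30 s default; zero or negative -> discovery disabled.
    static Optional<Duration> dataStreamInterval(Long configuredMs) {
        if (configuredMs == null) {
            return Optional.of(DEFAULT_INTERVAL);
        }
        if (configuredMs <= 0) {
            return Optional.empty();
        }
        return Optional.of(Duration.ofMillis(configuredMs));
    }
}
```

An empty Optional means "do not schedule periodic discovery"; any present value is the period between discovery rounds.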
