[ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15448:
------------------------------------
    Issue Type: New Feature  (was: Improvement)

> Streams StandbyTaskUpdateListener
> ---------------------------------
>
>                 Key: KAFKA-15448
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15448
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Colt McNealy
>            Assignee: Colt McNealy
>            Priority: Minor
>              Labels: kip
>             Fix For: 3.7.0
>
>
> KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
> ​
> public enum SuspendReason
> { MIGRATED, PROMOTED; }
>  
> /**
>  * Method called upon the creation of the Standby Task.
> *
>  * @param topicPartition the TopicPartition of the Standby Task.
>  * @param storeName the name of the store being watched by this Standby Task.
>  * @param earliestOffset the earliest offset available on the Changelog topic.
>  * @param startingOffset the offset from which the Standby Task starts 
> watching.
>  * @param currentEndOffset the current latest offset on the associated 
> changelog partition.
> */
> void onTaskCreated(final TopicPartition topicPartition,
> final String storeName,
> final long earliestOffset
> final long startingOffset,
> final long currentEndOffset);
> ​
> /**
>  * Method called after restoring a batch of records. In this case the maximum 
> size of the batch is whatever
>  * the value of the MAX_POLL_RECORDS is set to.
> *
>  * This method is called after restoring each batch and it is advised to keep 
> processing to a minimum.
>  * Any heavy processing will hold up recovering the next batch, hence slowing 
> down the restore process as a
>  * whole.
> *
>  * If you need to do any extended processing or connecting to an external 
> service consider doing so asynchronously.
> *
>  * @param topicPartition the TopicPartition containing the values to restore
>  * @param storeName the name of the store undergoing restoration
>  * @param batchEndOffset the inclusive ending offset for the current restored 
> batch for this TopicPartition
>  * @param numRestored the total number of records restored in this batch for 
> this TopicPartition
>  * @param currentEndOffset the current end offset of the changelog topic 
> partition.
> */
> void onBatchRestored(final TopicPartition topicPartition,
> final String storeName,
> final long batchEndOffset,
> final long numRestored,
> final long currentEndOffset);
> ​
> /**
>  * Method called after a Standby Task is closed, either because the task 
> migrated to a new instance or because the task was promoted to an Active task.
> */
> void onTaskSuspended(final TopicPartition topicPartition,
> final String storeName,
> final long storeOffset,
> final long currentEndOffset,
> final SuspendReason reason);
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to