[ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15448:
------------------------------------
    Description: 
KIP-869: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]

In addition to the new metrics in KIP-869, it would be great to have a callback 
that allows for monitoring of Standby Task status. The StateRestoreListener is 
currently not called for Standby Tasks for good reasons (the API wouldn't make 
sense for Standby). I've attached an interface which would be nice to have:

 

```
public interface StandbyTaskUpdateListener {
​
public enum SuspendReason

{ MIGRATED, PROMOTED; }

 
/**
 * Method called upon the creation of the Standby Task.
*
 * @param topicPartition the TopicPartition of the Standby Task.
 * @param storeName the name of the store being watched by this Standby Task.
 * @param earliestOffset the earliest offset available on the Changelog topic.
 * @param startingOffset the offset from which the Standby Task starts watching.
 * @param currentEndOffset the current latest offset on the associated 
changelog partition.
*/
void onTaskCreated(final TopicPartition topicPartition,
final String storeName,
final long earliestOffset
final long startingOffset,
final long currentEndOffset);
​
/**
 * Method called after restoring a batch of records. In this case the maximum 
size of the batch is whatever
 * the value of the MAX_POLL_RECORDS is set to.
*
 * This method is called after restoring each batch and it is advised to keep 
processing to a minimum.
 * Any heavy processing will hold up recovering the next batch, hence slowing 
down the restore process as a
 * whole.
*
 * If you need to do any extended processing or connecting to an external 
service consider doing so asynchronously.
*
 * @param topicPartition the TopicPartition containing the values to restore
 * @param storeName the name of the store undergoing restoration
 * @param batchEndOffset the inclusive ending offset for the current restored 
batch for this TopicPartition
 * @param numRestored the total number of records restored in this batch for 
this TopicPartition
 * @param currentEndOffset the current end offset of the changelog topic 
partition.
*/
void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored,
final long currentEndOffset);
​
/**
 * Method called after a Standby Task is closed, either because the task 
migrated to a new instance or because the task was promoted to an Active task.
*/
void onTaskSuspended(final TopicPartition topicPartition,
final String storeName,
final long storeOffset,
final long currentEndOffset,
final SuspendReason reason);
}
```

  was:
In addition to the new metrics in KIP-869, it would be great to have a callback 
that allows for monitoring of Standby Task status. The StateRestoreListener is 
currently not called for Standby Tasks for good reasons (the API wouldn't make 
sense for Standby). I've attached an interface which would be nice to have:

 

```
public interface StandbyTaskUpdateListener {
​
public enum SuspendReason {
MIGRATED,
PROMOTED;
}
 
/**
* Method called upon the creation of the Standby Task.
*
* @param topicPartition the TopicPartition of the Standby Task.
* @param storeName the name of the store being watched by this Standby Task.
* @param earliestOffset the earliest offset available on the Changelog topic.
* @param startingOffset the offset from which the Standby Task starts watching.
* @param currentEndOffset the current latest offset on the associated changelog 
partition.
*/
void onTaskCreated(final TopicPartition topicPartition,
final String storeName,
final long earliestOffset
final long startingOffset,
final long currentEndOffset);
​
/**
* Method called after restoring a batch of records. In this case the maximum 
size of the batch is whatever
* the value of the MAX_POLL_RECORDS is set to.
*
* This method is called after restoring each batch and it is advised to keep 
processing to a minimum.
* Any heavy processing will hold up recovering the next batch, hence slowing 
down the restore process as a
* whole.
*
* If you need to do any extended processing or connecting to an external 
service consider doing so asynchronously.
*
* @param topicPartition the TopicPartition containing the values to restore
* @param storeName the name of the store undergoing restoration
* @param batchEndOffset the inclusive ending offset for the current restored 
batch for this TopicPartition
* @param numRestored the total number of records restored in this batch for 
this TopicPartition
* @param currentEndOffset the current end offset of the changelog topic 
partition.
*/
void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored,
final long currentEndOffset);
​
/**
* Method called after a Standby Task is closed, either because the task 
migrated to a new instance or because the task was promoted to an Active task.
*/
void onTaskSuspended(final TopicPartition topicPartition,
final String storeName,
final long storeOffset,
final long currentEndOffset,
final SuspendReason reason);
}
```


> Streams StandbyTaskUpdateListener
> ---------------------------------
>
>                 Key: KAFKA-15448
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15448
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Colt McNealy
>            Priority: Minor
>              Labels: needs-kip
>
> KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
> ​
> public enum SuspendReason
> { MIGRATED, PROMOTED; }
>  
> /**
>  * Method called upon the creation of the Standby Task.
> *
>  * @param topicPartition the TopicPartition of the Standby Task.
>  * @param storeName the name of the store being watched by this Standby Task.
>  * @param earliestOffset the earliest offset available on the Changelog topic.
>  * @param startingOffset the offset from which the Standby Task starts 
> watching.
>  * @param currentEndOffset the current latest offset on the associated 
> changelog partition.
> */
> void onTaskCreated(final TopicPartition topicPartition,
> final String storeName,
> final long earliestOffset
> final long startingOffset,
> final long currentEndOffset);
> ​
> /**
>  * Method called after restoring a batch of records. In this case the maximum 
> size of the batch is whatever
>  * the value of the MAX_POLL_RECORDS is set to.
> *
>  * This method is called after restoring each batch and it is advised to keep 
> processing to a minimum.
>  * Any heavy processing will hold up recovering the next batch, hence slowing 
> down the restore process as a
>  * whole.
> *
>  * If you need to do any extended processing or connecting to an external 
> service consider doing so asynchronously.
> *
>  * @param topicPartition the TopicPartition containing the values to restore
>  * @param storeName the name of the store undergoing restoration
>  * @param batchEndOffset the inclusive ending offset for the current restored 
> batch for this TopicPartition
>  * @param numRestored the total number of records restored in this batch for 
> this TopicPartition
>  * @param currentEndOffset the current end offset of the changelog topic 
> partition.
> */
> void onBatchRestored(final TopicPartition topicPartition,
> final String storeName,
> final long batchEndOffset,
> final long numRestored,
> final long currentEndOffset);
> ​
> /**
>  * Method called after a Standby Task is closed, either because the task 
> migrated to a new instance or because the task was promoted to an Active task.
> */
> void onTaskSuspended(final TopicPartition topicPartition,
> final String storeName,
> final long storeOffset,
> final long currentEndOffset,
> final SuspendReason reason);
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to