[ 
https://issues.apache.org/jira/browse/KAFKA-3092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095445#comment-15095445
 ] 

Ewen Cheslack-Postava commented on KAFKA-3092:
----------------------------------------------

Yes, those are exactly the semantics I was thinking of. The `partitions` 
parameter to `close()` isn't strictly necessary, and I can't think of a good 
use case for it, but I don't feel strongly about including or omitting it.

> Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-3092
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3092
>             Project: Kafka
>          Issue Type: Improvement
>          Components: copycat
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods 
> exposed in Kafka Connect's SinkTask interface seems a little unclear and too 
> closely tied to consumer semantics. From the javadoc, these APIs are used to 
> open/close per-partition resources, but that would suggest that we should 
> always get one call to onPartitionsAssigned() before writing any records for 
> the corresponding partitions and one call to onPartitionsRevoked() when we 
> have finished with them. However, the same methods on the consumer are used 
> to indicate phases of the rebalance operation: onPartitionsRevoked() is 
> called before the rebalance begins and onPartitionsAssigned() is called after 
> it completes. In particular, the consumer does not guarantee a final call to 
> onPartitionsRevoked().
>
> This mismatch makes the contract of these methods unclear. In fact, the 
> WorkerSinkTask currently does not guarantee the initial call to 
> onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead, 
> the task implementation must pull the initial assignment from the 
> SinkTaskContext. To make it more confusing, the call to commit offsets 
> following onPartitionsRevoked() causes a flush() on a partition which had 
> already been revoked. All of this makes it difficult to use this API as 
> suggested in the javadocs.
>
> To fix this, we should clarify the behavior of these methods and consider 
> renaming them to avoid confusion with the same methods in the consumer API. 
> If onPartitionsAssigned() is meant for opening resources, maybe we can rename 
> it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We 
> can then fix the code to ensure that a typical open/close contract is 
> enforced. This would also mean removing the need to pass the initial 
> assignment in the SinkTaskContext. This would give the following API:
> {code}
> void open(Collection<TopicPartition> partitions);
> void close(Collection<TopicPartition> partitions);
> {code}
> We could also consider going a little further. Instead of depending on 
> onPartitionsAssigned() to open resources, tasks could open partition 
> resources on demand as records are received. In general, connectors will need 
> some way to close partition-specific resources, but there might not be any 
> need to pass the full list of partitions to close since the only open 
> resources should be those that have received writes since the last rebalance. 
> In this case, we just have a single method:
> {code}
> void close();
> {code}
> The downside to this is that the difference between close() and stop() then 
> becomes a little unclear.
> Obviously these are not compatible changes and connectors would have to be 
> updated.
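
For illustration, the open/close contract proposed in the description could 
be sketched roughly as below. This is only a sketch under assumptions, not 
the actual Connect code: `SketchSinkTask`, its `put()` signature, and 
`openPartitionCount()` are hypothetical, and the local `TopicPartition` class 
is a stand-in for `org.apache.kafka.common.TopicPartition` so the example 
compiles on its own. Only the `open(Collection<TopicPartition>)` and 
`close(Collection<TopicPartition>)` signatures come from the proposal.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// defined here only so the sketch compiles without Kafka on the classpath.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical task illustrating the proposed contract: open() is called
// before any record for a partition is written, and close() is called once
// the task has finished with that partition.
class SketchSinkTask {
    // One buffer per open partition stands in for a per-partition resource.
    private final Map<TopicPartition, List<String>> buffers = new HashMap<>();

    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            buffers.putIfAbsent(tp, new ArrayList<>());
        }
    }

    // Assumed simplified signature; it rejects writes to partitions that
    // were never opened or have already been closed.
    void put(TopicPartition tp, String value) {
        List<String> buffer = buffers.get(tp);
        if (buffer == null) {
            throw new IllegalStateException(
                "put() without open() for " + tp.topic + "-" + tp.partition);
        }
        buffer.add(value);
    }

    void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            buffers.remove(tp); // release the per-partition resource
        }
    }

    int openPartitionCount() {
        return buffers.size();
    }
}
```

Under this contract a write to a partition after `close()` fails loudly, 
which is the kind of guarantee the description argues the current 
onPartitionsAssigned()/onPartitionsRevoked() callbacks do not provide.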



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
