[
https://issues.apache.org/jira/browse/KAFKA-3092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ewen Cheslack-Postava resolved KAFKA-3092.
------------------------------------------
Resolution: Fixed
Fix Version/s: 0.9.1.0
Issue resolved by pull request 815
[https://github.com/apache/kafka/pull/815]
> Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
> -----------------------------------------------------------------------------
>
> Key: KAFKA-3092
> URL: https://issues.apache.org/jira/browse/KAFKA-3092
> Project: Kafka
> Issue Type: Improvement
> Components: copycat
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Fix For: 0.9.1.0
>
>
> The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods
> exposed in Kafka Connect's SinkTask interface seems a little unclear and too
> closely tied to consumer semantics. From the javadoc, these APIs are used to
> open/close per-partition resources, but that would suggest that we should
> always get one call to onPartitionsAssigned() before writing any records for
> the corresponding partitions and one call to onPartitionsRevoked() when we
> have finished with them. However, the same methods on the consumer are used
> to indicate phases of the rebalance operation: onPartitionsRevoked() is
> called before the rebalance begins and onPartitionsAssigned() is called after
> it completes. In particular, the consumer does not guarantee a final call to
> onPartitionsRevoked().
>
> This mismatch makes the contract of these methods unclear. In fact, the
> WorkerSinkTask currently does not guarantee the initial call to
> onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead,
> the task implementation must pull the initial assignment from the
> SinkTaskContext. To add to the confusion, the offset commit that follows
> onPartitionsRevoked() triggers a flush() on partitions that have already
> been revoked. All of this makes it difficult to use this API as suggested
> in the javadocs.
>
> To fix this, we should clarify the behavior of these methods and consider
> renaming them to avoid confusion with the same methods in the consumer API.
> If onPartitionsAssigned() is meant for opening resources, maybe we can rename
> it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We
> can then fix the code to ensure that a typical open/close contract is
> enforced. This would also mean removing the need to pass the initial
> assignment in the SinkTaskContext. This would give the following API:
> {code}
> void open(Collection<TopicPartition> partitions);
> void close(Collection<TopicPartition> partitions);
> {code}
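To make the proposed open/close contract concrete, here is a minimal sketch of how a task might manage per-partition resources under it. This is an illustration only, not the code from the pull request: TopicPartition below is a hypothetical stand-in for the real Kafka class so the example compiles on its own, and PerPartitionSinkTask, its simplified put(TopicPartition, String) signature, and the flushed list are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// included only so this sketch is self-contained.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical task that keeps one in-memory "writer" per open partition.
class PerPartitionSinkTask {
    private final Map<TopicPartition, List<String>> writers = new HashMap<>();
    // Stands in for the external system that receives flushed data.
    final List<String> flushed = new ArrayList<>();

    // Assumed contract: called for a partition before any of its records arrive.
    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.put(tp, new ArrayList<>());
        }
    }

    // Simplified write path; rejects writes to partitions that are not open.
    void put(TopicPartition tp, String value) {
        List<String> writer = writers.get(tp);
        if (writer == null) {
            throw new IllegalStateException("put() without open() for " + tp);
        }
        writer.add(value);
    }

    // Assumed contract: called when the task is finished with the partitions.
    // Pending data is flushed, then the per-partition resource is released.
    void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            List<String> writer = writers.remove(tp);
            if (writer != null) {
                flushed.addAll(writer);
            }
        }
    }

    int openCount() {
        return writers.size();
    }
}
```

With this shape, a write to a partition that was never opened, or that has already been closed, fails loudly, which is the guarantee the current callbacks do not give.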
> We could also consider going a little further. Instead of depending on
> onPartitionsAssigned() to open resources, tasks could open partition
> resources on demand as records are received. In general, connectors will need
> some way to close partition-specific resources, but there might not be any
> need to pass the full list of partitions to close since the only open
> resources should be those that have received writes since the last rebalance.
> In this case, we just have a single method:
> {code}
> void close();
> {code}
> The downside to this is that the difference between close() and stop() then
> becomes a little unclear.
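For comparison, the single-method alternative could look roughly like the sketch below, where resources are opened lazily on the first write and close() releases whatever is open. Again this is only an illustration under stated assumptions: SketchRecord is a hypothetical stand-in for a sink record, and OnDemandSinkTask, the string partition key, and the flushed list are names invented for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a sink record; only the fields this sketch needs.
final class SketchRecord {
    final String topic;
    final int partition;
    final String value;

    SketchRecord(String topic, int partition, String value) {
        this.topic = topic;
        this.partition = partition;
        this.value = value;
    }
}

// Hypothetical task that opens partition resources on demand.
class OnDemandSinkTask {
    // Keyed by "topic-partition"; contains only partitions that have
    // received writes since the last close().
    private final Map<String, List<String>> writers = new HashMap<>();
    // Stands in for the external system that receives flushed data.
    final List<String> flushed = new ArrayList<>();

    // The first record seen for a partition creates its writer.
    void put(Collection<SketchRecord> records) {
        for (SketchRecord record : records) {
            String key = record.topic + "-" + record.partition;
            writers.computeIfAbsent(key, k -> new ArrayList<>()).add(record.value);
        }
    }

    // No partition list is needed: everything currently open is flushed
    // and released.
    void close() {
        for (List<String> writer : writers.values()) {
            flushed.addAll(writer);
        }
        writers.clear();
    }

    int openCount() {
        return writers.size();
    }
}
```

Here close() leaves the task reusable, since the next put() simply reopens what it needs. That is the main way it would differ from stop() in this sketch.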
> Obviously these are not compatible changes and connectors would have to be
> updated.