[
https://issues.apache.org/jira/browse/KAFKA-3092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118330#comment-15118330
]
ASF GitHub Bot commented on KAFKA-3092:
---------------------------------------
GitHub user hachikuji opened a pull request:
https://github.com/apache/kafka/pull/815
KAFKA-3092: Replace SinkTask onPartitionsAssigned/onPartitionsRevoked with open/close
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/hachikuji/kafka KAFKA-3092
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/815.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #815
----
commit 6d262240a8a9d01e99388e8b2f3cf2d45ff7d57d
Author: Jason Gustafson <[email protected]>
Date: 2016-01-22T00:45:44Z
Combine WorkerSinkTask and WorkerSinkTaskThread and refactor as a Runnable
commit 22c04c95bcf2e355376892afc7d2990f5e3cb02f
Author: Jason Gustafson <[email protected]>
Date: 2016-01-26T21:13:40Z
Ensure onPartitionsRevoked obeys close semantics
commit ec9611cfdb05f12a62e304433c819e46feeed321
Author: Jason Gustafson <[email protected]>
Date: 2016-01-26T22:45:27Z
Assignments are only initialized in onPartitionsAssigned
commit dc002cc43eacade42e07e78d0b297d31bf548cad
Author: Jason Gustafson <[email protected]>
Date: 2016-01-27T00:09:47Z
Add open/close methods to SinkTask and deprecate onPartitionsAssigned/onPartitionsRevoked
----
> Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
> -----------------------------------------------------------------------------
>
> Key: KAFKA-3092
> URL: https://issues.apache.org/jira/browse/KAFKA-3092
> Project: Kafka
> Issue Type: Improvement
> Components: copycat
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
>
> The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods
> exposed in Kafka Connect's SinkTask interface seems a little unclear and too
> closely tied to consumer semantics. From the javadoc, these APIs are used to
> open/close per-partition resources, but that would suggest that we should
> always get one call to onPartitionsAssigned() before writing any records for
> the corresponding partitions and one call to onPartitionsRevoked() when we
> have finished with them. However, the same methods on the consumer are used
> to indicate phases of the rebalance operation: onPartitionsRevoked() is
> called before the rebalance begins and onPartitionsAssigned() is called after
> it completes. In particular, the consumer does not guarantee a final call to
> onPartitionsRevoked().
>
> This mismatch makes the contract of these methods unclear. In fact, the
> WorkerSinkTask currently does not guarantee the initial call to
> onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead,
> the task implementation must pull the initial assignment from the
> SinkTaskContext. To add to the confusion, the offset commit that follows
> onPartitionsRevoked() triggers a flush() on partitions that have already
> been revoked. All of this makes it difficult to use this API as
> suggested in the javadocs.
>
> To fix this, we should clarify the behavior of these methods and consider
> renaming them to avoid confusion with the same methods in the consumer API.
> If onPartitionsAssigned() is meant for opening resources, maybe we can rename
> it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We
> can then fix the code to ensure that a typical open/close contract is
> enforced. This would also mean removing the need to pass the initial
> assignment in the SinkTaskContext. This would give the following API:
> {code}
> void open(Collection<TopicPartition> partitions);
> void close(Collection<TopicPartition> partitions);
> {code}
> We could also consider going a little further. Instead of depending on
> onPartitionsAssigned() to open resources, tasks could open partition
> resources on demand as records are received. In general, connectors will need
> some way to close partition-specific resources, but there might not be any
> need to pass the full list of partitions to close since the only open
> resources should be those that have received writes since the last rebalance.
> In this case, we just have a single method:
> {code}
> void close();
> {code}
> The downside to this is that the difference between close() and stop() then
> becomes a little unclear.
>
> Obviously these are not compatible changes and connectors would have to be
> updated.
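
For illustration only, here is a minimal sketch of how a task might rely on the open/close contract proposed above. It is not the code from the pull request. PartitionId is a hypothetical stand-in for Kafka's TopicPartition so the example compiles without Kafka on the classpath, and PartitionLifecycle, BufferingSinkTask, and the put(partition, value) signature are likewise assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Kafka's TopicPartition.
final class PartitionId {
    final String topic;
    final int partition;

    PartitionId(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionId)) return false;
        PartitionId other = (PartitionId) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical interface mirroring the proposed open/close pair.
interface PartitionLifecycle {
    void open(Collection<PartitionId> partitions);
    void close(Collection<PartitionId> partitions);
}

// A toy task that keeps one in-memory buffer per open partition.
final class BufferingSinkTask implements PartitionLifecycle {
    private final Map<PartitionId, List<String>> buffers = new HashMap<>();
    private final List<String> flushed = new ArrayList<>();

    // Called once per partition before any record for it is written.
    @Override
    public void open(Collection<PartitionId> partitions) {
        for (PartitionId p : partitions) {
            buffers.put(p, new ArrayList<>());
        }
    }

    // Rejects writes to partitions that were never opened (or already closed).
    void put(PartitionId p, String value) {
        List<String> buffer = buffers.get(p);
        if (buffer == null) {
            throw new IllegalStateException("put() before open() for " + p);
        }
        buffer.add(value);
    }

    // Called once when the task is done with the partitions:
    // flush what is buffered, then release the per-partition resource.
    @Override
    public void close(Collection<PartitionId> partitions) {
        for (PartitionId p : partitions) {
            List<String> buffer = buffers.remove(p);
            if (buffer != null) {
                flushed.addAll(buffer);
            }
        }
    }

    int openCount() {
        return buffers.size();
    }

    List<String> flushed() {
        return flushed;
    }
}
```

Under this contract the task never needs to read its initial assignment from a context object: every partition it writes to has been passed to open() first, and nothing is flushed for a partition after close() has released it.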