[ https://issues.apache.org/jira/browse/KAFKA-3092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118330#comment-15118330 ]
ASF GitHub Bot commented on KAFKA-3092:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/815

    KAFKA-3092: Replace SinkTask onPartitionsAssigned/onPartitionsRevoked with open/close

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-3092

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/815.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #815

----
commit 6d262240a8a9d01e99388e8b2f3cf2d45ff7d57d
Author: Jason Gustafson <ja...@confluent.io>
Date:   2016-01-22T00:45:44Z

    Combine WorkerSinkTask and WorkerSinkTaskThread and refactor as a Runnable

commit 22c04c95bcf2e355376892afc7d2990f5e3cb02f
Author: Jason Gustafson <ja...@confluent.io>
Date:   2016-01-26T21:13:40Z

    Ensure onPartitionsRevoked obeys close semantics

commit ec9611cfdb05f12a62e304433c819e46feeed321
Author: Jason Gustafson <ja...@confluent.io>
Date:   2016-01-26T22:45:27Z

    Assignments are only initialized in onPartitionsAssigned

commit dc002cc43eacade42e07e78d0b297d31bf548cad
Author: Jason Gustafson <ja...@confluent.io>
Date:   2016-01-27T00:09:47Z

    Add open/close methods to SinkTask and deprecate onPartitionsAssigned/onPartitionsRevoked

----

> Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-3092
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3092
>             Project: Kafka
>          Issue Type: Improvement
>          Components: copycat
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods
> exposed in Kafka Connect's SinkTask interface seems a little unclear and too
> closely tied to consumer semantics.
> From the javadoc, these APIs are used to
> open/close per-partition resources, but that would suggest that we should
> always get one call to onPartitionsAssigned() before writing any records for
> the corresponding partitions and one call to onPartitionsRevoked() when we
> have finished with them. However, the same methods on the consumer are used
> to indicate phases of the rebalance operation: onPartitionsRevoked() is
> called before the rebalance begins and onPartitionsAssigned() is called after
> it completes. In particular, the consumer does not guarantee a final call to
> onPartitionsRevoked().
> This mismatch makes the contract of these methods unclear. In fact, the
> WorkerSinkTask currently does not guarantee the initial call to
> onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead,
> the task implementation must pull the initial assignment from the
> SinkTaskContext. To make it more confusing, the call to commit offsets
> following onPartitionsRevoked() causes a flush() on a partition which had
> already been revoked. All of this makes it difficult to use this API as
> suggested in the javadocs.
> To fix this, we should clarify the behavior of these methods and consider
> renaming them to avoid confusion with the same methods in the consumer API.
> If onPartitionsAssigned() is meant for opening resources, maybe we can rename
> it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We
> can then fix the code to ensure that a typical open/close contract is
> enforced. This would also mean removing the need to pass the initial
> assignment in the SinkTaskContext. This would give the following API:
> {code}
> void open(Collection<TopicPartition> partitions);
> void close(Collection<TopicPartition> partitions);
> {code}
> We could also consider going a little further. Instead of depending on
> onPartitionsAssigned() to open resources, tasks could open partition
> resources on demand as records are received.
> In general, connectors will need
> some way to close partition-specific resources, but there might not be any
> need to pass the full list of partitions to close since the only open
> resources should be those that have received writes since the last rebalance.
> In this case, we just have a single method:
> {code}
> void close();
> {code}
> The downside to this is that the difference between close() and stop() then
> becomes a little unclear.
> Obviously these are not compatible changes and connectors would have to be
> updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
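
For illustration only, the open/close contract proposed in the issue description could look roughly like the following self-contained Java sketch. Everything here beyond the two method signatures quoted in the issue is an assumption: `OpenCloseSketch`, `BufferingSinkTask`, the simplified `put(TopicPartition, String)`, the `flushed` list, and the stand-in `TopicPartition` class are hypothetical and exist only so the example compiles without Kafka on the classpath. It is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OpenCloseSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return 31 * topic.hashCode() + partition;
        }
    }

    // Hypothetical task that keeps one in-memory "writer" per open partition.
    static class BufferingSinkTask {
        private final Map<TopicPartition, List<String>> writers = new HashMap<>();
        final List<String> flushed = new ArrayList<>();

        // Assumed contract: called before any record for these partitions is written.
        void open(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                writers.put(tp, new ArrayList<>());
            }
        }

        // Simplified put(): one record for one partition (not the real SinkTask signature).
        void put(TopicPartition tp, String value) {
            List<String> writer = writers.get(tp);
            if (writer == null) {
                throw new IllegalStateException(
                        "put() before open() for " + tp.topic + "-" + tp.partition);
            }
            writer.add(value);
        }

        // Assumed contract: called once the task has finished with these partitions.
        void close(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                List<String> writer = writers.remove(tp);
                if (writer != null) {
                    flushed.addAll(writer); // release the resource, keeping what was written
                }
            }
        }

        int openWriters() {
            return writers.size();
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("logs", 0);
        BufferingSinkTask task = new BufferingSinkTask();

        task.open(Collections.singletonList(tp));
        task.put(tp, "a");
        task.put(tp, "b");
        task.close(Collections.singletonList(tp));

        System.out.println(task.flushed + ", open writers: " + task.openWriters());
    }
}
```

In this sketch, `close(partitions)` is the only place per-partition resources are released, so a `put()` for a partition that was never opened, or was already closed, is treated as a contract violation rather than silently accepted.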