[ https://issues.apache.org/jira/browse/KAFKA-2217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson updated KAFKA-2217:
-----------------------------------
    Attachment: KAFKA-2217_2015-05-26_12:57:29.patch

> Refactor Client Selectable Interface for Better Concurrency Options
> -------------------------------------------------------------------
>
>                 Key: KAFKA-2217
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2217
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>         Attachments: KAFKA-2217.patch, KAFKA-2217_2015-05-25_10:45:30.patch,
> KAFKA-2217_2015-05-26_09:37:29.patch, KAFKA-2217_2015-05-26_12:57:29.patch
>
>
> The current Selectable interface makes thread-safe usage without external
> locking unlikely. In particular, the interface requires implementations to
> store internal lists containing the results from an invocation of poll. This
> makes dealing with issues such as KAFKA-2168 more difficult since it adds
> state which must be synchronized. Here are the offending methods:
> {code:java}
> interface Selectable {
>   void poll(long timeout);
>   List<NetworkSend> completedSends();
>   List<NetworkReceive> completedReceives();
>   List<Integer> disconnected();
>   List<Integer> connected();
>   // rest excluded
> }
> {code}
> The user is required to invoke poll, then extract the results from the
> corresponding methods. In order to avoid missing events, the caller must hold
> an external lock while they access the results of the poll.
> Instead, we can return the results directly from the poll call using a
> container class. For example:
> {code:java}
> class PollResult {
>   List<NetworkSend> completedSends;
>   List<NetworkReceive> completedReceives;
>   List<Integer> disconnected;
>   List<Integer> connected;
> }
> interface Selectable {
>   PollResult poll(long timeout);
> }
> {code}
> This should get us closer to a thread-safe NetworkClient, which would enable
> a more granular approach to synchronizing the KafkaConsumer.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
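
The proposal in the description can be illustrated with a small, self-contained sketch. It is not taken from the attached patches. NetworkSend and NetworkReceive are reduced here to hypothetical stand-ins that carry only a label, and FakeSelector with its enqueue methods is an invented toy implementation used to show the idea: each call to poll returns an immutable snapshot of its own events, so a caller never has to read shared result lists under an external lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for Kafka's NetworkSend / NetworkReceive (label only).
final class NetworkSend {
    final String label;
    NetworkSend(String label) { this.label = label; }
}

final class NetworkReceive {
    final String label;
    NetworkReceive(String label) { this.label = label; }
}

// Immutable snapshot of the events produced by a single poll invocation.
final class PollResult {
    final List<NetworkSend> completedSends;
    final List<NetworkReceive> completedReceives;
    final List<Integer> disconnected;
    final List<Integer> connected;

    PollResult(List<NetworkSend> sends, List<NetworkReceive> receives,
               List<Integer> disconnected, List<Integer> connected) {
        // Defensive copies: later changes to the selector's state cannot leak in.
        this.completedSends = Collections.unmodifiableList(new ArrayList<NetworkSend>(sends));
        this.completedReceives = Collections.unmodifiableList(new ArrayList<NetworkReceive>(receives));
        this.disconnected = Collections.unmodifiableList(new ArrayList<Integer>(disconnected));
        this.connected = Collections.unmodifiableList(new ArrayList<Integer>(connected));
    }
}

interface Selectable {
    PollResult poll(long timeout);
}

// Toy implementation (an assumption for illustration): events are queued by
// the enqueue methods and drained by poll. All synchronization is internal.
final class FakeSelector implements Selectable {
    private final List<NetworkReceive> pendingReceives = new ArrayList<NetworkReceive>();
    private final List<Integer> pendingConnected = new ArrayList<Integer>();

    synchronized void enqueueReceive(String label) {
        pendingReceives.add(new NetworkReceive(label));
    }

    synchronized void enqueueConnected(int node) {
        pendingConnected.add(node);
    }

    @Override
    public synchronized PollResult poll(long timeout) {
        // The timeout is ignored in this sketch; a real selector would block on I/O.
        PollResult result = new PollResult(
                Collections.<NetworkSend>emptyList(), pendingReceives,
                Collections.<Integer>emptyList(), pendingConnected);
        pendingReceives.clear();
        pendingConnected.clear();
        return result;
    }
}
```

With this shape, a second poll cannot overwrite the results of the first one: each caller keeps its own PollResult, and any locking needed stays inside the implementation rather than being imposed on the caller.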