[ https://issues.apache.org/jira/browse/KAFKA-170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140242#comment-13140242 ]
Taylor Gautier edited comment on KAFKA-170 at 10/31/11 4:00 PM:
----------------------------------------------------------------

Hi Jun. My approach works like this:

1) There is a low-level API that was created by Marcus Westin. The low-level API contains a Client.js (the reader) and a Producer.js (the writer). Client.js exposes the following APIs:
- connect(args)
- fetchTopic(topicName, callback)
- fetchOffsets(topicName, arg, callback)

2) I created the Consumer.js implementation, which extends the low-level Client.js and provides two additional APIs:
- subscribeTopic(topicName, callback)
- unsubscribeTopic(topicName, callback)

Consumer.js simply keeps track of the topics the caller has subscribed to so far; whenever it polls for messages, it issues a request through the low-level API for every subscribed topic. So if, for example, the caller has subscribed to A, B, and C, the Consumer calls the low-level API and requests A, B, and C. Note that the order is not defined, but it is important for retrieval. Kafka then writes responses for A, B, and C, and each message that arrives is returned individually to the caller. The callback's arguments include the topic name, so the same callback can be passed for all topics.

Once that is done, there is a configurable timeout. If the timeout has already passed, the Consumer immediately sends another poll request for all subscribed topics; if it has not, the Consumer waits for the timeout to pass and then polls all subscribed topics again.
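To make the subscribe-and-poll flow concrete, here is a minimal sketch of such a Consumer. Only the API names from the description above (connect, fetchTopic, subscribeTopic, unsubscribeTopic) are taken as given; the constructor, the subscriptions map, the pollInterval option, the poll() method, and the setTimeout-based scheduling are illustrative assumptions, not the actual implementation.

{code:javascript}
// Minimal sketch of the subscribe-and-poll loop described above. Only the
// API names connect/fetchTopic (Client.js) and subscribeTopic/unsubscribeTopic
// come from the comment; everything else here is an illustrative assumption.
var Client = require('./client');  // hypothetical path to the low-level Client.js

function Consumer(options) {
  Client.call(this, options);                         // Consumer extends the low-level Client
  this.subscriptions = {};                            // topicName -> callback
  this.pollInterval = options.pollInterval || 1000;   // the "configurable timeout", in ms
}
Consumer.prototype = Object.create(Client.prototype);

Consumer.prototype.subscribeTopic = function (topicName, callback) {
  this.subscriptions[topicName] = callback;
};

Consumer.prototype.unsubscribeTopic = function (topicName, callback) {
  delete this.subscriptions[topicName];
  if (callback) callback(topicName);                  // callback shape is an assumption
};

// Issue one low-level fetch per subscribed topic, then schedule the next round:
// if more than pollInterval has already elapsed, poll again immediately,
// otherwise wait out the remainder of the interval first.
Consumer.prototype.poll = function () {
  var self = this;
  var started = Date.now();
  Object.keys(this.subscriptions).forEach(function (topicName) {
    // Per the comment, the callback args include the topic name,
    // so one callback can serve all topics.
    self.fetchTopic(topicName, self.subscriptions[topicName]);
  });
  var elapsed = Date.now() - started;
  setTimeout(function () { self.poll(); }, Math.max(self.pollInterval - elapsed, 0));
};

module.exports = Consumer;
{code}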
> Support for non-blocking polling on multiple streams
> ----------------------------------------------------
>
>                 Key: KAFKA-170
>                 URL: https://issues.apache.org/jira/browse/KAFKA-170
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>
> Currently we provide a blocking iterator in the consumer. This is a good mechanism for consuming data from a single topic, but is limited as a mechanism for polling multiple streams.
> For example, if one wants to implement a non-blocking union across multiple streams, this is hard to do because calls may block indefinitely. A similar situation arises when trying to implement a streaming join between two streams.
> I would propose two changes:
> 1. Implement a next(timeout) interface on KafkaMessageStream. This handles certain limited cases nicely and is easy to implement, but doesn't actually cover the two cases above.
> 2. Add an interface to poll streams.
> I don't know the best approach for the latter API, but it is important to get it right. One option would be to add a ConsumerConnector.drainTopics("topic1", "topic2", ...) which blocks until there is at least one message and then returns a list of triples (topic, partition, message).
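For comparison with the drainTopics proposal above, here is a hypothetical caller-side sketch of the Node.js Consumer described in the comment: subscribing several topics with one callback gives a rough analogue of a non-blocking union across streams. The module path, connect() arguments, and callback argument order are assumptions carried over from the sketch above.

{code:javascript}
// Hypothetical usage of the Consumer sketched above: one callback serving
// several topics, roughly the "union across multiple streams" case from the
// issue description. Module path, connect() args, and callback argument
// order are assumptions.
var Consumer = require('./consumer');

var consumer = new Consumer({ pollInterval: 1000 });
consumer.connect({ host: 'localhost', port: 9092 });  // connect(args) from the low-level Client.js

function onMessage(topicName, message) {
  // The topic name arrives with each message, so a single handler can merge
  // messages from all subscribed topics without blocking on any one of them.
  console.log('[' + topicName + '] ' + message);
}

consumer.subscribeTopic('topic1', onMessage);
consumer.subscribeTopic('topic2', onMessage);
consumer.poll();  // start the timed poll loop
{code}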