[
https://issues.apache.org/jira/browse/KAFKA-170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14008399#comment-14008399
]
Claude Mamo commented on KAFKA-170:
-----------------------------------
I've been browsing for a non-blocking simple consumer library and I came across
this issue. It so happens that I had modified the high-level consumer to use
callbacks instead of blocking queues. I don't know if using non-blocking
polling would be a better approach but here is a code example for those who are
interested:
{code:java}
val partitionSize = 4
val topicCountMap = new util.HashMap[EventHandler[String, String], Integer]()
val consumer = Consumer.create(...)
val cb = (messageHolder: MessageAndMetadata[String, String]) => {
println(messageHolder.message)
}
topicCountMap.put(new EventHandler("MyTopic", cb), partitionSize)
consumer.createMessageStreams(topicCountMap, new StringDecoder(), new
StringDecoder())
{code}
The code is part of the Kafka Web Console project and it can be found here:
https://github.com/claudemamo/kafka-web-console/tree/master/app/kafka.
> Support for non-blocking polling on multiple streams
> ----------------------------------------------------
>
> Key: KAFKA-170
> URL: https://issues.apache.org/jira/browse/KAFKA-170
> Project: Kafka
> Issue Type: Sub-task
> Components: core
> Affects Versions: 0.8.0
> Reporter: Jay Kreps
> Priority: Critical
> Labels: replication
>
> Currently we provide a blocking iterator in the consumer. This is a good
> mechanism for consuming data from a single topic, but is limited as a
> mechanism for polling multiple streams.
> For example if one wants to implement a non-blocking union across multiple
> streams this is hard to do because calls may block indefinitely. A similar
> situation arrises if trying to implement a streaming join of between two
> streams.
> I would propose two changes:
> 1. Implement a next(timeout) interface on KafkaMessageStream. This will
> easily handle some simple cases with minimal change. This handles certain
> limited cases nicely and is easy to implement, but doesn't actually cover the
> two cases above.
> 2. Add an interface to poll streams.
> I don't know the best approach for the later api, but it is important to get
> it right. One option would be to add a
> ConsumerConnector.drainTopics("topic1", "topic2", ...) which blocks until
> there is at least one message and then returns a list of triples (topic,
> partition, message).
--
This message was sent by Atlassian JIRA
(v6.2#6252)