In the new API commitSync() handles retries and reconnecting internally, and will only throw an exception if it encounters a non-retriable error (e.g. it has been told that the partitions it wants to commit no longer belong to it) or a timeout has elapsed. The exceptions this function can throw are listed in the javadoc (see commitSync):
http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Guozhang

On Wed, Dec 2, 2015 at 8:58 AM, Krzysztof Ciesielski <[email protected]> wrote:

> I see, that’s actually a very important point, thanks Jay.
> I think that we are very optimistic about updating Reactive Kafka now
> after getting all these details :)
> I have one more question: in the new client we only have to call
> commitSync(offsets). This is a ‘void’ method so I suspect that it commits
> atomically?
> In our current native committer, we have quite a lot of additional code
> for retries, reconnecting or finding new channel coordinator. I suspect
> that the new API handles it all internally and if commitSync() fails then
> it means that the only thing we can do is kill the consumer and try to
> create a new one?
>
> --
> Bests,
> Chris
> SoftwareMill
> On 2 December 2015 at 17:42:24, Jay Kreps ([email protected]) wrote:
>
> It's worth noting that both the old and new consumer are identical in the
> number of records fetched at once and this is bounded by the fetch size and
> the number of partitions you subscribe to. The old consumer held these in
> memory internally and waited for you to ask for them, the new consumer
> immediately gives you what it has. Overall, though, the new consumer gives
> much better control over what is being fetched since it only uses memory
> when you call poll(); the old consumer had a background thread doing this
> which would only stop when it filled up a queue of unprocessed
> chunks...this is a lot harder to predict.
>
> -Jay
>
> On Wed, Dec 2, 2015 at 7:13 AM, Gwen Shapira <[email protected]> wrote:
>
> > On Wed, Dec 2, 2015 at 10:44 PM, Krzysztof Ciesielski <
> > [email protected]> wrote:
> >
> > > Hello,
> > >
> > > I’m the main maintainer of Reactive Kafka - a wrapper library that
> > > provides Kafka API as Reactive Streams (
> > > https://github.com/softwaremill/reactive-kafka).
> > > I’m a bit concerned about switching to Kafka 0.9 because of the new
> > > Consumer API which doesn’t seem to fit well into this paradigm, comparing
> > > to the old one. My main concerns are:
> > >
> > > 1. Our current code uses the KafkaIterator and reads messages
> > > sequentially, then sends them further upstream. In the new API, you cannot
> > > control how many messages are returned with poll(), so we would need to
> > > introduce some kind of in-memory buffering.
> > > 2. You cannot specify which offsets to commit. Our current native
> > > committer (
> > > https://github.com/softwaremill/reactive-kafka/blob/4055e88c09b8e08aefe8dbbd4748605df5779b07/core/src/main/scala/com/softwaremill/react/kafka/commit/native/NativeCommitter.scala
> > > )
> > > uses the OffsetCommitRequest/Response API and
> > > kafka.api.ConsumerMetadataRequest/Response for resolving brokers. Switching
> > > to Kafka 0.9 brings some compilation errors that raise questions.
> > >
> > > My questions are:
> > >
> > > 1. Do I understand the capabilities and limitations of new API correctly?
> > > :)
> > >
> >
> > The first limitation is correct - poll() may return any number of records
> > and you need to handle this.
> > The second is not correct - commitSync() can take a map of TopicPartition
> > and Offsets, so you would only commit specific offsets of specific
> > partitions.
> >
> > > 2. Can we stay with the old iterator-based client, or is it going to get
> > > abandoned in future Kafka versions, or discouraged for some reasons?
> > >
> >
> > It is already a bit behind - only the new client includes support for
> > secured clusters (authentication and encryption). It will get deprecated in
> > the future.
> >
> > > 3. Can we still use the OffsetCommitRequest/Response API to commit
> > > messages manually?
> > > If yes, could someone update this example:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > > or
> > > give me a few hints on how to do this with 0.9?
> > >
> >
> > AFAIK, the wire protocol and the API is not going anywhere. Hopefully you
> > can use the new objects we provide in the clients jar
> > (org.apache.kafka.common.requests).
> >
> > > By the way, we’d like our library to appear on the Ecosystem Wiki, I’m not
> > > sure how to request that officially :)
> > >
> >
> > Let us know what to write there and where to link :)
> >
> > > --
> > > Bests,
> > > Chris
> > > SoftwareMill
> > >

--
-- Guozhang
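
On the buffering concern raised in the thread (poll() may return any number of records, so a wrapper that hands records on one request at a time needs some in-memory buffering), here is a minimal sketch of one way this might look. Everything in it is hypothetical and not part of Kafka or Reactive Kafka: the PollBuffer class, its take() and buffered() methods, and the Supplier that stands in for a call such as consumer.poll(). It uses only the JDK so that it runs without a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: buffers batches of unknown size and hands out at most n records per request. */
final class PollBuffer<T> {
    // Assumption: stands in for a call such as consumer.poll(timeout) that returns a batch of any size.
    private final Supplier<List<T>> poller;
    private final Deque<T> buffer = new ArrayDeque<>();

    PollBuffer(Supplier<List<T>> poller) {
        this.poller = poller;
    }

    /** Returns up to n records, polling for a new batch only when the buffer is empty. */
    List<T> take(int n) {
        if (n > 0 && buffer.isEmpty()) {
            buffer.addAll(poller.get()); // the batch may hold any number of records, including none
        }
        List<T> out = new ArrayList<>();
        while (out.size() < n && !buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }

    /** Number of records fetched but not yet handed on. */
    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        // Two fake batches of different sizes simulate what successive polls might return.
        Iterator<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d")).iterator();
        PollBuffer<String> buf = new PollBuffer<>(
                () -> batches.hasNext() ? batches.next() : Collections.<String>emptyList());

        System.out.println(buf.take(2)); // prints [a, b]
        System.out.println(buf.take(2)); // prints [c]
        System.out.println(buf.take(2)); // prints [d]
    }
}
```

Because a new batch is requested only once the buffer has been drained, memory use stays bounded by the size of a single batch, which matches Jay's point that the new consumer only uses memory when poll() is called. A real integration would also need to track which offsets have been handed on, so that only those are passed to commitSync().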
