specifically comparing 0.8.1 -- https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50 ``` (1 to partitionCount).map(_ => { val partitionId = buffer.getInt val offset = buffer.getLong val metadata = readShortString(buffer) (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata)) }) ```
totrunk -- https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69 ``` (1 to partitionCount).map(_ => { val partitionId = buffer.getInt val offset = buffer.getLong val timestamp = { val given = buffer.getLong if (given == -1L) now else given } val metadata = readShortString(buffer) (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp)) }) ``` should the `timestamp` buffer read be wrapped in an api version check? Dana Powers Rdio, Inc. [email protected] rdio.com/people/dpkp/ On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <[email protected]> wrote: > Ah, I see :) > > The readFrom function basically tries to read two extra fields if you > are on version 1: > > if (versionId == 1) { > groupGenerationId = buffer.getInt > consumerId = readShortString(buffer) > } > > The rest looks identical in version 0 and 1, and still no timestamp in > sight... > > Gwen > > On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <[email protected]> wrote: > > Hi Gwen, I am using/writing kafka-python to construct api requests and > have > > not dug too deeply into the server source code. But I believe it is > > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method > > used to decode the wire protocol. > > > > -Dana > > OffsetCommitRequest has two constructors now: > > > > For version 0: > > OffsetCommitRequest(String groupId, Map<TopicPartition, > > PartitionData> offsetData) > > > > And version 1: > > OffsetCommitRequest(String groupId, int generationId, String > > consumerId, Map<TopicPartition, PartitionData> offsetData) > > > > None of them seem to require timestamps... so I'm not sure where you > > see that this is required. Can you share an example? > > > > Gwen > > > > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <[email protected]> wrote: > >> Hi Joel, > >> > >> I'm looking more closely at the OffsetCommitRequest wire protocol change > >> you mentioned below, and I cannot figure out how to explicitly > construct a > >> request with the earlier version. Should the api version be different > for > >> requests that do not include it and/or servers that do not support the > >> timestamp field? It looks like 0.8.1.1 did not include the timestamp > > field > >> and used api version 0. But 0.8.2-beta seems to now require timestamps > >> even when I explicitly encode OffsetCommitRequest api version 0 (server > >> logs a BufferUnderflowException). > >> > >> Is this the expected server behavior? Can you provide any tips on how > >> third-party clients should manage the wire-protocol change for this api > >> method (I'm working on kafka-python)? > >> > >> Thanks, > >> > >> -Dana > >> > >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <[email protected]> > wrote: > >> > >>> Yes it should be backwards compatible. So for e.g., you should be able > >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should > >>> not upgrade your clients until after the brokers have been upgraded. > >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire > >>> protocol change I'm aware of is the OffsetCommitRequest. There is a > >>> change in the OffsetCommitRequest format (KAFKA-1634) although you can > >>> explicitly construct an OffsetCommitRequest with the earlier version. > >>> > >>> Thanks, > >>> > >>> Joel > >>> > >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote: > >>> > Hi Joel, > >>> > > >>> > Thanks for all the clarifications! Just another question on this: > will > >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8? > >>> > Generally speaking, would there be any concerns with using the 0.8.2 > >>> > consumer with a 0.8.1 broker, for instance? > >>> > > >>> > Marius > >>> > > >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <[email protected]> > > wrote: > >>> > > >>> > > Inline.. > >>> > > > >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote: > >>> > > > Hello everyone, > >>> > > > > >>> > > > I have a few questions about the current status and future of the > >>> Kafka > >>> > > > consumers. > >>> > > > > >>> > > > We have been working to adding Kafka support in Spring XD [1], > >>> currently > >>> > > > using the high level consumer via Spring Integration Kafka [2]. > We > >>> are > >>> > > > working on adding features such as: > >>> > > > - the ability to control offsets/replay topics; > >>> > > > - the ability to control partition allocation across multiple > >>> consumers; > >>> > > > > >>> > > > We are currently at version 0.8.1.1, so using the simple consumer > > is > >>> a > >>> > > > pretty straightforward choice right now. However, in the light of > > the > >>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few > > questions: > >>> > > > > >>> > > > 1) With respect to the consumer redesign for 0.9, what is the > > future > >>> of > >>> > > the > >>> > > > Simple Consumer and High Level Consumer? To my best > understanding, > >>> the > >>> > > > existing high level consumer API will be deprecated in favour of > > the > >>> new > >>> > > > consumer API. What is the future of the Simple Consumer, in this > >>> case? it > >>> > > > will continue to exist as a low-level API implementing the Kafka > >>> protocol > >>> > > > [3] and providing the building blocks for the new consumer, or > will > >>> it be > >>> > > > deprecated as well? > >>> > > > >>> > > The new consumer will subsume both use-cases (simple and > high-level). > >>> > > You can still use the old SimpleConsumer if you wish - i.e., the > wire > >>> > > protocol for fetch and other requests will still be supported. > >>> > > > >>> > > > > >>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an > > early > >>> > > > implementation of it, but since this a feature scheduled only for > >>> 0.9, > >>> > > what > >>> > > > is its status as well? Is it included only as a future reference > > and > >>> for > >>> > > > stabilizing the API? > >>> > > > >>> > > It is a WIP so you cannot really use it. > >>> > > > >>> > > > 3) Obviously, offset management is a concern if using the simple > >>> > > consumer, > >>> > > > so - wondering about the Offset Management API as well. The Kafka > >>> > > protocol > >>> > > > document specifically indicates that it will be fully functional > in > >>> 0.8.2 > >>> > > > [4] - however, a functional implementation is already available > in > >>> > > 0.8.1.1 > >>> > > > (accessible via the SimpleConsumer API but not documented in > [5]). > >>> Again, > >>> > > > trying to understand the extent of what 0.8.1.1 already supports > >>> > > > (ostensibly, the offset manager support seems to have been added > >>> only in > >>> > > > 0.8.2 - please correct me if I am wrong), and whether if it is > >>> > > recommended > >>> > > > for use in production in any form (with the caveats that > accompany > >>> the > >>> > > use > >>> > > > of ZooKeeper). > >>> > > > >>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use > > Kafka > >>> > > as the offsets storage mechanism (not zookeeper). High-level Java > >>> > > consumers can choose to store offsets in ZooKeeper instead by > setting > >>> > > offsets.storage=zookeeper > >>> > > > >>> > > However, if you are using the simple consumer and wish to store > >>> > > offsets in ZooKeeper you will need to commit to ZooKeeper directly. > >>> > > You can use ZkUtils in the kafka.utils package for this. > >>> > > > >>> > > If you wish to move to Kafka-based offsets we will be adding a new > >>> > > OffsetsClient that can be used to commit/fetch offsets to/from > Kafka. > >>> > > This is currently not listed as a blocker for 0.8.2 but I think we > >>> > > should include it. I will update that ticket. > >>> > > > >>> > > > 4) Trying to interpret the existing examples in [6] and the > > comments > >>> on > >>> > > [7] > >>> > > > - the version of the Offset Management API that exists in 0.8.1.1 > > is > >>> > > using > >>> > > > ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be > >>> replaced > >>> > > by > >>> > > > Kafka, and phased out if possible. To my understanding, the > switch > >>> > > between > >>> > > > the two will be controlled by the broker configuration (along > with > >>> other > >>> > > > parameters that control the performance of offset queues. Is that > >>> > > correct? > >>> > > > >>> > > The switch is a client-side configuration. That wiki is not > >>> > > up-to-date. The most current documentation is available as a patch > in > >>> > > https://issues.apache.org/jira/browse/KAFKA-1729 > >>> > > > >>> > > > 5) Also, wondering about the timeline of 0.8.2 - according to the > >>> > > roadmaps > >>> > > > it should be released relatively shortly. Is that correct? > >>> > > > >>> > > Yes - once the blockers are ironed out. > >>> > > > >>> > > > > >>> > > > Thanks, > >>> > > > Marius > >>> > > > > >>> > > > [1] http://projects.spring.io/spring-xd/ > >>> > > > [2] https://github.com/spring-projects/spring-integration-kafka > >>> > > > [3] > >>> > > > > >>> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol > >>> > > > [4] > >>> > > > > >>> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI > >>> > > > [5] > > http://kafka.apache.org/082/documentation.html#simpleconsumerapi > >>> > > > [6] > >>> > > > > >>> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka > >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729 > >>> > > > >>> > > > >>> > >>> >
