Hi all, I've updated KIP-4 page to include all previously discussed items such as: new error codes, merged alter-topic and reassign-partitions requests, added TMR_V1.
It'd be great if we could concentrate on the Errors + Wire Protocol schema and
discuss any remaining issues today, since the first patch will include only
the server-side implementation.

Thanks,
Andrii Biletskyi

On Tue, Apr 21, 2015 at 9:46 AM, Andrii Biletskyi <
andrii.bilets...@stealth.ly> wrote:

> 1. Yes, this will be much easier. Okay, let's add it.
>
> 2. Okay. This will differ a little bit from the way kafka-topics.sh
> currently handles the alter-topic command, but I think it's
> a reasonable restriction.
>
> I'll update the KIP accordingly for our weekly call.
>
> Thanks,
> Andrii Biletskyi
>
> On Mon, Apr 20, 2015 at 10:56 PM, Jun Rao <j...@confluent.io> wrote:
>
>> 1. Yes, lag is probably only going to be useful for the admin client.
>> However, so is isr. It seems to me that we should get lag and isr from the
>> same request. I was thinking that we can just extend TMR by changing
>> replicas from an array of int to an array of (int, lag) pairs. Is that too
>> complicated?
>>
>> 3. I was thinking that we just don't allow the cli to change more than one
>> thing at a time. So, you will get an error if you want to change both
>> partitions and configs.
>>
>> Thanks,
>>
>> Jun
>>
>> On Sun, Apr 19, 2015 at 8:22 AM, Andrii Biletskyi <
>> andrii.bilets...@stealth.ly> wrote:
>>
>> > Jun,
>> >
>> > 1. Yes, seems we can add lag info to the TMR. But before that I wonder
>> > whether there are other reasons we need this info except for the
>> > reassign-partitions command? As we discussed earlier, the problem with
>> > poor monitoring capabilities for reassign-partitions (currently we only
>> > inform users Completed/In Progress per partition) may require a separate
>> > solution. We were thinking about a separate Wire Protocol request. And I
>> > actually like your idea about adding some sort of BrokerMetadataRequest
>> > for these purposes. I actually think we can cover some other items (like
>> > rack-awareness), but for me it deserves a separate KIP really.
>> > Also, adding a Replica->Lag map per partition will make
>> > TopicMetadataResponse very sophisticated:
>> > Map[TopicName, Map[PartitionId, Map[ReplicaId, Lag]]].
>> > Maybe we need to leave it for the moment and propose a new request
>> > rather than making a new step towards one "monster" request.
>> >
>> > 2. Yes, error per topic.
>> > The only question is whether we should execute at least the very first
>> > alter-topic command from the "duplicated" topic set or return an error
>> > for all of them. I think the more predictable and reasonable option for
>> > clients would be returning errors for all duplicated topics.
>> >
>> > 3. Hm, yes. Actually we also have "change topic config" there. But it is
>> > not related to such "replication" commands as increase replicas or change
>> > replica assignment.
>> > This will make the CLI implementation a bit strange: if the user
>> > specifies increase partitions and change topic config in one line, then
>> > taking item 2 into account we will have to create two separate
>> > alter-topic requests, which were designed as batch requests :),
>> > but probably we can live with it.
>> > Okay, I will think about a separate error code to cover such cases.
>> >
>> > 4. We will need InvalidArgumentTopic (e.g. contains prohibited chars),
>> > IAPartitions, IAReplicas, IAReplicaAssignment, IATopicConfiguration.
>> > The server-side implementation will be a little bit messy (dozens of "if
>> > this then this error code" checks), but maybe we should think about
>> > clients in the first place here.
>> >
>> > Thanks,
>> > Andrii Biletskyi
>> >
>> > On Fri, Apr 17, 2015 at 1:46 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > 1. For the lags, we can add a new field "lags" per partition. It will
>> > > return for each replica that's not in isr, the replica id and the lag
>> > > in messages. Also, if TMR is sent to a non-leader, the response can
>> > > just include an empty array for isr and lags.
>> > >
>> > > 2. So, we will just return a topic level error for the duplicated
>> > > topics, right?
>> > >
>> > > 3. Yes, it's true that today, one can specify both partitions and
>> > > replicaAssignment in the TopicCommand. However, partitions is actually
>> > > ignored. So, it will be clearer if we don't allow users to do this.
>> > >
>> > > 4. How many specific error codes like InvalidPartitions and
>> > > InvalidReplicas are needed? If it's not that many, giving out more
>> > > specific errors will be useful for non-java clients.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Wed, Apr 15, 2015 at 10:23 AM, Andrii Biletskyi <
>> > > andrii.bilets...@stealth.ly> wrote:
>> > >
>> > > > Guys,
>> > > >
>> > > > Thanks for the discussion!
>> > > >
>> > > > Summary:
>> > > >
>> > > > 1. Q: How can KAFKA-1367 (isr is inconsistent in brokers' metadata
>> > > >       cache) affect the implementation?
>> > > >    A: We can fix this issue for the leading broker: the
>> > > >       ReplicaManager (or Partition) component should have an accurate
>> > > >       isr list. Then, with the leading broker having correct info, to
>> > > >       do a describe-topic we will need to determine the leading
>> > > >       brokers for the partitions and ask those for a correct isr list.
>> > > >       Also, we should consider adding lag information to TMR for each
>> > > >       follower for partition reassignment, as Jun suggested above.
>> > > >
>> > > > 2. Q: What if the user adds different alter commands for the same
>> > > >       topic in the scope of one batch request?
>> > > >    A: Because of the async nature of AlterTopicRequest it will be
>> > > >       very hard to "assemble" the expected result (in terms of
>> > > >       checking whether the request is complete) if we let users do
>> > > >       this. Also it will be very confusing. It was proposed not to
>> > > >       let users do this (probably add a new Error for such cases).
>> > > >
>> > > > 3. Q: AlterTopicRequest semantics: now that we have merged AlterTopic
>> > > >       and ReassignPartitions, in which order should the AlterTopic
>> > > >       fields be resolved?
>> > > >    A: This item is not clear. There was a proposal to let the user
>> > > >       change only one thing at a time, e.g. specify either new
>> > > >       Replicas or ReplicaAssignment. This can be a simple solution,
>> > > >       but it's a very strict rule. E.g. currently with TopicCommand
>> > > >       the user can increase the number of partitions and define the
>> > > >       replica assignment for the newly added partitions. Taking
>> > > >       item 2 into account, this will be even harder for the user to
>> > > >       achieve.
>> > > >
>> > > > 4. Q: Do we need such accurate errors returned from the server:
>> > > >       InvalidArgumentPartitions, InvalidArgumentReplicas etc.?
>> > > >    A: I started the implementation to add the proposed error codes
>> > > >       and now I think InvalidArgumentError should probably be
>> > > >       sufficient. We can do simple validations on the client side
>> > > >       (e.g. AdminClient can ensure the number-of-partitions argument
>> > > >       is positive); the others, which can be covered only on the
>> > > >       server (probably invalid topic config, replica assignment
>> > > >       includes a dead broker etc.), will be done on the server, and
>> > > >       in case of an invalid argument we will return
>> > > >       InvalidArgumentError without specifying the concrete field.
>> > > >
>> > > > It'd be great if we could cover these remaining issues. It looks like
>> > > > they are minor, at least related to specific messages, not the overall
>> > > > protocol. I think with that I can update the confluence page and
>> > > > update the patch to reflect all discussed items. This patch will
>> > > > probably include Wire Protocol messages and server-side code to
>> > > > handle the new requests.
>> > > > AdminClient and cli-tool implementation can be the next step.
>> > > >
>> > > > Thanks,
>> > > > Andrii Biletskyi
>> > > >
>> > > > On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao <j...@confluent.io> wrote:
>> > > >
>> > > > > Andrii,
>> > > > >
>> > > > > 500. I think what you suggested also sounds reasonable. Since ISR
>> > > > > is only maintained accurately at the leader, TMR can return ISR if
>> > > > > the broker is the leader of a partition. Otherwise, we can return
>> > > > > an empty ISR. For partition reassignment, it would be useful to
>> > > > > know the lag of each follower. Again, the leader knows this info.
>> > > > > We can probably include that info in TMR as well.
>> > > > >
>> > > > > 300. I think it's probably reasonable to restrict AlterTopicRequest
>> > > > > to change only one thing at a time, i.e., either partitions,
>> > > > > replicas, replica assignment or config.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi <
>> > > > > andrii.bilets...@stealth.ly> wrote:
>> > > > >
>> > > > > > Jun,
>> > > > > >
>> > > > > > 404. Great, thanks!
>> > > > > >
>> > > > > > 500. If I understand correctly KAFKA-1367 says ISR part of TMR
>> > > > > > may be inconsistent. If so, then I believe all admin commands but
>> > > > > > describeTopic are not affected. Let me emphasize that it's about
>> > > > > > AdminClient operations, not about Wire Protocol requests.
>> > > > > > What I mean:
>> > > > > > To verify AdminClient.createTopic we will need the (consistent)
>> > > > > > 'topics' set from TMR (we don't need isr)
>> > > > > > To verify alterTopic - again, probably 'topics' and 'assigned
>> > > > > > replicas' + configs
>> > > > > > To verify deleteTopic - only 'topics'
>> > > > > > To verify preferredReplica - 'leader', 'assigned replicas'
>> > > > > > To verify reassignPartitions - 'assigned replicas'? (I'm not sure
>> > > > > > about this one)
>> > > > > > If everything above is correct, then AdminClient.describeTopic is
>> > > > > > the only command at risk. We can actually work around it: find
>> > > > > > out the leader broker and send TMR to that leading broker to get
>> > > > > > an up-to-date isr list.
>> > > > > > Bottom line: looks like 1367 is a separate issue, and is not a
>> > > > > > blocker for this KIP. I'm a bit concerned about adding new
>> > > > > > requests as a must-have part of this KIP when we don't know what
>> > > > > > we want to include in those requests.
>> > > > > >
>> > > > > > Also, I'd like to write down the new AlterTopicRequest semantics
>> > > > > > (if we decide to include replicas there and merge it with
>> > > > > > ReassignPartitionsRequest)
>> > > > > > 300. AlterTopicRequest => [TopicName Partitions Replicas
>> > > > > > ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
>> > > > > > The fields are resolved in this sequence:
>> > > > > > 1. Either partitions or replicas is defined:
>> > > > > > ---1.1. ReplicaAssignment is not defined - generate automatic
>> > > > > >         replica assignment for newly added partitions or for the
>> > > > > >         increased replicas parameter
>> > > > > > ---1.2. ReplicaAssignment is defined - increase topic partitions
>> > > > > >         if 'partitions' is defined, reassign partitions according
>> > > > > >         to ReplicaAssignment
>> > > > > > 2.
>> > > > > > Neither partitions nor replicas is defined:
>> > > > > > ---2.1. ReplicaAssignment is defined - it's a reassign replicas
>> > > > > >         request
>> > > > > > ---2.2. ReplicaAssignment is not defined - just change topic
>> > > > > >         configs
>> > > > > > 3. Config fields are always handled, independently of
>> > > > > > partitions+replicas/replicaAssignment
>> > > > > > A bit sophisticated, but should cover all cases. Another option:
>> > > > > > we can say you can define either partitions+replicas or
>> > > > > > replicaAssignment.
>> > > > > >
>> > > > > > 300.5. There is also a new question related to AlterTopicRequest:
>> > > > > > should we allow users multiple alter-topic instructions for one
>> > > > > > topic in one batch?
>> > > > > > I think if we go this way, the user will expect us to optimize
>> > > > > > and group requests for one topic, but it will add a lot of
>> > > > > > burden, especially taking into account the async semantics of
>> > > > > > the AlterTopicRequest. I'd rather return some error code, or
>> > > > > > ignore all but the first. Thoughts?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Andrii Biletskyi
>> > > > > >
>> > > > > > On Mon, Apr 13, 2015 at 6:34 AM, Jun Rao <j...@confluent.io> wrote:
>> > > > > >
>> > > > > > > Andrii,
>> > > > > > >
>> > > > > > > 404. Jay and I chatted a bit. We agreed to leave
>> > > > > > > createTopicRequest as async for now.
>> > > > > > >
>> > > > > > > There is another thing.
>> > > > > > >
>> > > > > > > 500. Currently, we have this issue (KAFKA-1367) that the ISR in
>> > > > > > > the metadata cache can be out of sync. The reason is that ISR
>> > > > > > > is really maintained at the leader. We can potentially add a
>> > > > > > > new BrokerMetaRequest, which will return useful stats specific
>> > > > > > > to a broker.
>> > > > > > > Such stats can include (1) for each partition whose leader is
>> > > > > > > on this broker, the ISR and the lag (in messages) for each of
>> > > > > > > the followers, (2) space used per partition, (3) remaining
>> > > > > > > space per log dir (not sure how easy it is to get this info).
>> > > > > > > If we have this new request, we can probably remove the ISR
>> > > > > > > part from TMR v1. Currently, the producer/consumer clients
>> > > > > > > don't really care about ISR. The admin client will then issue
>> > > > > > > BrokerMetaRequest to find out ISR and other stats.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Apr 7, 2015 at 12:10 PM, Andrii Biletskyi <
>> > > > > > > andrii.bilets...@stealth.ly> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > A summary of our discussion:
>> > > > > > > >
>> > > > > > > > 201. Q: Cluster updates in a backward compatible way.
>> > > > > > > >      A: Add a topicConfigs map property and change the
>> > > > > > > >         constructor. This shouldn't break Consumer/Producer
>> > > > > > > >         since TMR is used in NetworkClient, not directly by
>> > > > > > > >         Consumer/Producer.
>> > > > > > > >
>> > > > > > > > 300. Q: Can we merge AlterTopic and ReassignPartitions
>> > > > > > > >         requests?
>> > > > > > > >      A: It looks like in terms of Wire Protocol partition
>> > > > > > > >         reassignment can be just an application of
>> > > > > > > >         AlterTopicRequest. On the AdminClient side we can
>> > > > > > > >         split this into two separate methods, if needed.
>> > > > > > > >
>> > > > > > > > Some additional items that were added today:
>> > > > > > > > 400. Q: Do we need ListTopicsRequest, given that we can use
>> > > > > > > >         TMR for this purpose?
>> > > > > > > >      A: The answer depends on whether we can leverage
>> > > > > > > >         ListTopics in consumer/producer, because the only
>> > > > > > > >         benefit of ListTopics is performance optimization;
>> > > > > > > >         otherwise it isn't worth it.
>> > > > > > > >
>> > > > > > > > 401. Q: AdminClient.topicExists - do we need it?
>> > > > > > > >      A: AdminClient.listTopics should be sufficient.
>> > > > > > > >
>> > > > > > > > 402. Review the AdminClient API and use separate objects
>> > > > > > > > instead of collections for method arguments / return results
>> > > > > > > > (e.g. preferredReplica accepts Map<String, List<Int>>; it
>> > > > > > > > might be better to add a separate java object)
>> > > > > > > >
>> > > > > > > > 403. Error numbers in KIP-4 (100x). Currently there are no
>> > > > > > > > dedicated ranges for errors; we will probably continue doing
>> > > > > > > > it this way.
>> > > > > > > >
>> > > > > > > > 404. There were some concerns again about the asynchronous
>> > > > > > > > semantics of the admin requests. Jun and Jay to agree
>> > > > > > > > separately how we want to handle it.
>> > > > > > > >
>> > > > > > > > Please add / correct me if I missed something.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Andrii Biletskyi
>> > > > > > > >
>> > > > > > > > On Tue, Apr 7, 2015 at 4:11 PM, Andrii Biletskyi <
>> > > > > > > > andrii.bilets...@stealth.ly> wrote:
>> > > > > > > >
>> > > > > > > > > Hi all,
>> > > > > > > > >
>> > > > > > > > > I wasn't able to send email to our thread (it says we
>> > > > > > > > > exceeded the message size limit :)).
>> > > > > > > > > So I'm starting a new one.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Jun,
>> > > > > > > > >
>> > > > > > > > > Thanks again for the review. Answering your comments:
>> > > > > > > > >
>> > > > > > > > > 201.
>> > > > > > > > > I'm not sure I understand how we can evolve Cluster in a
>> > > > > > > > > backward compatible way. In my understanding topic configs
>> > > > > > > > > are not returned currently, in TMR_V0. Thus we need to add
>> > > > > > > > > a new property in Cluster, something like
>> > > > > > > > > private final Map<String, List<ConfigEntry>> topicConfigs;
>> > > > > > > > > which affects the Cluster constructor, which is used in
>> > > > > > > > > MetadataResponse.java. I'm not sure whether we can change
>> > > > > > > > > Cluster this way so that it's backward compatible; I
>> > > > > > > > > suppose not.
>> > > > > > > > > Let me know if I'm missing something...
>> > > > > > > > >
>> > > > > > > > > 300. Hm, so you propose to give up ReassignPartition as a
>> > > > > > > > > separate command?
>> > > > > > > > > That's interesting, let's discuss it today in detail.
>> > > > > > > > > Two small points here: 1) afaik currently the
>> > > > > > > > > replica-assignment argument in alter-topic (from
>> > > > > > > > > TopicCommand) doesn't reassign partitions, it lets users
>> > > > > > > > > specify the replica assignment for newly added partitions
>> > > > > > > > > (AddPartitionsListener) 2) the ReassignPartitions command
>> > > > > > > > > involves a little bit more than just changing the replica
>> > > > > > > > > assignment in zk.
>> > > > > > > > > People are struggling with partition reassignment so I
>> > > > > > > > > think it's good to have an explicit request for it so we
>> > > > > > > > > can handle it independently. Also, as mentioned earlier,
>> > > > > > > > > we'll probably add some better status check procedure for
>> > > > > > > > > this long-running request in the future.
>> > > > > > > > >
>> > > > > > > > > 301. Good point.
>> > > > > > > > > We also agreed to use clientId as an identifier for the
>> > > > > > > > > requester - whether it's a producer client or admin. I
>> > > > > > > > > think we can go with the -1/-1 approach.
>> > > > > > > > >
>> > > > > > > > > 302. Again, as said above, replica-assignment in
>> > > > > > > > > alter-topic doesn't change the replica assignment of the
>> > > > > > > > > existing partitions. But we can think about it in
>> > > > > > > > > general - how can we change the topic replication factor?
>> > > > > > > > > The easy way - we don't need it, we can use reassign
>> > > > > > > > > partitions. Not sure whether we want to add special logic
>> > > > > > > > > to treat this case...
>> > > > > > > > >
>> > > > > > > > > 303.1. Okay, sure, I'll generalize topicExists().
>> > > > > > > > > 303.2. I think, yes, we need separate verify methods as a
>> > > > > > > > > status check procedure, because the respective requests
>> > > > > > > > > are long running, and a CLI user will potentially call
>> > > > > > > > > reassign-partitions asynchronously and do other stuff
>> > > > > > > > > (e.g. create topics) while periodically checking the
>> > > > > > > > > status of the partition reassignment. Anyway, we'll have
>> > > > > > > > > to implement this logic because it's the completion
>> > > > > > > > > criterion for the Future of the reassign partitions async
>> > > > > > > > > call; we'll just have to make those methods public.
>> > > > > > > > > 303.3. If preferredReplica returns
>> > > > > > > > > Future<Map<String, Errors>> then what is an error in
>> > > > > > > > > terms of preferred replica leader election?
>> > > > > > > > > As I understand we can only check whether it has
>> > > > > > > > > succeeded (leader == AR.head) or not _yet_.
>> > > > > > > > >
>> > > > > > > > > 304.1. Sure, let's add timeout to reassign/preferred
>> > > > > > > > > replica.
>> > > > > > > > > 304.2. This can be finalized after we discuss 300.
>> > > > > > > > >
>> > > > > > > > > 305. Misprints - thanks, fixed.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Andrii Biletskyi
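
The two AlterTopicRequest restrictions the thread converges on (an entry may
change only one thing at a time, and every topic that appears more than once
in a batch gets an error) can be sketched as a small batch validator. This is
only an illustration in Python, not Kafka code: AlterTopicEntry,
validate_batch and the error names below are hypothetical, since the thread
only speaks of "a separate error code" for these cases.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Hypothetical error names; the thread does not fix the actual codes.
NO_ERROR = "NoError"
DUPLICATE_TOPIC = "DuplicateTopicInBatch"
MULTIPLE_CHANGES = "MultipleChangesInOneAlter"


@dataclass
class AlterTopicEntry:
    """One per-topic entry of a batch alter request (illustrative shape)."""
    topic: str
    partitions: Optional[int] = None
    replicas: Optional[int] = None
    replica_assignment: Optional[Dict[int, List[int]]] = None
    added_configs: Dict[str, str] = field(default_factory=dict)
    deleted_configs: List[str] = field(default_factory=list)

    def changed_aspects(self) -> List[str]:
        """Names of the aspects this entry tries to change."""
        aspects = []
        if self.partitions is not None:
            aspects.append("partitions")
        if self.replicas is not None:
            aspects.append("replicas")
        if self.replica_assignment is not None:
            aspects.append("replica_assignment")
        if self.added_configs or self.deleted_configs:
            aspects.append("config")
        return aspects


def validate_batch(entries: List[AlterTopicEntry]) -> Dict[str, str]:
    """Return one error code per topic name in the batch."""
    counts = Counter(entry.topic for entry in entries)
    errors: Dict[str, str] = {}
    for entry in entries:
        if counts[entry.topic] > 1:
            # Error for all duplicated topics; none of them is executed.
            errors[entry.topic] = DUPLICATE_TOPIC
        elif len(entry.changed_aspects()) > 1:
            # Only one of partitions/replicas/assignment/config per entry.
            errors[entry.topic] = MULTIPLE_CHANGES
        else:
            errors[entry.topic] = NO_ERROR
    return errors
```

A CLI built on top of such a check would have to split a combined "increase
partitions and change config" command into two separate requests, which is
the trade-off discussed in the Apr 19 message.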