> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for following up James.
>
>> Is this the procedure that happens during every rebalance? The reason I ask
>> is that this step:
>>>>> As long as the leader (before or after upgrade) receives at least
>>>>> one old version X Subscription it always sends version Assignment X back
>>>>> (the encoded supported version is X before the leader is upgraded and Y
>>>>> after the leader is upgraded).
>
> Yes, that would be the consequence.
>
>> This implies that the leader receives all Subscriptions before sending back
>> any responses. Is that what actually happens? Is it possible that it would
>> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then
>> later receive a Subscription X? What happens in that case? Would that
>> Subscription X then trigger another rebalance, and the whole thing starts
>> again?
>
> That sounds correct. A 'delayed' Subscription could always happen --
> even before KIP-268 -- and would trigger a new rebalance. In this
> regard, the behavior does not change. The difference is that we would
> automatically downgrade the Assignment from Y to X again -- but the
> application would not fail (as it would before the KIP).
>
> Do you see an issue with this behavior? The idea of the design is to
> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
> upgrade from X to Y and downgrade from Y to X in the second rebalance
> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
> would upgrade to Y again.
>
Sounds good.
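To check my understanding of the leader's rule, here is a minimal sketch. It assumes the leader is already upgraded (so it can decode both X and Y Subscriptions); the class and method names are hypothetical and this is not the actual `StreamsPartitionAssignor` code. The version numbers 2 and 3 are placeholders for X and Y.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the rule described above; not Kafka Streams code. */
final class LeaderVersionSketch {

    /**
     * Version used to encode the Assignments of one rebalance: the lowest
     * version among the received Subscriptions, capped by what the leader
     * itself supports. A single old Subscription pulls everyone down to X.
     */
    static int assignmentVersion(List<Integer> subscriptionVersions, int leaderSupportedVersion) {
        int version = leaderSupportedVersion;
        for (int v : subscriptionVersions) {
            version = Math.min(version, v);
        }
        return version;
    }

    public static void main(String[] args) {
        int x = 2; // placeholder for old version X
        int y = 3; // placeholder for new version Y

        // Rebalance 1: four upgraded instances join, no.5 is late.
        System.out.println(assignmentVersion(Arrays.asList(y, y, y, y), y));    // 3
        // Rebalance 2: no.5 joins but is still on X, so the group downgrades.
        System.out.println(assignmentVersion(Arrays.asList(y, y, y, y, x), y)); // 2
        // Rebalance 3: no.5 has been upgraded, so the group goes back to Y.
        System.out.println(assignmentVersion(Arrays.asList(y, y, y, y, y), y)); // 3
    }
}
```

The three calls in `main` correspond to the three rebalances in the no.5 example: upgrade to Y, downgrade to X, upgrade to Y again.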
> Thus, as long as not all instances are on the newest version,
> upgrades/downgrades of the exchanged rebalance metadata could happen
> multiple times. However, this should not be an issue from my understanding.
About “this should not be an issue”: this upgrade/downgrade is just about the
rebalance metadata, right? Are there other associated things that will also
have to upgrade/downgrade in sync with the rebalance metadata? For example, the
idea for this KIP originally came up during the discussion about adding
timestamps to RocksDB state stores, which required updating the on-disk schema.
In the case of an updated RocksDB state store but with downgraded rebalance
metadata... that should work, right? Because we would still be running the
updated code (which understands the on-disk format), and it would simply get
its partition assignments via the downgraded rebalance metadata?
Thanks,
-James
> Let us know what you think about it.
>
>
> -Matthias
>
>
>> On 3/20/18 11:10 PM, James Cheng wrote:
>> Sorry, I see that the VOTE started already, but I have a late question on
>> this KIP.
>>
>> In the "version probing" protocol:
>>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>>> On startup/rolling-bounce, an instance does not know what version the
>>> leader understands and (optimistically) sends a Subscription with the
>>> latest version Y
>>> (Old, ie, not yet upgraded) Leader sends empty Assignment back to the
>>> corresponding instance that sent the newer Subscription it does not
>>> understand. The Assignment metadata only encodes both version numbers
>>> (used-version == supported-version) as leader's supported-version X.
>>> For all other instances the leader sends a regular Assignment in version X
>>> back.
>>> If an upgraded follower sends new version number Y Subscription and receives
>>> version X Assignment with "supported-version = X", it can downgrade to X
>>> (in-memory flag) and resends a new Subscription with old version X to retry
>>> joining the group. To force an immediate second rebalance, the follower
>>> does an "unsubscribe()/subscribe()/poll()" sequence.
>>> As long as the leader (before or after upgrade) receives at least one old
>>> version X Subscription it always sends version Assignment X back (the
>>> encoded supported version is X before the leader is upgraded and Y after the
>>> leader is upgraded).
>>> If an upgraded instance receives an Assignment it always checks the leader's
>>> supported-version and updates its downgraded "used-version" if possible
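
For reference, the follower side of the steps above could be sketched roughly as follows. This is only an illustration under my own assumptions: the class and method names are hypothetical, the `unsubscribe()/subscribe()/poll()` sequence is reduced to a boolean "must rejoin" result, and whether a follower rejoins after raising its used-version again is not stated above, so the sketch does not force it.

```java
/** Hypothetical sketch of the follower-side steps; not Kafka Streams code. */
final class FollowerVersionSketch {
    private final int latestVersion; // Y: newest version this instance can encode
    private int usedVersion;         // in-memory flag, starts optimistically at Y

    FollowerVersionSketch(int latestVersion) {
        this.latestVersion = latestVersion;
        this.usedVersion = latestVersion;
    }

    /** Version to encode the next Subscription with. */
    int subscriptionVersion() {
        return usedVersion;
    }

    /**
     * Handles the supported-version carried in a received Assignment.
     * Returns true if the instance has to downgrade and rejoin right away
     * (the unsubscribe()/subscribe()/poll() sequence in the steps above).
     */
    boolean onAssignment(int leaderSupportedVersion) {
        if (leaderSupportedVersion < usedVersion) {
            usedVersion = leaderSupportedVersion; // leader is older: downgrade
            return true;
        }
        // Leader supports at least what we use: raise used-version if possible.
        usedVersion = Math.min(latestVersion, leaderSupportedVersion);
        return false;
    }
}
```

With placeholder versions X = 2 and Y = 3, a follower created with `new FollowerVersionSketch(3)` would downgrade and rejoin on `onAssignment(2)`, stay at 2 while the leader keeps reporting 2, and move back to 3 once an upgraded leader reports 3.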
>>
>> Is this the procedure that happens during every rebalance? The reason I ask
>> is that this step:
>>>> As long as the leader (before or after upgrade) receives at least one old
>>>> version X Subscription it always sends version Assignment X back (the
>>>> encoded supported version is X before the leader is upgraded and Y after
>>>> the leader is upgraded).
>>
>> This implies that the leader receives all Subscriptions before sending back
>> any responses. Is that what actually happens? Is it possible that it would
>> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then
>> later receive a Subscription X? What happens in that case? Would that
>> Subscription X then trigger another rebalance, and the whole thing starts
>> again?
>>
>> Thanks,
>> -James
>>
>>> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Guozhang,
>>>
>>> thanks for your comments.
>>>
>>> 2: I think my main concern is that 1.2 would be a "special" release that
>>> everybody needs to use to upgrade. As an alternative, we could say that
>>> we add the config in 1.2 and keep it for 2 additional releases (1.3 and
>>> 1.4) but remove it in 1.5. This gives users more flexibility and does
>>> not force users to upgrade to a specific version, but also allows us
>>> to not carry the tech debt forever. WDYT about this? If users upgrade on
>>> a regular basis, this approach could avoid a forced update with high
>>> probability, as they will upgrade to either 1.2/1.3/1.4 anyway at some
>>> point. Thus, only if users don't upgrade for a very long time are they
>>> forced to do 2 upgrades with an intermediate version.
>>>
>>> 4. Updated the KIP to remove the ".x" suffix
>>>
>>> 5. Updated the KIP accordingly.
>>>
>>> -Matthias
>>>
>>>> On 3/19/18 10:33 AM, Guozhang Wang wrote:
>>>> Yup :)
>>>>
>>>>> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>> bq. some snippet like ProduceRequest / ProduceRequest
>>>>>
>>>>> Did you mean ProduceRequest / Response ?
>>>>>
>>>>> Cheers
>>>>>
>>>>>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> About 2: yeah I guess this is a subjective preference. My main concern
>>>>>> about keeping the config / handling code beyond 1.2 release is that it will
>>>>>> become a non-cleanable tech debt forever, as fewer and fewer users would
>>>>>> need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
>>>>>> maintain this for nearly no one. On the other hand, I agree that this tech
>>>>>> debt is not too large. So if more people feel this is a good tradeoff to
>>>>>> pay for not forcing users from older versions to upgrade twice I'm happy
>>>>>> to change my opinion.
>>>>>>
>>>>>> A few more minor comments:
>>>>>>
>>>>>> 4. For the values of "upgrade.from", could we simplify to only major.minor?
>>>>>> I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
>>>>>> behavior in bug fix releases we would not need to specify a bug-fix version
>>>>>> to distinguish ever.
>>>>>>
>>>>>> 5. Could you also present the encoding format in subscription / assignment
>>>>>> metadata bytes in version 2, and in future versions (i.e. which first bytes
>>>>>> would be kept moving forward), for readers to better understand the
>>>>>> proposal? Some snippet like ProduceRequest / ProduceRequest in
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>>>>> would be very helpful.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for your comments.
>>>>>>>
>>>>>>> 1. Because the old leader cannot decode the new Subscription it can only
>>>>>>> send an empty assignment back. The idea to send empty assignments to all
>>>>>>> members is interesting. I will try this out in a PR to see how it behaves.
>>>>>>>
>>>>>>> 2. I don't see an issue with keeping config `upgrade.from` for future
>>>>>>> releases. Personally, I would prefer to not force users to do two
>>>>>>> upgrades if they want to go from pre-1.2 to post-1.2 version. Is there a
>>>>>>> technical argument why you want to get rid of the config? What
>>>>>>> disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
>>>>>>>
>>>>>>> Keeping the config is just a few lines of code in `StreamsConfig` as
>>>>>>> well as a single `if` statement in `StreamsPartitionAssignor` to force a
>>>>>>> downgrade (cf.
>>>>>>> https://github.com/apache/kafka/pull/4636/files#diff-392371c29384e33bb09ed342e7696c68R201)
>>>>>>>
>>>>>>>
>>>>>>> 3. I updated the KIP accordingly.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>> On 3/15/18 3:19 PM, Guozhang Wang wrote:
>>>>>>>> Hello Matthias, thanks for the KIP. Here are some comments:
>>>>>>>>
>>>>>>>> 1. "For all other instances the leader sends a regular Assignment in
>>>>>>>> version X back." Does that mean the leader will exclude any member of the
>>>>>>>> group whose protocol version it does not understand? For example, if
>>>>>>>> we have A, B, C with A the leader, and B bounced with the newer version.
>>>>>>>> In the first rebalance, A will only consider {A, C} for assignment while
>>>>>>>> sending an empty assignment to B. And then later when B downgrades will it
>>>>>>>> re-assign the tasks to it again? I feel this unnecessarily increases
>>>>>>>> the number of rebalances and the total latency. Could the leader just send
>>>>>>>> an empty assignment to everyone? Upon receiving the empty assignment,
>>>>>>>> each thread will not create / restore any tasks and will not clean up its
>>>>>>>> local state (so that the prevCachedTasks are not lost in future rebalances)
>>>>>>>> and re-joins immediately, so if users choose to bounce an instance once it
>>>>>>>> is in RUNNING state the total time of rolling upgrades will be reduced.
>>>>>>>>
>>>>>>>> 2. If we want to allow upgrading from 1.1- versions to any of the future
>>>>>>>> versions beyond 1.2, then we'd always need to keep the special handling
>>>>>>>> logic for this two-rolling-bounce mechanism plus a config that we would
>>>>>>>> never be able to deprecate; on the other hand, if the version probing
>>>>>>>> procedure is fast, I think the extra operational cost of going from
>>>>>>>> upgrading from 1.1- to a future version, to upgrading from 1.1- to 1.2
>>>>>>>> and then doing another upgrade from 1.2 to a future version, could be
>>>>>>>> small. So depending on the experimental result of the upgrade latency,
>>>>>>>> I'd suggest considering the trade-off of the extra code/config that
>>>>>>>> needs maintaining for the special handling.
>>>>>>>>
>>>>>>>> 3. Testing plan: could you elaborate a bit more on the actual upgrade-paths
>>>>>>>> we should test? For example, I'm thinking the following:
>>>>>>>>
>>>>>>>> a. 0.10.0 -> 1.2
>>>>>>>> b. 1.1 -> 1.2
>>>>>>>> c. 1.2 -> 1.3 (simulated v4)
>>>>>>>> d. 0.10.0 -> 1.3 (simulated v4)
>>>>>>>> e. 1.1 -> 1.3 (simulated v4)
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I want to propose KIP-268 to allow rebalance metadata version upgrades
>>>>>>>>> in Kafka Streams:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>
>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>