Hey Allen,

Thanks for your comment. I will comment inline.

On Thu, Feb 22, 2018 at 3:05 PM, Allen Wang <aw...@netflix.com.invalid>
wrote:

> Overall this is a very useful feature. With this we can finally scale keyed
> messages.
>
> +1 on the ability to remove partitions. This will greatly increase Kafka's
> scalability in cloud.
>
> For example, when there is traffic increase, we can add brokers and assign
> new partitions to the new brokers. When traffic decreases, we can mark
> these new partitions as read only and remove them afterwards, together with
> the brokers that host these partitions. This will be a light-weight
> approach to scale a Kafka cluster compared to partition reassignment where
> you will always have to move data.
>
> I have some suggestions:
>
> - The KIP described each step in detail which is great. However, it lacks
> the "why" part to explain the high level goal we want to achieve with each
> step. For example, the purpose of step 5 may be described as "Make sure
> consumers always first finish consuming all data prior to partition
> expansion to enforce message ordering".
>

Yeah, I think this is useful. This is a non-trivial KIP and it helps to
explain the motivation of each change to make it easier to read. I will add
the motivation for each change in the KIP. Please let me know if there is
anything else that can make the KIP more readable.


>
> - The rejection of produce request at partition expansion should be
> configurable because it does not matter for non-keyed messages. Same with
> the consumer behavior for step 5. This will ensure that for non-keyed
> messages, partition expansion does not add the cost of possible message
> drop on producer or message latency on the consumer.
>

Ideally we would like to avoid adding extra configs, to keep the interface
simple. I think the current overhead in the producer is actually very
small. Partition expansion or deletion should happen very infrequently.
Note that our producer today needs to refresh metadata whenever there is
leadership movement, i.e. the producer will receive
NotLeaderForPartitionException from the old leader and keep refreshing
metadata until it gets the new leader of the partition, which happens much
more frequently than partition expansion or deletion. So I am not sure we
should add a config to optimize this.
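
To make the comparison concrete, here is a rough sketch of the
refresh-and-retry path the producer already takes on leadership movement.
It is a toy simulation, not the real client code: FakeCluster, FakeProducer
and NotLeaderForPartitionError are hypothetical names made up for this
example, and the real producer also applies backoff and batching that are
left out here.

```python
class NotLeaderForPartitionError(Exception):
    """Hypothetical stand-in for the broker's 'not leader for partition' error."""


class FakeCluster:
    """Toy cluster with a single partition whose leader can change."""

    def __init__(self, leader: int):
        self.leader = leader
        self.metadata_requests = 0

    def fetch_metadata(self) -> int:
        # Count every metadata refresh so the overhead is visible.
        self.metadata_requests += 1
        return self.leader

    def produce(self, broker: int, record: str) -> str:
        # Only the current leader accepts writes.
        if broker != self.leader:
            raise NotLeaderForPartitionError(f"broker {broker} is not the leader")
        return f"broker {broker} accepted {record!r}"


class FakeProducer:
    """Toy producer that caches the leader and refreshes it on error."""

    def __init__(self, cluster: FakeCluster, max_retries: int = 3):
        self.cluster = cluster
        self.max_retries = max_retries
        self.cached_leader = cluster.fetch_metadata()

    def send(self, record: str) -> str:
        for _ in range(self.max_retries + 1):
            try:
                return self.cluster.produce(self.cached_leader, record)
            except NotLeaderForPartitionError:
                # Stale metadata: refresh once, then retry the send.
                self.cached_leader = self.cluster.fetch_metadata()
        raise RuntimeError("leader not found after retries")


cluster = FakeCluster(leader=1)
producer = FakeProducer(cluster)
first = producer.send("a")   # served from the cached metadata
cluster.leader = 2           # simulate leadership movement
second = producer.send("b")  # one rejected attempt, one refresh, then success
```

In this sketch the leadership change costs one rejected request and one
extra metadata fetch. The expectation is that a rejected produce request at
partition expansion would cost about the same, only far less often.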


>
> - Since we now allow adding partitions for keyed messages while preserving
> the message ordering on the consumer side, the default producer partitioner
> seems to be inadequate as it rehashes all keys. As part of this KIP, should
> we also include a partitioner that better handles partition changes, for
> example, with consistent hashing?
>

I am not sure I understand the problem with the default partitioner. Can
you explain a bit more why the default producer partitioner is inadequate
with this KIP, and why consistent hashing would help?
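
For reference while discussing this, the sketch below shows what "rehashes
all keys" would mean in practice. It compares a hash-modulo partitioner with
a simple consistent-hash ring when a topic grows from 8 to 9 partitions.
Everything here is an assumption for illustration: MD5 stands in for the
murmur2 hash used by the default partitioner, and ConsistentHashRing is a
hypothetical class, not something proposed in the KIP.

```python
import bisect
import hashlib


def stable_hash(value: str) -> int:
    """Deterministic 64-bit hash (MD5 used as a stand-in for murmur2)."""
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


def modulo_partition(key: str, num_partitions: int) -> int:
    """Hash-modulo placement, the scheme the default partitioner uses for keyed records."""
    return stable_hash(key) % num_partitions


class ConsistentHashRing:
    """Hypothetical ring with several virtual nodes per partition."""

    def __init__(self, num_partitions: int, vnodes: int = 100):
        # Each partition owns `vnodes` points on the ring.
        self._points = sorted(
            (stable_hash(f"partition-{p}-vnode-{v}"), p)
            for p in range(num_partitions)
            for v in range(vnodes)
        )
        self._hashes = [h for h, _ in self._points]

    def partition(self, key: str) -> int:
        # A key belongs to the first ring point clockwise from its hash.
        idx = bisect.bisect_right(self._hashes, stable_hash(key)) % len(self._points)
        return self._points[idx][1]


def moved_fraction(before, after, keys) -> float:
    """Fraction of keys whose partition differs between two placements."""
    return sum(1 for k in keys if before(k) != after(k)) / len(keys)


keys = [f"key-{i}" for i in range(10000)]

# Hash-modulo: growing from 8 to 9 partitions remaps most keys.
modulo_moved = moved_fraction(
    lambda k: modulo_partition(k, 8), lambda k: modulo_partition(k, 9), keys
)

# Consistent hashing: only keys claimed by the new partition move.
ring8, ring9 = ConsistentHashRing(8), ConsistentHashRing(9)
ring_moved = moved_fraction(ring8.partition, ring9.partition, keys)
```

With a uniform hash, modulo placement is expected to move about 8/9 of the
keys on an 8-to-9 expansion, while the ring moves roughly the new
partition's share, and every key that moves lands on the new partition.
Whether that difference matters for this KIP is exactly the question above.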


>
> Thanks,
> Allen
>
>
> On Thu, Feb 22, 2018 at 11:52 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Regarding deleting partitions, Gwen's point is right on. In some of the
> > usage of Kafka, the traffic can be bursty. When the traffic goes up,
> adding
> > partitions is a quick way of shifting some traffic to the newly added
> > brokers. Once the traffic goes down, the newly added brokers will be
> > reclaimed (potentially by moving replicas off those brokers). However, if
> > one can only add partitions without removing, eventually, one will hit
> the
> > limit.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Feb 21, 2018 at 12:23 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for your comments.
> > >
> > > On Wed, Feb 21, 2018 at 10:17 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the KIP. At the high level, this makes sense. A few
> comments
> > > > below.
> > > >
> > > > 1. It would be useful to support removing partitions as well. The
> > general
> > > > idea could be bumping the leader epoch for the remaining partitions.
> > For
> > > > the partitions to be removed, we can make them read-only and remove
> > them
> > > > after the retention time.
> > > >
> > >
> > > I think we should be able to find a way to delete partitions of an
> > existing
> > > topic. But it will also add complexity to our broker and client
> > > implementation. I am just not sure whether this feature is worth the
> > > complexity. Could you explain a bit more why user would want to delete
> > > partitions of an existing topic? Is it to handle the human error where
> a
> > > topic is created with too many partitions by mistake?
> > >
> > >
> > > >
> > > > 2. If we support removing partitions, I am not sure if it's enough to
> > > fence
> > > > off the producer using total partition number since the total
> partition
> > > > number may remain the same after adding and then removing partitions.
> > > > Perhaps we need some notion of partition epoch.
> > > >
> > > > 3. In step 5) of the Proposed Changes, I am not sure that we can
> always
> > > > rely upon position 0 for dealing with the new partitions. A consumer
> > will
> > > > start consuming the new partition when some of the existing records
> > have
> > > > been removed due to retention.
> > > >
> > >
> > >
> > > You are right. I have updated the KIP to compare the startPosition with
> > the
> > > earliest offset of the partition. If the startPosition > earliest
> offset,
> > > then the consumer can consume messages from the given partition
> directly.
> > > This should handle the case where some of the existing records have
> been
> > > removed before consumer starts consumption.
> > >
> > >
> > > >
> > > > 4. When the consumer is allowed to read messages after the partition
> > > > expansion point, a key may be moved from one consumer instance to
> > > another.
> > > > In this case, similar to consumer rebalance, it's useful to inform
> the
> > > > application about this so that the consumer can save and reload the
> per
> > > key
> > > > state. So, we need to either add some new callbacks or reuse the
> > existing
> > > > rebalance callbacks.
> > > >
> > >
> > >
> > > Good point. I will add the callback later after we discuss the need for
> > > partition deletion.
> > >
> > >
> > > >
> > > > 5. There is some subtlety in assigning partitions. Currently, the
> > > consumer
> > > > assigns partitions without needing to know the consumption offset.
> This
> > > > could mean that a particular consumer may be assigned some new
> > partitions
> > > > that are not consumable yet, which could lead to imbalanced load
> > > > temporarily. Not sure if this is super important to address though.
> > > >
> > >
> > > Personally I think it is not worth adding more complexity just to
> > optimize
> > > this scenario. This imbalance should exist only for a short period of
> > time.
> > > If it is important I can think more about how to handle it.
> > >
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Sat, Feb 10, 2018 at 3:35 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have created KIP-253: Support in-order message delivery with
> > > partition
> > > > > expansion. See
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > > > > .
> > > > >
> > > > > This KIP provides a way to allow messages of the same key from the
> > same
> > > > > producer to be consumed in the same order they are produced even if
> > we
> > > > > expand partition of the topic.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > >
> > >
> >
>
