Charity,

1. Nothing you do seems crazy to me. Kafka should be able to work with
auto-scaling and we should be able to fix the issues you are running
into.

There are a few things you should be careful about when using the
method you described, though:
1.1 Your life may be a bit simpler if you have a way of starting a
new broker with the same ID as the old one - it will automatically
pick up the old replicas and you won't need to rebalance, which makes
life slightly easier in some cases (a short sketch follows this
list).
1.2 Be careful not to rebalance too many partitions at once - you
only have so much bandwidth, and Kafka currently does not throttle
rebalancing traffic.

2. I think your rebalance script is not rebalancing the offsets
topic? It still has a replica on broker 1002. You have two good
replicas, so you are nowhere near disaster, but make sure you get
this working too.
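
For reference, a sketch of what moving that replica off 1002 could
look like with the stock reassignment tool; the ZooKeeper address,
partition number, and target replica list are placeholders (list
every __consumer_offsets partition that still references 1002), and
keeping each JSON file small also helps with 1.2, since it bounds how
much data moves at once:

$ cat move-offsets.json
{"version":1,"partitions":[
  {"topic":"__consumer_offsets","partition":5,"replicas":[1004,1009,1003]}
]}
$ kafka-reassign-partitions.sh --zookeeper zk:2181 \
    --reassignment-json-file move-offsets.json --execute
$ kafka-reassign-partitions.sh --zookeeper zk:2181 \
    --reassignment-json-file move-offsets.json --verify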

3. From the logs, it looks like something a bit more complex
happened... You started with brokers 1001, 1002, and 1003; then,
around 00:08 or so, they are all gone. Then 1005, 1006, 1007, and
1008 show up, but they still have no replicas. Then you get 1001,
1003, 1004, and 1005? And then it moves to 1001, 1003, 1004, and
1009? I'm not sure I managed to piece this together correctly (we
need better log-extraction tools for sure...), but it looks like
there were plenty of opportunities for things to go wrong :)

4. We have a known race condition where two leader elections in close
proximity can cause a consumer to accidentally get ahead.
One culprit is "auto.leader.rebalance.enable=true" - this can trigger
a leader election at a bad time (see:
https://issues.apache.org/jira/browse/KAFKA-3670), which can lead to
the loss of the last offsets after consumers have already seen them.
In cases where the admin controls rebalances, we often turn it off;
you can trigger rebalances based on your own knowledge without Kafka
automatically doing extra rebalancing.

Another culprit can be "unclean.leader.election.enable", but you don't
have that :)
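
A minimal sketch of that change; the property and the election tool
are the standard ones that ship with the broker, and the ZooKeeper
address is a placeholder:

# server.properties on each broker
auto.leader.rebalance.enable=false

# later, once your own ASG tooling decides the cluster has settled,
# trigger the preferred-replica election yourself:
$ kafka-preferred-replica-election.sh --zookeeper zk:2181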

A common work-around is to configure the consumer to handle the
"offset out of range" exception by jumping to the last offset
available in the log. This is the behavior of the Java client, and it
would have saved your consumer here. The Go client looks very
low-level, so I don't know how easy it is to do that there.
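
For the Go side, here is a minimal sketch with Sarama of what "fall
back to a valid offset" could look like. The broker address, topic
name, partition, stored offset, and the choice of OffsetNewest (skip
the gap) over OffsetOldest (re-read from the start) are all
assumptions, and depending on the client version the out-of-range
error may be wrapped rather than returned directly:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// consumeFrom tries the offset we have stored; if Kafka says it is out
// of range, it falls back to a valid offset instead of giving up.
func consumeFrom(c sarama.Consumer, topic string, partition int32, stored int64) (sarama.PartitionConsumer, error) {
	pc, err := c.ConsumePartition(topic, partition, stored)
	if err == sarama.ErrOffsetOutOfRange {
		// The stored offset no longer exists on the broker. Jump to the
		// newest offset; OffsetOldest would re-read the log instead.
		log.Printf("offset %d out of range for %s/%d, resetting to newest", stored, topic, partition)
		return c.ConsumePartition(topic, partition, sarama.OffsetNewest)
	}
	return pc, err
}

func main() {
	// Broker address, topic, partition, and offset below are placeholders.
	consumer, err := sarama.NewConsumer([]string{"kafka-1001:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumeFrom(consumer, "hound-events", 5, 12040353)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("partition %d offset %d", msg.Partition, msg.Offset)
	}
}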

If I were you, I'd retest your ASG scripts without the auto leader
election - since your own scripts can / should handle that.

Hope this helps,

Gwen

On Tue, Jun 28, 2016 at 3:49 PM, Charity Majors <char...@hound.sh> wrote:
> Hi there,
>
> I just finished implementing kafka + autoscaling groups in a way that made
> sense to me.  I have a _lot_ of experience with ASGs and various storage
> types but I'm a kafka noob (about 4-5 months of using it in development and
> staging and pre-launch production).
>
> It seems to be working fine from the Kafka POV but causing troubling side
> effects elsewhere that I don't understand.  I don't know enough about Kafka
> to know if my implementation is just fundamentally flawed for some reason,
> or if so how and why.
>
> My process is basically this:
>
> - Terminate a node, or increment the size of the ASG by one.  (I'm not doing
> any graceful shutdowns because I don't want to rely on graceful shutdowns,
> and I'm not attempting to act upon more than one node at a time.  Planning
> on doing a ZK lock or something later to enforce one process at a time, if I
> can work the major kinks out.)
>
> - Firstboot script, which runs on all hosts from rc.init.  (We run ASGs for
> *everything*.)  It infers things like the chef role, environment, cluster
> name, etc, registers DNS, bootstraps and runs chef-client, etc.  For storage
> nodes, it formats and mounts a PIOPS volume under the right mount point, or
> just remounts the volume if it already contains data.  Etc.
>
> - Run a balancing script from firstboot on kafka nodes.  It checks to see
> how many brokers there are and what their ids are, and checks for any
> underbalanced partitions with less than 3 ISRs.  Then we generate a new
> assignment file for rebalancing partitions, and execute it.  We watch on the
> host for all the partitions to finish rebalancing, then complete.
>
> - So far so good.  I have repeatedly killed kafka nodes and had them come
> up, rebalance the cluster, and everything on the kafka side looks healthy.
> All the partitions have the correct number of ISRs, etc.
>
> But after doing this, we have repeatedly gotten into a state where consumers
> that are pulling off the kafka partitions enter a weird state where their
> last known offset is *ahead* of the last known offset for that partition,
> and we can't recover from it.
>
> An example.  Last night I terminated ... I think it was broker 1002 or 1005,
> and it came back up as broker 1009.  It rebalanced on boot, everything
> looked good from the kafka side.  This morning we noticed that the storage
> node that maps to partition 5 has been broken for like 22 hours; it thinks
> the next offset is too far ahead / out of bounds, so it stopped consuming.  This
> happened shortly after broker 1009 came online and the consumer caught up.
>
> From the storage node log:
>
> time="2016-06-28T21:51:48.286035635Z" level=info msg="Serving at
> 0.0.0.0:8089..."
> time="2016-06-28T21:51:48.293946529Z" level=error msg="Error creating
> consumer" error="kafka server: The requested offset is outside the range of
> offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.294532365Z" level=error msg="Failed to start
> services: kafka server: The requested offset is outside the range of offsets
> maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.29461156Z" level=info msg="Shutting down..."
>
> From the mysql mapping of partitions to storage nodes/statuses:
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$ hound-kennel
>
> Listing by default. Use -action <listkafka, nextoffset, watchlive, setstate,
> addslot, removeslot, removenode> for other actions
>
> Part    Status          Last Updated                    Hostname
> 0       live            2016-06-28 22:29:10 +0000 UTC   retriever-772045ec
> 1       live            2016-06-28 22:29:29 +0000 UTC   retriever-75e0e4f2
> 2       live            2016-06-28 22:29:25 +0000 UTC   retriever-78804480
> 3       live            2016-06-28 22:30:01 +0000 UTC   retriever-c0da5f85
> 4       live            2016-06-28 22:29:42 +0000 UTC   retriever-122c6d8e
> 5                       2016-06-28 21:53:48 +0000 UTC
>
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$ hound-kennel
> -partition 5 -action nextoffset
>
> Next offset for partition 5: 12040353
>
>
> Interestingly, the primary for partition 5 is 1004, and its follower is the
> new node 1009.  (Partition 2 has 1009 as its leader and 1004 as its
> follower, and seems just fine.)
>
> I've attached all the kafka logs for the broker 1009 node since it launched
> yesterday.
>
> I guess my main question is: Is there something I am fundamentally missing
> about the kafka model that makes it not play well with autoscaling?  I
> see a couple of other people on the internet talking about using ASGs with
> kafka, but always in the context of maintaining a list of broker ids and
> reusing them.
>
> I don't want to do that.  I want the path for hardware termination,
> expanding the ASG size, and rolling entire ASGs to pick up new AMIs to all
> be the same.  I want all of these actions to be completely trivial and no
> big deal.  Is there something I'm missing, does anyone know why this is
> causing problems?
>
> Thanks so much for any help or insight anyone can provide,
> charity.
>
>
> P.S., some additional details about our kafka/consumer configuration:
>
> - We autogenerate/autoincrement broker ids from zk
>
> - We have one topic, with "many" partitions depending on the env, and a
> replication factor of 2 (now bumping to 3)
>
> - We have our own in-house written storage layer ("retriever") which
> consumes Kafka partitions.  The mapping of partitions to storage nodes is
> stored in mysql, as well as last known offset and some other details.
> Partitions currently have a 1-1 mapping with storage nodes, e.g. partition 5
> => retriever-112c6d8d storage node.
>
> - We are using the golang Sarama client, with the __consumer_offsets internal
> topic.  This also seems to have weird problems.  It does not rebalance
> the way the docs say it is supposed to, when consumers are added or
> restarted.  (In fact I haven't been able to figure out how to get it to
> rebalance or how to change the replication factor ... but I haven't really
> dived into this one and tried to debug it yet, I've been deep in the ASG
> stuff.)  But looking at this next, it seems very likely related in some way
> because the __consumer_offsets topic seems to break at the same time.
> `kafkacat` and `kafka-topics --describe output` in the gist below:
>
> https://gist.github.com/charity/d83f25b5e3f4994eb202f35fae74e7d1
>
> As you can see, even though 2/3 of the __consumer_offsets replicas are
> online, it thinks none of them are available - despite the fact that 5 of 6
> consumers are happily consuming away.
>
