Terence,
I see your point. I've thought a bit about this and it seems the only
solution would be to coordinate via ZK between the controllers and the
AM. The solution would be something like this:
* Controller consumers register an ephemeral znode under the kafka
znode to indicate they are clients
* Controllers listen on the .../kafka/broker znode for a child named
shuttingDown
* When AM reaches the state where it wants to shut down, it creates the
.../kafka/broker/shuttingDown znode and waits for there to be no
registered controllers. This wait would have an upper bound to prevent
eternal waiting.
* Once a controller consumer sees the shuttingDown node, it shuts itself
down as soon as it receives an empty message buffer.
* Controller consumers remove their registration znode when they shut down
* The AM shuts down the broker once all controller consumers are gone or
it has reached its timeout
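
The AM-side wait in the steps above could be sketched roughly as follows. This is only a sketch under stated assumptions, not existing Twill code: the class and method names are hypothetical, and the registeredControllers supplier is a stand-in for counting the ephemeral registration znodes in ZK.

```java
import java.util.function.IntSupplier;

/** Sketch of the AM-side wait: block until no controllers remain or a deadline passes. */
class ShutdownBarrier {
    /**
     * @param registeredControllers hypothetical stand-in for counting the
     *        ephemeral registration znodes of the controller consumers
     * @param timeoutMillis upper bound on the wait, to prevent eternal waiting
     * @param pollMillis how often to re-check the registration count
     * @return true if all controllers deregistered, false on timeout or interrupt
     */
    static boolean awaitNoControllers(IntSupplier registeredControllers,
                                      long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (registeredControllers.getAsInt() > 0) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // upper bound reached; the AM shuts the broker down anyway
            }
            try {
                // Sleep for the poll interval, but never past the deadline.
                Thread.sleep(Math.min(pollMillis, Math.max(1L, remainingNanos / 1_000_000L)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

A real implementation would more likely set a ZK watch on the registration path than poll, but the bounded wait would look the same.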
This solution avoids the checkpointing load, and znode use scales with
the number of consumers, which is presumably smallish. There are no
ZK operations outside of consumer creation and shutdown time. However, I
consider this a complex kind of setup, and this kind of code tends to be
harder to maintain since the logic is spread amongst different application
layers. I don't see any other way to ensure full reading of the kafka
queues given the decoupled nature of the broker and client.
Something else I thought of that would usually alleviate the issue, and
be much simpler, would be to have an extended timeout (say 15s) before
broker shutdown if the containers exit with a non-success exit code.
The shutdown timeouts could also be made configurable. What do
you think?
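
To make the simpler option concrete, here is a minimal sketch of how the delay could be chosen. The class and method names are hypothetical, and it assumes an exit code of 0 means success; both delays are passed in so they can be made configurable.

```java
/** Sketch: pick the broker shutdown delay from the container exit codes. */
class BrokerShutdownDelay {
    /**
     * @param containerExitCodes exit codes of the finished containers (0 assumed to mean success)
     * @param normalMillis delay used when every container exited successfully
     * @param extendedMillis longer delay used when any container failed
     */
    static long delayMillis(int[] containerExitCodes, long normalMillis, long extendedMillis) {
        for (int code : containerExitCodes) {
            if (code != 0) {
                // A failure makes the final log messages more valuable, so wait longer.
                return extendedMillis;
            }
        }
        return normalMillis;
    }
}
```

With the numbers from this thread, delayMillis(codes, 2000, 15000) would keep the current 2 second delay for clean exits and wait 15s otherwise.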
-Martin
On 01/05/2017 10:53 PM, Terence Yim wrote:
Hi Martin,
I do agree that the AM should only shut down the embedded Kafka server once all
the controllers have seen all the logs. However, the difficulty is: how does the
AM know about it? The Twill controller uses the simple Kafka API instead of
the higher-level one (as that one involves checkpointing to ZK, and we don't
want many running Twill apps to put a heavy load on ZK). Do you have any
suggestions on how to do that?
Thanks,
Terence
On Jan 5, 2017, at 2:42 PM, Martin Serrano <mar...@attivio.com> wrote:
Actually, after further investigation, I realize the server side has to be
dealt with because it is shutting down the Kafka broker before all the messages
are read from it. I see that there is a 2 second delay for clients to pull
what they can first. What would folks think about an algorithm that checked
the topic for unread messages and had a longer timeout (say 30s) as long as
there were messages to be received still? Is there an issue that the client
may not be present on the other side and that the delay of shutting down the AM
would be undesirable?
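
The decision rule proposed here could be sketched as below. The names are hypothetical, and the unreadMessages flag is an assumption: how the AM would actually learn whether unread messages remain is an open question.

```java
/** Sketch: decide whether the AM should keep the Kafka broker alive a little longer. */
class DrainPolicy {
    /**
     * @param elapsedMillis time since the AM decided to shut down
     * @param unreadMessages hypothetical input: whether the topic still has unread messages
     * @param graceMillis the unconditional delay (2 seconds today)
     * @param maxMillis the longer upper bound (say 30s) that applies while messages remain
     */
    static boolean keepBrokerAlive(long elapsedMillis, boolean unreadMessages,
                                   long graceMillis, long maxMillis) {
        if (elapsedMillis < graceMillis) {
            return true; // always give clients the initial grace period
        }
        // Past the grace period, only wait while messages remain and the bound is not hit.
        return unreadMessages && elapsedMillis < maxMillis;
    }
}
```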
-Martin
On 01/05/2017 12:32 PM, Martin Serrano wrote:
All,
I'm encountering a situation on a fast machine where the Kafka log aggregation
topic is not empty when the system shuts down. The scenario:
* log consumer consumes all messages
* consumer sleeps (500ms) due to empty queue
* containers exit, posting /final log messages/ about why
* controller notices containers are down and terminates consumers.
* consumer is interrupted from sleep but has already been canceled, so it
does not get the rest of the messages.
This scenario can be really confusing during development because an error may
be missed (as in my case) if it falls into the /final log messages/. Before I
file a ticket and fix this, I wanted to get some feedback. Looking at
org.apache.twill.internal.kafka.client.SimpleKafkaConsumer it seems this
behavior could be intentional given this log message (line 384):
    LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
My opinion is that final messages logged by a container are likely to be
critical in diagnosing errors and that twill should do whatever it can to
forward them before shutting things down. If there is agreement on this I'll
file a ticket and fix it. My general approach would be to indicate to the
consumer that it is in a shuttingDown state which it would use to break from
the consume loop once the message set was empty. If this makes sense, would we
need to support a timeout for the maximum amount of time to be in this state
before punting on the rest of the messages? My instinct is no, get them all,
but given the way the code is set up now, perhaps there are good reasons to
time out.
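
A rough sketch of the approach I have in mind is below. It is not the SimpleKafkaConsumer code: the class is hypothetical, an in-memory queue stands in for the Kafka fetch, and the maxDrainMillis bound is the optional timeout in question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: a consume loop that drains remaining messages before stopping. */
class DrainingConsumer {
    private final Queue<String> topic; // stand-in for fetching from a Kafka partition
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    DrainingConsumer(Queue<String> topic) {
        this.topic = topic;
    }

    /** Ask the consumer to stop once it has seen an empty fetch. */
    void shutDown() {
        shuttingDown.set(true);
    }

    /** Runs the consume loop and returns the messages it delivered. */
    List<String> run(long idleSleepMillis, long maxDrainMillis) {
        List<String> delivered = new ArrayList<>();
        boolean draining = false;
        long drainDeadline = 0L; // only meaningful once draining is true
        while (true) {
            if (!draining && shuttingDown.get()) {
                draining = true;
                drainDeadline = System.nanoTime() + maxDrainMillis * 1_000_000L;
            }
            if (draining && System.nanoTime() - drainDeadline >= 0) {
                return delivered; // timed out: punt on the rest of the messages
            }
            String message = topic.poll();
            if (message != null) {
                delivered.add(message);
                continue;
            }
            if (draining) {
                return delivered; // empty fetch while shutting down: everything was read
            }
            try {
                Thread.sleep(idleSleepMillis); // empty queue, back off before the next fetch
            } catch (InterruptedException e) {
                shuttingDown.set(true); // treat an interrupt as a shutdown request, then drain
            }
        }
    }
}
```

The key difference from the scenario above is that an interrupt or cancel no longer ends the loop directly; it only switches the consumer into the draining state.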
Thanks,
Martin Serrano