Terence,

I see your point. I've thought a bit about this and it seems the only solution would be to coordinate via ZK between the controllers and the AM. The solution would be something like this:

* Controller consumers register an ephemeral znode under the kafka znode to indicate they are clients
* Controllers listen on the .../kafka/broker znode for a child named shuttingDown
* When the AM reaches the state where it wants to shut down, it creates the .../kafka/broker/shuttingDown znode and waits for there to be no registered controllers. This wait would have an upper bound to prevent eternal waiting.
* Once a controller consumer sees the shuttingDown node, it shuts itself down the next time it receives an empty message buffer
* Controller consumers remove their registration znode when they shut down
* The AM shuts down the broker once all controller consumers are gone or it has reached its timeout
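
To make the AM side of the steps above concrete, here is a minimal sketch of the bounded wait. It is not Twill code: ShutdownBarrier and awaitNoControllers are hypothetical names, and the Supplier stands in for whatever lists the registered controller znodes. It polls for simplicity; a real version would more likely rely on a ZK watch.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Sketch of the AM-side bounded wait (hypothetical helper, not part of Twill). */
final class ShutdownBarrier {
  private ShutdownBarrier() { }

  /**
   * Waits until no controller consumers remain registered, or until the timeout elapses.
   *
   * @param registeredControllers assumed lookup, e.g. the children of the registration znode
   * @return true if every controller deregistered, false on timeout or interruption
   */
  static boolean awaitNoControllers(Supplier<List<String>> registeredControllers,
                                    long timeoutMillis, long pollMillis) {
    long start = System.nanoTime();
    long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!registeredControllers.get().isEmpty()) {
      if (System.nanoTime() - start >= timeoutNanos) {
        return false;  // upper bound reached; the caller shuts the broker down anyway
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }
}
```

Either return value leads to the broker being shut down; the boolean only tells the AM whether it is safe to assume the controllers saw everything.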

This solution avoids the checkpointing load, and znode use scales with the number of consumers, which is presumably smallish. There are no net ZK operations outside of consumer creation and shutdown time. However, I consider this a complex setup, and code like this tends to be harder to maintain since the logic is spread across different application layers. I don't see any other way to ensure the Kafka queues are fully read, given the decoupled nature of the broker and client.

Something else I thought of, which would usually alleviate the issue and be much simpler, would be to have an extended timeout (say 15s) before broker shutdown if the containers exit with a non-success exit code. The shutdown timeouts could also be made configurable. What do you think?
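
A rough sketch of that simpler option, assuming the AM can see the container exit codes when it decides how long to wait. BrokerShutdownDelay and forExitCodes are made-up names, and both durations are constructor arguments so they could come from configuration.

```java
import java.time.Duration;

/** Picks the delay before broker shutdown (hypothetical helper, not part of Twill). */
final class BrokerShutdownDelay {
  private final Duration normal;    // delay used when every container exited cleanly
  private final Duration extended;  // longer delay used when any container failed

  BrokerShutdownDelay(Duration normal, Duration extended) {
    this.normal = normal;
    this.extended = extended;
  }

  /** Returns the extended delay if any exit code is non-zero, otherwise the normal one. */
  Duration forExitCodes(int... exitCodes) {
    for (int code : exitCodes) {
      if (code != 0) {
        return extended;
      }
    }
    return normal;
  }
}
```

For example, new BrokerShutdownDelay(Duration.ofSeconds(2), Duration.ofSeconds(15)) would keep the current 2 second delay for clean exits and wait 15s after a failure.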

-Martin



On 01/05/2017 10:53 PM, Terence Yim wrote:
Hi Martin,

I do agree that the AM should only shut down the embedded Kafka server once all
the controllers have seen all the logs. However, the difficulty is in how the
AM would know about it. The Twill controller is using the simple Kafka API
instead of the higher-level one (as that one involves checkpointing to ZK, and
we don't want many running Twill apps to put a heavy load on ZK). Do you have
any suggestions on how to do that?

Thanks,
Terence

On Jan 5, 2017, at 2:42 PM, Martin Serrano <mar...@attivio.com> wrote:

Actually, after further investigation, I realize the server side has to be 
dealt with because it is shutting down the Kafka broker before all the messages 
are read from it.  I see that there is a 2 second delay for clients to pull 
what they can first.  What would folks think about an algorithm that checked 
the topic for unread messages and had a longer timeout (say 30s) as long as 
there were messages to be received still?  Is there an issue that the client 
may not be present on the other side and that the delay of shutting down the AM 
would be undesirable?

-Martin

On 01/05/2017 12:32 PM, Martin Serrano wrote:

All,

I'm encountering a situation on a fast machine where the Kafka log aggregation 
topic is not empty when the system shuts down. The scenario:

  * log consumer consumes all messages
  * consumer sleeps (500ms) due to empty queue
  * containers exit, posting /final log messages/ about why
  * controller notices containers are down and terminates consumers.
  * consumer is interrupted from sleep but has been canceled, so it
    does not get the rest of the messages.

This scenario can be really confusing during development because an error may 
be missed (as in my case) if it falls into the /final log messages/.  Before I 
file a ticket and fix this, I wanted to get some feedback.  Looking at 
org.apache.twill.internal.kafka.client.SimpleKafkaConsumer it seems this 
behavior could be intentional given this log message (line 384):

            LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);

My opinion is that final messages logged by a container are likely to be 
critical in diagnosing errors and that twill should do whatever it can to 
forward them before shutting things down. If there is agreement on this I'll 
file a ticket and fix it.  My general approach would be to indicate to the 
consumer that it is in a shuttingDown state which it would use to break from 
the consume loop once the message set was empty.  If this makes sense would we 
need to support a timeout for the maximum amount of time to be in this state 
before punting on the rest of the messages?  My instinct is no, get them all, 
but given the way the code is set up now, perhaps there are good reasons to 
time out.
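
To illustrate the approach, here is a minimal sketch of such a consume loop. It is not the SimpleKafkaConsumer code: DrainingConsumerLoop, the fetch supplier, and the sink are stand-ins I made up, and messages are plain strings. The shuttingDown flag is read before each fetch, so the loop only exits on an empty fetch that started after shutdown was requested.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Sketch of a consume loop that drains remaining messages on shutdown (hypothetical). */
final class DrainingConsumerLoop {
  private DrainingConsumerLoop() { }

  /**
   * @param fetch          assumed stand-in for one fetch from the topic; empty list = nothing new
   * @param maxDrainMillis upper bound on time spent draining; Long.MAX_VALUE effectively disables it
   */
  static void run(Supplier<List<String>> fetch, Consumer<String> sink,
                  AtomicBoolean shuttingDown, long emptySleepMillis, long maxDrainMillis) {
    long maxDrainNanos = TimeUnit.MILLISECONDS.toNanos(maxDrainMillis);
    boolean draining = false;
    long drainStart = 0L;
    while (true) {
      // Read the flag before fetching so the final fetch happens after the request.
      boolean stopping = shuttingDown.get();
      if (stopping && !draining) {
        draining = true;
        drainStart = System.nanoTime();
      }
      List<String> batch = fetch.get();
      batch.forEach(sink);
      if (stopping && (batch.isEmpty() || System.nanoTime() - drainStart >= maxDrainNanos)) {
        return;  // nothing left to read, or the drain time limit was reached
      }
      if (batch.isEmpty()) {
        try {
          Thread.sleep(emptySleepMillis);
        } catch (InterruptedException e) {
          // Treat an interrupt as a shutdown request and do one more fetch pass.
          shuttingDown.set(true);
        }
      }
    }
  }
}
```

With a limit in place the loop can still drop messages if the producers keep writing; passing Long.MAX_VALUE matches the "get them all" behavior.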

Thanks,

Martin Serrano

