Terence,

I'm going to file a ticket and move the discussion there.  Okay?

-Martin

On 01/07/2017 02:29 PM, Terence Yim wrote:
Hi Martin,

One simpler way would be to use the messaging mechanism in Twill to send a
message from the controller to the AM during shutdown, after the controller
sees a special log event that the AM emits once it has shut down all
containers. The AM would wait for that message from the controller before
completely shutting itself down (with some reasonable timeout to avoid an
infinite wait).
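
A rough sketch of what I have in mind on the AM side (class and method names
here are only illustrative, not actual Twill APIs):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch of the AM-side handshake; the real message would be
    // delivered through Twill's existing messaging mechanism.
    public class LogsConsumedHandshake {

      private final CountDownLatch logsConsumed = new CountDownLatch(1);

      // Called when the controller's "logs fully consumed" message arrives.
      public void onControllerMessage() {
        logsConsumed.countDown();
      }

      // Called by the AM after it has stopped all containers and emitted the
      // special log event the controller is watching for.
      public void awaitControllerAck(long timeout, TimeUnit unit)
          throws InterruptedException {
        // Wait for the controller to confirm it has drained the log topic, but
        // never longer than the timeout, so the AM cannot hang forever.
        if (!logsConsumed.await(timeout, unit)) {
          // Timed out; proceed with shutdown anyway.
        }
      }
    }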

Terence

On Fri, Jan 6, 2017 at 4:46 PM, Martin Serrano <mar...@attivio.com> wrote:

Terence,

I see your point.  I've thought a bit about this and it seems the only
solution would be to coordinate via ZK between the controllers and the AM.
The solution would be something like this (a rough sketch follows the list):

* Controller consumers register an ephemeral znode under the kafka znode to
indicate they are clients
* Controllers listen on the .../kafka/broker znode for a child named
shuttingDown
* When the AM reaches the state where it wants to shut down, it creates the
.../kafka/broker/shuttingDown znode and waits until there are no registered
controllers.  This wait would have an upper bound to prevent waiting forever.
* Once a controller consumer sees the shuttingDown znode, it shuts itself
down the next time it receives an empty message buffer.
* Controller consumers remove their registration znode when they shut down.
* The AM shuts down the broker once all controller consumers are gone or it
has reached its timeout.
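
A rough sketch of this coordination (znode paths and names are only
illustrative; the real code would go through Twill's ZK client):

    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    // Hypothetical sketch of the broker-shutdown coordination over ZK.
    public class BrokerShutdownCoordination {

      private static final String CONSUMERS_PATH = "/kafka/consumers";
      private static final String SHUTTING_DOWN_PATH = "/kafka/broker/shuttingDown";

      // Controller side: register an ephemeral znode so the AM can see this consumer.
      static String registerConsumer(ZooKeeper zk, String consumerId) throws Exception {
        return zk.create(CONSUMERS_PATH + "/" + consumerId, new byte[0],
                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      }

      // Controller side: check whether the AM has announced broker shutdown.
      static boolean isBrokerShuttingDown(ZooKeeper zk) throws Exception {
        return zk.exists(SHUTTING_DOWN_PATH, false) != null;
      }

      // AM side: announce shutdown, then wait (bounded) for consumers to deregister.
      static void announceAndAwaitConsumers(ZooKeeper zk, long timeoutMs) throws Exception {
        zk.create(SHUTTING_DOWN_PATH, new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          List<String> consumers = zk.getChildren(CONSUMERS_PATH, false);
          if (consumers.isEmpty()) {
            return;              // all consumers drained their topics and deregistered
          }
          Thread.sleep(200);     // simple poll; a watcher could be used instead
        }
        // Timed out; shut the broker down anyway to bound the wait.
      }
    }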

This solution avoids the checkpointing load, and znode usage scales with the
number of consumers, which is presumably smallish.  There are no net ZooKeeper
operations outside of consumer creation and shutdown time. However, I consider
this a fairly complex setup, and this kind of code tends to be harder to
maintain since the logic is spread among different application layers.  Still,
I don't see any other way to ensure full reading of the Kafka queues given the
decoupled nature of the broker and client.

Something else I thought of that would be much simpler, and would usually
alleviate the issue, is to use an extended timeout (say 15s) before broker
shutdown if the containers exit with a non-success error code, roughly as
sketched below.  The shutdown timeouts could also be made configurable. What
do you think?
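
The simpler variant would look roughly like this (the failure check and the
delay values are just illustrative):

    import java.util.concurrent.TimeUnit;

    // Hypothetical sketch: pick a longer broker-shutdown delay when any container
    // exited with a non-success code, so final error logs have time to be pulled.
    final class BrokerShutdownDelay {

      static final long DEFAULT_DELAY_MS = TimeUnit.SECONDS.toMillis(2);
      static final long EXTENDED_DELAY_MS = TimeUnit.SECONDS.toMillis(15);

      static long chooseDelayMs(boolean anyContainerFailed) {
        return anyContainerFailed ? EXTENDED_DELAY_MS : DEFAULT_DELAY_MS;
      }
    }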

-Martin




On 01/05/2017 10:53 PM, Terence Yim wrote:

Hi Martin,

I do agree that the AM should only shut down the embedded Kafka server
once all the controllers have seen all the logs. However, the difficulty is
in how the AM knows about it. The Twill controller uses the simple Kafka
API instead of the higher-level one (as that one involves checkpointing to
ZK, and we don't want running many Twill apps to put a heavy load on ZK). Do
you have any suggestions on how to do that?

Thanks,
Terence

Sent from my iPhone

On Jan 5, 2017, at 2:42 PM, Martin Serrano <mar...@attivio.com> wrote:
Actually, after further investigation, I realize the server side has to
be dealt with as well, because it shuts down the Kafka broker before all the
messages are read from it.  I see that there is a 2-second delay for
clients to pull what they can first.  What would folks think about an
algorithm that checked the topic for unread messages and used a longer
timeout (say 30s) as long as there were still messages to be received?  Is
there a concern that the client may not be present on the other side, making
the delay in shutting down the AM undesirable?
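
A sketch of the kind of wait I mean (the hasUnreadMessages check is
hypothetical; it assumes some way to tell whether the log topic still has
messages that have not been pulled):

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    // Hypothetical sketch: before stopping the embedded Kafka broker, wait while
    // the log topic still has unread messages, up to a longer timeout (say 30s).
    final class DrainBeforeBrokerShutdown {

      static void awaitTopicDrained(BooleanSupplier hasUnreadMessages, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (hasUnreadMessages.getAsBoolean()) {
          if (System.currentTimeMillis() >= deadline) {
            return;        // give up so the AM shutdown is not held up indefinitely
          }
          TimeUnit.MILLISECONDS.sleep(500);
        }
      }
    }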

-Martin

On 01/05/2017 12:32 PM, Martin Serrano wrote:
All,

I'm encountering a situation on a fast machine where the Kafka log
aggregation topic is not empty when the system shuts down. The scenario:

   * log consumer consumes all messages
   * consumer sleeps (500ms) due to empty queue
   * containers exit, posting /final log messages/ about why
   * controller notices containers are down and terminates consumers
   * consumer is interrupted from its sleep, but it has already been
     canceled, so it does not get the rest of the messages

This scenario can be really confusing during development because an
error may be missed (as in my case) if it falls into the /final log
messages/.  Before I file a ticket and fix this, I wanted to get some
feedback.  Looking at org.apache.twill.internal.kafka.client.SimpleKafkaConsumer
it seems this behavior could be intentional given this log message (line
384):

             LOG.debug("Unable to fetch messages on {}, kafka consumer
service shutdown is in progress.", topicPart);

My opinion is that final messages logged by a container are likely to
be critical in diagnosing errors, and that Twill should do whatever it can
to forward them before shutting things down. If there is agreement on this,
I'll file a ticket and fix it.  My general approach would be to indicate to
the consumer that it is in a shuttingDown state, which it would use to break
out of the consume loop once the message set is empty, roughly as sketched
below.  If this makes sense, would we need to support a timeout for the
maximum amount of time to stay in this state before punting on the rest of
the messages?  My instinct is no, get them all, but given the way the code
is set up now, perhaps there are good reasons to time out.
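
Roughly what I have in mind in the consumer (names are illustrative, not the
actual SimpleKafkaConsumer fields):

    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical sketch of the consume-loop change: once shutdown is requested,
    // keep fetching until a fetch returns no messages, then exit the loop.
    abstract class DrainingConsumeLoop<T> {

      private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

      // Called by the controller instead of cancelling the consumer outright.
      void requestShutdown() {
        shuttingDown.set(true);
      }

      void run() {
        while (true) {
          Iterator<T> messages = fetchMessages();   // blocking fetch with a short timeout
          boolean empty = !messages.hasNext();
          while (messages.hasNext()) {
            handle(messages.next());
          }
          // Only stop once we are shutting down AND the last fetch was empty, so the
          // final messages logged by containers are still forwarded.
          if (shuttingDown.get() && empty) {
            break;
          }
        }
      }

      abstract Iterator<T> fetchMessages();

      abstract void handle(T message);
    }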

Thanks,

Martin Serrano


