Hi Martin,

One simpler way would be to use the messaging mechanism in Twill to
send a message from the controller to the AM during shutdown, after the
controller sees a special log event that the AM emits once it has shut down
all containers. The AM would wait for that message from the controller before
shutting itself down completely (with some reasonable timeout to avoid
waiting indefinitely).
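
A rough sketch of the AM side of that wait, using a plain CountDownLatch in
place of the actual Twill message callback. The names here (ShutdownHandshake,
onControllerAck, awaitControllerAck) are made up for illustration and are not
Twill APIs; how the controller's message actually reaches the AM is assumed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Twill: lets the AM block until the controller acknowledges. */
final class ShutdownHandshake {
  // Released once the controller's "I have seen the final log event" message arrives.
  private final CountDownLatch ack = new CountDownLatch(1);

  /** Assumed to be invoked by whatever handler receives the controller's message. */
  void onControllerAck() {
    ack.countDown();
  }

  /**
   * Called by the AM after it has stopped all containers and emitted the special log event.
   * Returns true if the controller acknowledged in time, false if the timeout elapsed.
   */
  boolean awaitControllerAck(long timeout, TimeUnit unit) throws InterruptedException {
    return ack.await(timeout, unit);
  }
}
```

The AM would call awaitControllerAck(...) right before stopping the embedded
Kafka server and proceed either way once it returns; the return value is only
useful for logging whether the controller confirmed it saw the final logs.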

Terence

On Fri, Jan 6, 2017 at 4:46 PM, Martin Serrano <mar...@attivio.com> wrote:

> Terence,
>
> I see your point.  I've thought a bit about this and it seems the only
> solution would be to coordinate via ZK between the controllers and the AM.
> The solution would be something like this:
>
> * Controller consumers register an ephemeral znode under the kafka
> znode to indicate they are clients
> * Controllers listen on the .../kafka/broker znode for a child named
> shuttingDown
> * When AM reaches the state where it wants to shut down, it creates the
> .../kafka/broker/shuttingDown znode and waits for there to be no registered
> controllers.  This wait would have an upper bound to prevent eternal
> waiting.
> * Once a controller consumer sees the shuttingDown node, it shuts itself
> down as soon as it receives an empty message buffer.
> * Controller consumers remove their registration znode when they shut down
> * The AM shuts down the broker once all controller consumers are gone or
> it has reached its timeout
>
> This solution avoids the checkpointing load, and znode use scales with the
> number of consumers, which is presumably smallish.  There are no net ZK
> operations outside of consumer creation and shutdown time. However, I
> consider this a complex kind of setup, and this kind of code tends to be
> harder to maintain since the logic is spread among different application
> layers.  I don't see any other way to ensure full reading of the Kafka
> queues given the decoupled nature of the broker and client.
>
> Something else I thought of, which would usually alleviate the issue and be
> much simpler, would be to extend the timeout before broker shutdown (say, to
> 15s) if the containers exit with a non-success exit code.  The shutdown
> timeouts could also be made configurable. What do you think?
>
> -Martin
>
>
>
>
> On 01/05/2017 10:53 PM, Terence Yim wrote:
>
>> Hi Martin,
>>
>> I do agree that the AM should only shut down the embedded Kafka server
>> once all the controllers have seen all the logs. However, the difficulty is
>> how the AM would know that. The Twill controller uses the simple Kafka
>> API instead of the higher-level one (the latter involves checkpointing to
>> ZK, and we don't want many running Twill apps to put a heavy load on ZK). Do
>> you have any suggestions on how to do that?
>>
>> Thanks,
>> Terence
>>
>>
>> On Jan 5, 2017, at 2:42 PM, Martin Serrano <mar...@attivio.com> wrote:
>>>
>>> Actually, after further investigation, I realize the server side has to
>>> be dealt with because it is shutting down the Kafka broker before all the
>>> messages are read from it.  I see that there is a 2 second delay for
>>> clients to pull what they can first.  What would folks think about an
>>> algorithm that checked the topic for unread messages and used a longer
>>> timeout (say 30s) as long as there were still messages to be received?  Is
>>> there a concern that the client may not be present on the other side and
>>> that the delay in shutting down the AM would be undesirable?
>>>
>>> -Martin
>>>
>>> On 01/05/2017 12:32 PM, Martin Serrano wrote:
>>>>
>>>> All,
>>>>
>>>> I'm encountering a situation on a fast machine where the Kafka log
>>>> aggregation topic is not empty when the system shuts down. The scenario:
>>>>
>>>>   * log consumer consumes all messages
>>>>   * consumer sleeps (500ms) due to empty queue
>>>>   * containers exit, posting /final log messages/ about why
>>>>   * controller notices containers are down and terminates consumers.
>>>>   * consumer is interrupted from sleep but has been canceled, so it
>>>>     does not get the rest of the messages.
>>>>
>>>> This scenario can be really confusing during development because an
>>>> error may be missed (as in my case) if it falls into the /final log
>>>> messages/.  Before I file a ticket and fix this, I wanted to get some
>>>> feedback.  Looking at 
>>>> org.apache.twill.internal.kafka.client.SimpleKafkaConsumer
>>>> it seems this behavior could be intentional given this log message (line
>>>> 384):
>>>>
>>>>             LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
>>>>
>>>> My opinion is that the final messages logged by a container are likely to
>>>> be critical in diagnosing errors and that Twill should do whatever it can
>>>> to forward them before shutting things down. If there is agreement on this,
>>>> I'll file a ticket and fix it.  My general approach would be to tell the
>>>> consumer that it is in a shuttingDown state, which it would use to break
>>>> out of the consume loop once the message set is empty.  If this makes sense,
>>>> would we need to support a timeout for the maximum amount of time to be in
>>>> this state before punting on the rest of the messages?  My instinct is no,
>>>> get them all, but given the way the code is set up now, perhaps there are
>>>> good reasons to time out.
>>>>
>>>> Thanks,
>>>>
>>>> Martin Serrano
>>>>
>>>>
>
