I incorporated Ewen’s and Guozhang’s comments into the KIP page. I would
like to speed this KIP up, because we currently see mirror maker hang
quite often when a broker is down.

I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata
timeout to expire batches that are sitting in the accumulator without
leader info, because that situation is essentially one of missing metadata.
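
To illustrate the idea, here is a simplified sketch (not the actual patch;
PendingBatch, hasLeader and expireNoLeaderBatches are made-up names for
illustration only):

import java.util.Deque;
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical stand-in for the batches kept in the accumulator.
class PendingBatch {
    final long createdMs = System.currentTimeMillis();
    boolean hasLeader;  // would be derived from the current cluster metadata
    void abort(RuntimeException e) { /* complete batch futures exceptionally */ }
}

class AccumulatorExpiry {
    // Fail batches that still have no leader after waiting longer than the
    // metadata timeout, so they do not sit in the accumulator forever.
    static void expireNoLeaderBatches(Deque<PendingBatch> deque,
                                      long metadataTimeoutMs) {
        long now = System.currentTimeMillis();
        for (PendingBatch batch : deque) {
            if (!batch.hasLeader && now - batch.createdMs > metadataTimeoutMs) {
                batch.abort(new TimeoutException(
                    "Batch expired: no leader available within the metadata timeout"));
            }
        }
    }
}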

To summarize what I am thinking about the timeouts in the new producer:

1. Metadata timeout:
  - used in send(), blocking until metadata is available
  - used in the accumulator to expire batches with a timeout exception.
2. linger.ms:
  - used in the accumulator to mark a batch ready for drain.
3. Request timeout:
  - used in NetworkClient to expire a batch and retry if no response is
received for a request before the timeout.

So in this KIP, we only address (3). The only public interface change is a
new request timeout configuration (and maybe renaming TIMEOUT_CONFIG to
REPLICATION_TIMEOUT_CONFIG).
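
To make the interface change concrete, here is a rough sketch of what the
producer configuration could look like after this KIP. The names
"replication.timeout.ms" and "request.timeout.ms" and the values below are
only illustrative assumptions, not final proposals:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class RequestTimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // Existing broker-side ack timeout (TIMEOUT_CONFIG, currently
        // "timeout.ms"), possibly renamed to REPLICATION_TIMEOUT_CONFIG:
        props.put("replication.timeout.ms", "30000");
        // Proposed new client-side timeout for in-flight requests in
        // NetworkClient; should not be smaller than the replication timeout:
        props.put("request.timeout.ms", "40000");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}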

I would like to see what people think of the approach above.

Jiangjie (Becket) Qin

On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:

>Jun,
>
>I think about this a little bit differently.
>Intuitively, I am thinking that if a partition is offline, the metadata
>for that partition should be considered not ready, because we don’t know
>which broker we should send the message to. So those sends need to be
>blocked on the metadata timeout.
>Another thing I’m wondering is in which scenarios an offline partition
>would come back online within a short period of time, and how likely that
>is to occur. My understanding is that the timeout for batches sitting in
>the accumulator should be larger than linger.ms but should not be too long
>(e.g. less than 60 seconds). Otherwise, batches that will eventually be
>aborted will exhaust the shared buffer.
>
>That said, I do agree it is reasonable to buffer the messages for some
>time so that messages to other partitions can still get sent. But adding
>another expiration in addition to linger.ms - which is essentially a
>timeout itself - sounds a little bit confusing. Maybe we can do this: let
>the batch sit in the accumulator up to linger.ms, then fail it if
>necessary.
>
>What do you think?
>
>Thanks,
>
>Jiangjie (Becket) Qin
>
>On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
>
>>Jiangjie,
>>
>>Allowing messages to be accumulated in an offline partition could be
>>useful
>>since the partition may become available before the request timeout or
>>linger time is reached. Now that we are planning to add a new timeout, it
>>would be useful to think through whether/how that applies to messages in
>>the accumulator too.
>>
>>Thanks,
>>
>>Jun
>>
>>
>>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>>wrote:
>>
>>> Hi Harsha,
>>>
>>> Took a quick look at the patch. I think it is still a little bit
>>> different. KAFKA-1788 only handles the case where a batch has been
>>> sitting in the accumulator for too long. The KIP is trying to solve the
>>> issue where a batch has already been drained from the accumulator and
>>> sent to the broker.
>>> We might be able to apply the timeout at the batch level to merge those
>>> two cases, as Ewen suggested. But I’m not sure it is a good idea to
>>> allow messages whose target partition is offline to sit in the
>>> accumulator in the first place.
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
>>>
>>> >Guozhang and Jiangjie,
>>> >                 Isn’t this work being covered in
>>> >https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
>>> >review the patch there?
>>> >Thanks,
>>> >Harsha
>>> >
>>> >
>>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
>>> >wrote:
>>> >
>>> >Thanks for the update Jiangjie,
>>> >
>>> >I think it is actually NOT expected that a hardware disconnection will
>>> >be detected by the selector, but rather that it will only be revealed
>>> >upon a TCP timeout, which could be hours.
>>> >
>>> >A couple of comments on the wiki:
>>> >
>>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the
>>> >request
>>> >timeout as implict timeout." I am not very clear what does this mean?
>>> >
>>> >2. Currently the producer already has a "TIMEOUT_CONFIG" which should
>>> >really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add "
>>> >REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming:
>>> >admittedly it will change the config names, but it will reduce
>>> >confusion moving forward.
>>> >
>>> >
>>> >Guozhang
>>> >
>>> >
>>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
>>><j...@linkedin.com.invalid>
>>> >
>>> >wrote:
>>> >
>>> >> Checked the code again. It seems that the disconnected channel is not
>>> >> detected by the selector as expected.
>>> >>
>>> >> Currently we are depending on the
>>> >> o.a.k.common.network.Selector.disconnected set to see if we need to do
>>> >> something for a disconnected channel.
>>> >> However, the Selector.disconnected set is only updated when:
>>> >> 1. A write/read/connect to the channel fails.
>>> >> 2. A key is cancelled.
>>> >> However, when a broker goes down before it sends back the response, the
>>> >> client does not seem to be able to detect this failure.
>>> >>
>>> >> I did a simple test below:
>>> >> 1. Run a selector on one machine and an echo server on another machine,
>>> >> and connect the selector to the echo server.
>>> >> 2. Send a message to the echo server using the selector, then let the
>>> >> selector poll() every 10 seconds.
>>> >> 3. After the server received the message, unplug the network cable on
>>> >> the echo server.
>>> >> 4. After waiting for 45 minutes, the selector still had not detected
>>> >> the network failure.
>>> >> Lsof on the selector machine shows that the TCP connection is still
>>> >> considered ESTABLISHED.
>>> >>
>>> >> I’m not sure what we should expect from java.nio.channels.Selector in
>>> >> this case. According to the documentation, the selector does not
>>> >> verify the status of the associated channel. In my test case it looks
>>> >> even worse: the OS did not even consider the socket to be disconnected.
>>> >>
>>> >> Anyway, it seems adding the client-side request timeout is necessary.
>>> >> I’ve updated the KIP page to clarify the problem we want to solve,
>>> >> according to Ewen’s comments.
>>> >>
>>> >> Thanks.
>>> >>
>>> >> Jiangjie (Becket) Qin
>>> >>
>>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
>>>wrote:
>>> >>
>>> >>
>>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
>>> >><j...@linkedin.com.invalid>
>>> >> >wrote:
>>> >> >
>>> >> >> Hi Ewen, thanks for the comments. Very good points! Please see
>>> >> >> replies inline.
>>> >> >>
>>> >> >>
>>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
>>> >> wrote:
>>> >> >>
>>> >> >> >Jiangjie,
>>> >> >> >
>>> >> >> >Great start. I have a couple of comments.
>>> >> >> >
>>> >> >> >Under the motivation section, is it really true that the request
>>> >> >> >will never be completed? Presumably if the broker goes down the
>>> >> >> >connection will be severed, at worst by a TCP timeout, which should
>>> >> >> >clean up the connection and any outstanding requests, right? I think
>>> >> >> >the real reason we need a different timeout is that the default TCP
>>> >> >> >timeouts are ridiculously long in this context.
>>> >> >> Yes, when the broker is completely down the request should be
>>> >> >> cleared, as you said. In the case we encountered, though, it looks
>>> >> >> like the broker was just not responding but the TCP connection was
>>> >> >> still alive.
>>> >> >>
>>> >> >
>>> >> >Ok, that makes sense.
>>> >> >
>>> >> >
>>> >> >>
>>> >> >> >
>>> >> >> >My second question is about whether this is the right level to
>>> >> >> >tackle the issue/what user-facing changes need to be made. A related
>>> >> >> >problem came up in https://issues.apache.org/jira/browse/KAFKA-1788
>>> >> >> >where producer records get stuck indefinitely because there's no
>>> >> >> >client-side timeout. This KIP wouldn't fix that problem or any
>>> >> >> >problems caused by lack of connectivity since this would only apply
>>> >> >> >to in flight requests, which by definition must have been sent on an
>>> >> >> >active connection.
>>> >> >> >
>>> >> >> >I suspect both types of problems probably need to be addressed
>>> >> >> >separately by introducing explicit timeouts. However, because the
>>> >> >> >settings introduced here are very much about the internal
>>> >> >> >implementations of the clients, I'm wondering if this even needs to
>>> >> >> >be a user-facing setting, especially if we have to add other timeouts
>>> >> >> >anyway. For example, would a fixed, generous value that's still much
>>> >> >> >shorter than a TCP timeout, say 15s, be good enough? If other
>>> >> >> >timeouts would allow, for example, the clients to properly exit even
>>> >> >> >if requests have not hit their timeout, then what's the benefit of
>>> >> >> >being able to configure the request-level timeout?
>>> >> >> That is a very good point. We have three places where we might be
>>> >> >> able to enforce a timeout for a message send:
>>> >> >> 1. Before append to the accumulator - handled by the metadata timeout
>>> >> >> at the per-message level.
>>> >> >> 2. Batches of messages inside the accumulator - no timeout mechanism
>>> >> >> now.
>>> >> >> 3. Requests of batches after messages leave the accumulator - we have
>>> >> >> a broker-side timeout but no client-side timeout for now.
>>> >> >> My current proposal only addresses (3) but not (2).
>>> >> >> Honestly I do not have a very clear idea about what we should do with
>>> >> >> (2) right now. But I am with you that we should not expose too many
>>> >> >> configurations to users. What I am thinking now to handle (2) is:
>>> >> >> when the user calls send(), if we know that a partition is offline,
>>> >> >> we should throw an exception immediately instead of putting the
>>> >> >> message into the accumulator. This would prevent further memory
>>> >> >> consumption. We might also want to fail all the batches in the deque
>>> >> >> once we find a partition is offline. That said, I feel a timeout
>>> >> >> might not be quite applicable to (2).
>>> >> >> Do you have any suggestion on this?
>>> >> >>
>>> >> >
>>> >> >Right, I didn't actually mean to solve 2 here, but was trying to
>>> >> >figure out if a solution to 2 would reduce what we needed to do to
>>> >> >address 3. (And depending on how they are implemented, fixing 1 might
>>> >> >also address 2). It sounds like you hit a hang that I wasn't really
>>> >> >expecting. This probably just means the KIP motivation needs to be a
>>> >> >bit clearer about what type of situation this addresses. The cause of
>>> >> >the hang may also be relevant -- if it was something like a deadlock
>>> >> >then that's something that should just be fixed, but if it's something
>>> >> >outside our control then a timeout makes a lot more sense.
>>> >> >
>>> >> >
>>> >> >> >
>>> >> >> >I know we have a similar setting,
>>> >> >> >max.in.flight.requests.per.connection, exposed publicly (which I
>>> >> >> >just discovered is missing from the new producer configs
>>> >> >> >documentation). But it looks like the new consumer is not exposing
>>> >> >> >that option, using a fixed value instead. I think we should default
>>> >> >> >to hiding these implementation values unless there's a strong case
>>> >> >> >for a scenario that requires customization.
>>> >> >> For the producer, max.in.flight.requests.per.connection really
>>> >> >> matters. If people do not want messages to be reordered, they have
>>> >> >> to use max.in.flight.requests.per.connection=1. On the other hand,
>>> >> >> if throughput is more of a concern, it can be set higher. For the
>>> >> >> new consumer, I checked the value and I am not sure if the
>>> >> >> hard-coded max.in.flight.requests.per.connection=100 is the right
>>> >> >> value. Without the response to the previous request, what offsets
>>> >> >> should be put into the next fetch request? It seems to me the value
>>> >> >> will effectively be one regardless of the setting, unless we are
>>> >> >> sending fetch requests to different partitions, which does not look
>>> >> >> like the case.
>>> >> >> Anyway, it looks to be a separate issue, orthogonal to the request
>>> >> >> timeout.
>>> >> >>
>>> >> >
>>> >> >
>>> >> >>
>>> >> >> >In other words, since the only user-facing change was the addition
>>> >> >> >of the setting, I'm wondering if we can avoid the KIP altogether by
>>> >> >> >just choosing a good default value for the timeout.
>>> >> >> The problem is that we have a server-side request timeout exposed as
>>> >> >> a public configuration. We cannot set the client timeout smaller than
>>> >> >> that value, so a hard-coded value probably won’t work here.
>>> >> >>
>>> >> >
>>> >> >That makes sense, although it's worth keeping in mind that even if you
>>> >> >use "correct" values, they could still be violated due to, e.g., a GC
>>> >> >pause that causes the broker to process a request after it is supposed
>>> >> >to have expired.
>>> >> >
>>> >> >-Ewen
>>> >> >
>>> >> >
>>> >> >
>>> >> >> >
>>> >> >> >-Ewen
>>> >> >> >
>>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
>>> >> >><j...@linkedin.com.invalid>
>>> >> >> >wrote:
>>> >> >> >
>>> >> >> >> Hi,
>>> >> >> >>
>>> >> >> >> I just created a KIP to add a request timeout to NetworkClient for
>>> >> >> >> the new Kafka clients.
>>> >> >> >>
>>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
>>> >> >> >>
>>> >> >> >> Comments and suggestions are welcome!
>>> >> >> >>
>>> >> >> >> Thanks.
>>> >> >> >>
>>> >> >> >> Jiangjie (Becket) Qin
>>> >> >> >>
>>> >> >> >>
>>> >> >> >
>>> >> >> >
>>> >> >> >--
>>> >> >> >Thanks,
>>> >> >> >Ewen
>>> >> >>
>>> >> >>
>>> >> >
>>> >> >
>>> >> >--
>>> >> >Thanks,
>>> >> >Ewen
>>> >>
>>> >>
>>> >
>>> >
>>> >--
>>> >-- Guozhang
>>>
>>>
>
