Jiangjie,

Yes, I think using metadata timeout to expire batches in the record
accumulator makes sense.

Thanks,

Jun

On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> I incorporated Ewen and Guozhang’s comments in the KIP page. I would like to
> speed up this KIP because mirror maker is currently very likely to hang
> when a broker is down.
>
> I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
> timeout to expire the batches which are sitting in accumulator without
> leader info. I did that because the situation there is essentially missing
> metadata.
>
> As a summary of what I am thinking about the timeout in new Producer:
>
> 1. Metadata timeout:
>   - used in send(), blocking
>   - used in accumulator to expire batches with timeout exception.
> 2. Linger.ms
>   - Used in accumulator to ready the batch for drain
> 3. Request timeout
>   - Used in NetworkClient to expire a batch and retry if no response is
> received for a request before timeout.
>
> So in this KIP, we only address (3). The only public interface change is a
> new configuration of request timeout (and maybe change the configuration
> name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
>
> What do people think of the above approach?
>
> Jiangjie (Becket) Qin
>
> On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>
> >Jun,
> >
> >I thought a little bit differently on this.
> >Intuitively, I am thinking that if a partition is offline, the metadata
> >for that partition should be considered not ready because we don’t know
> >which broker we should send the message to. So those sends need to be
> >blocked on metadata timeout.
> >Another thing I’m wondering is in which scenario an offline partition will
> >become online again in a short period of time and how likely it will
> >occur. My understanding is that the batch timeout for batches sitting in
> >accumulator should be larger than linger.ms but should not be too long
> >(e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
> >with batches to be aborted.
> >
> >That said, I do agree it is reasonable to buffer the message for some time
> >so messages to other partitions can still get sent. But adding another
> >expiration in addition to linger.ms - which is essentially a timeout -
> >sounds a little bit confusing. Maybe we can do this, let the batch sit in
> >accumulator up to linger.ms, then fail it if necessary.
> >
> >What do you think?
> >
> >Thanks,
> >
> >Jiangjie (Becket) Qin
> >
> >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
> >
> >>Jiangjie,
> >>
> >>Allowing messages to be accumulated in an offline partition could be
> >>useful since the partition may become available before the request
> >>timeout or linger time is reached. Now that we are planning to add a new
> >>timeout, it would be useful to think through whether/how that applies to
> >>messages in the accumulator too.
> >>
> >>Thanks,
> >>
> >>Jun
> >>
> >>
> >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >>wrote:
> >>
> >>> Hi Harsha,
> >>>
> >>> Took a quick look at the patch. I think it is still a little bit
> >>> different. KAFKA-1788 only handles the case where a batch sits in the
> >>> accumulator for too long. The KIP is trying to solve the issue where a
> >>> batch has already been drained from the accumulator and sent to the
> >>> broker. We might be able to apply a timeout at the batch level to merge
> >>> those two cases, as Ewen suggested. But I’m not sure it is a good idea
> >>> to allow messages whose target partition is offline to sit in the
> >>> accumulator in the first place.
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io>
> >>> wrote:
> >>>
> >>> >Guozhang and Jiangjie,
> >>> >Isn’t this work being covered in
> >>> >https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please
> >>> >review the patch there?
> >>> >Thanks,
> >>> >Harsha
> >>> >
> >>> >
> >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
> >>> >wrote:
> >>> >
> >>> >Thanks for the update Jiangjie,
> >>> >
> >>> >I think it is actually NOT expected that a hardware disconnection will
> >>> >be detected by the selector; rather, it will only be revealed upon TCP
> >>> >timeout, which could be hours.
> >>> >
> >>> >A couple of comments on the wiki:
> >>> >
> >>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the
> >>> >request timeout as implicit timeout." I am not clear on what this means.
> >>> >
> >>> >2. Currently the producer already has a "TIMEOUT_CONFIG" which should
> >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add
> >>> >"REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming:
> >>> >admittedly it will change the config names, but it will reduce
> >>> >confusion moving forward.
> >>> >
> >>> >
> >>> >Guozhang
> >>> >
> >>> >
> >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
> >>> ><j...@linkedin.com.invalid> wrote:
> >>> >
> >>> >> Checked the code again. It seems that the disconnected channel is
> >>> >> not detected by the selector as expected.
> >>> >>
> >>> >> Currently we depend on the
> >>> >> o.a.k.common.network.Selector.disconnected set to see if we need to
> >>> >> do something for a disconnected channel.
> >>> >> However, the Selector.disconnected set is only updated when:
> >>> >> 1. A write/read/connect to the channel fails.
> >>> >> 2. A key is canceled.
> >>> >> However, when a broker goes down before it sends back the response,
> >>> >> the client does not seem to be able to detect this failure.
> >>> >>
> >>> >> I did a simple test:
> >>> >> 1. Run a selector on one machine and an echo server on another
> >>> >> machine. Connect the selector to the echo server.
> >>> >> 2. Send a message to the echo server using the selector, then let the
> >>> >> selector poll() every 10 seconds.
> >>> >> 3. After the server has received the message, unplug the cable on the
> >>> >> echo server.
> >>> >> 4. After waiting for 45 minutes, the selector still had not detected
> >>> >> the network failure.
> >>> >> lsof on the selector machine shows that the TCP connection is still
> >>> >> considered ESTABLISHED.
> >>> >>
> >>> >> I’m not sure what we should expect from the
> >>> >> java.nio.channels.Selector in this case. According to the
> >>> >> documentation, the selector does not verify the status of the
> >>> >> associated channel. In my test case it looks even worse: the OS did
> >>> >> not consider the socket to be disconnected.
> >>> >>
> >>> >> Anyway, it seems that adding the client-side request timeout is
> >>> >> necessary. I’ve updated the KIP page to clarify the problem we want to
> >>> >> solve according to Ewen’s comments.
> >>> >>
> >>> >> Thanks.
> >>> >>
> >>> >> Jiangjie (Becket) Qin
> >>> >>
> >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> >>> >> wrote:
> >>> >>
> >>> >>
> >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
> >>> >> ><j...@linkedin.com.invalid> wrote:
> >>> >> >
> >>> >> >> Hi Ewen, thanks for the comments. Very good points! Please see
> >>> >> >> replies inline.
> >>> >> >>
> >>> >> >>
> >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> >>> >> >> wrote:
> >>> >> >>
> >>> >> >> >Jiangjie,
> >>> >> >> >
> >>> >> >> >Great start. I have a couple of comments.
> >>> >> >> >
> >>> >> >> >Under the motivation section, is it really true that the request
> >>> >> >> >will never be completed? Presumably if the broker goes down the
> >>> >> >> >connection will be severed, at worst by a TCP timeout, which should
> >>> >> >> >clean up the connection and any outstanding requests, right? I think
> >>> >> >> >the real reason we need a different timeout is that the default TCP
> >>> >> >> >timeouts are ridiculously long in this context.
> >>> >> >> Yes, when the broker is completely down the request should be
> >>> >> >> cleared, as you said. In the case we encountered, though, it looks
> >>> >> >> like the broker was just not responding while the TCP connection was
> >>> >> >> still alive.
> >>> >> >>
> >>> >> >
> >>> >> >Ok, that makes sense.
> >>> >> >
> >>> >> >
> >>> >> >>
> >>> >> >> >
> >>> >> >> >My second question is about whether this is the right level to
> >>> >> >> >tackle the issue/what user-facing changes need to be made. A related
> >>> >> >> >problem came up in https://issues.apache.org/jira/browse/KAFKA-1788
> >>> >> >> >where producer records get stuck indefinitely because there's no
> >>> >> >> >client-side timeout. This KIP wouldn't fix that problem or any
> >>> >> >> >problems caused by lack of connectivity since this would only apply
> >>> >> >> >to in flight requests, which by definition must have been sent on an
> >>> >> >> >active connection.
> >>> >> >> >
> >>> >> >> >I suspect both types of problems probably need to be addressed
> >>> >> >> >separately by introducing explicit timeouts. However, because the
> >>> >> >> >settings introduced here are very much about the internal
> >>> >> >> >implementations of the clients, I'm wondering if this even needs to
> >>> >> >> >be a user-facing setting, especially if we have to add other timeouts
> >>> >> >> >anyway. For example, would a fixed, generous value that's still much
> >>> >> >> >shorter than a TCP timeout, say 15s, be good enough? If other
> >>> >> >> >timeouts would allow, for example, the clients to properly exit even
> >>> >> >> >if requests have not hit their timeout, then what's the benefit of
> >>> >> >> >being able to configure the request-level timeout?
> >>> >> >> That is a very good point. We have three places where we might be
> >>> >> >> able to enforce a timeout for a message send:
> >>> >> >> 1. Before append to accumulator - handled by the metadata timeout at
> >>> >> >> the per-message level.
> >>> >> >> 2. Batch of messages inside accumulator - no timeout mechanism now.
> >>> >> >> 3. Request of batches after messages leave the accumulator - we have
> >>> >> >> a broker-side timeout but no client-side timeout for now.
> >>> >> >> My current proposal only addresses (3), not (2).
> >>> >> >> Honestly, I do not have a very clear idea of what we should do with
> >>> >> >> (2) right now. But I am with you that we should not expose too many
> >>> >> >> configurations to users. What I am thinking now to handle (2) is that
> >>> >> >> when the user calls send, if we know that a partition is offline, we
> >>> >> >> should throw an exception immediately instead of putting the message
> >>> >> >> into the accumulator. This would prevent further memory consumption.
> >>> >> >> We might also want to fail all the batches in the deque once we find
> >>> >> >> that a partition is offline. That said, I feel a timeout might not be
> >>> >> >> quite applicable to (2).
> >>> >> >> Do you have any suggestions on this?
> >>> >> >>
> >>> >> >
> >>> >> >Right, I didn't actually mean to solve 2 here, but was trying to
> >>> >> >figure out if a solution to 2 would reduce what we needed to do to
> >>> >> >address 3. (And depending on how they are implemented, fixing 1 might
> >>> >> >also address 2). It sounds like you hit a hang that I wasn't really
> >>> >> >expecting. This probably just means the KIP motivation needs to be a
> >>> >> >bit clearer about what type of situation this addresses. The cause of
> >>> >> >the hang may also be relevant -- if it was something like a deadlock
> >>> >> >then that's something that should just be fixed, but if it's something
> >>> >> >outside our control then a timeout makes a lot more sense.
> >>> >> >
> >>> >> >
> >>> >> >> >
> >>> >> >> >I know we have a similar setting,
> >>> >> >> >max.in.flight.requests.per.connection, exposed publicly (which I
> >>> >> >> >just discovered is missing from the new producer configs
> >>> >> >> >documentation). But it looks like the new consumer is not exposing
> >>> >> >> >that option, using a fixed value instead. I think we should default
> >>> >> >> >to hiding these implementation values unless there's a strong case
> >>> >> >> >for a scenario that requires customization.
> >>> >> >> For the producer, max.in.flight.requests.per.connection really
> >>> >> >> matters. If people do not want messages to be reordered, they have to
> >>> >> >> use max.in.flight.requests.per.connection=1. On the other hand, if
> >>> >> >> throughput is more of a concern, it could be set higher. For the new
> >>> >> >> consumer, I checked the value and I am not sure if the hard-coded
> >>> >> >> max.in.flight.requests.per.connection=100 is the right value.
> >>> >> >> Without the response to the previous request, what offsets should be
> >>> >> >> put into the next fetch request? It seems to me the value will
> >>> >> >> naturally be one regardless of the setting unless we are sending
> >>> >> >> fetch requests to different partitions, which does not look like the
> >>> >> >> case.
> >>> >> >> Anyway, it looks to be a separate issue, orthogonal to the request
> >>> >> >> timeout.
> >>> >> >>
> >>> >> >
> >>> >> >
> >>> >> >>
> >>> >> >> >In other words, since the only user-facing change was the addition
> >>> >> >> >of the setting, I'm wondering if we can avoid the KIP altogether by
> >>> >> >> >just choosing a good default value for the timeout.
> >>> >> >> The problem is that we have a server-side request timeout exposed as
> >>> >> >> a public configuration. We cannot set the client timeout smaller than
> >>> >> >> that value, so a hard-coded value probably won’t work here.
> >>> >> >>
> >>> >> >
> >>> >> >That makes sense, although it's worth keeping in mind that even if
> >>> >> >you use "correct" values, they could still be violated due to, e.g., a
> >>> >> >GC pause that causes the broker to process a request after it is
> >>> >> >supposed to have expired.
> >>> >> >
> >>> >> >-Ewen
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> >> >
> >>> >> >> >-Ewen
> >>> >> >> >
> >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
> >>> >> >> ><j...@linkedin.com.invalid> wrote:
> >>> >> >> >
> >>> >> >> >> Hi,
> >>> >> >> >>
> >>> >> >> >> I just created a KIP to add a request timeout to NetworkClient
> >>> >> >> >> for new Kafka clients.
> >>> >> >> >>
> >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >>> >> >> >>
> >>> >> >> >> Comments and suggestions are welcome!
> >>> >> >> >>
> >>> >> >> >> Thanks.
> >>> >> >> >>
> >>> >> >> >> Jiangjie (Becket) Qin
> >>> >> >> >>
> >>> >> >> >>
> >>> >> >> >
> >>> >> >> >
> >>> >> >> >--
> >>> >> >> >Thanks,
> >>> >> >> >Ewen
> >>> >> >>
> >>> >> >>
> >>> >> >
> >>> >> >
> >>> >> >--
> >>> >> >Thanks,
> >>> >> >Ewen
> >>> >>
> >>> >>
> >>> >
> >>> >
> >>> >--
> >>> >-- Guozhang
> >>>
> >>>
> >
>
>
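
As a rough illustration of the three timeouts summarized at the top of this
thread (metadata timeout, linger.ms, request timeout), here is a minimal
Python sketch. It is not Kafka code: the names Batch, TimeoutSketch,
ready_to_drain, expired_in_accumulator and request_timed_out are
hypothetical, and the millisecond values used with them are arbitrary. It
only mirrors the rules as stated in the discussion, under the assumption
that each check is a simple comparison of elapsed time against one
configured limit.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Batch:
    """A batch of records for one partition (hypothetical model)."""
    partition: str
    created_ms: int                 # when the batch was created in the accumulator
    sent_ms: Optional[int] = None   # set once the batch is drained and sent


@dataclass
class TimeoutSketch:
    """The three limits from the summary; all names are illustrative."""
    metadata_timeout_ms: int   # (1) bounds the wait for metadata / leader info
    linger_ms: int             # (2) how long a batch waits before it may be drained
    request_timeout_ms: int    # (3) how long to wait for a response after sending

    def ready_to_drain(self, batch: Batch, now_ms: int, leader_known: bool) -> bool:
        # (2) A batch becomes drainable once it has lingered long enough
        # and we know which broker leads its partition.
        waited = now_ms - batch.created_ms
        return leader_known and batch.sent_ms is None and waited >= self.linger_ms

    def expired_in_accumulator(self, batch: Batch, now_ms: int, leader_known: bool) -> bool:
        # (1) A batch without leader info is treated as missing metadata and
        # expires after the metadata timeout (the KAFKA-2142 idea in the thread).
        waited = now_ms - batch.created_ms
        return (not leader_known) and batch.sent_ms is None \
            and waited > self.metadata_timeout_ms

    def request_timed_out(self, batch: Batch, now_ms: int) -> bool:
        # (3) A sent batch whose response has not arrived within the request
        # timeout is expired so the caller can retry or fail it.
        return batch.sent_ms is not None \
            and now_ms - batch.sent_ms > self.request_timeout_ms
```

Keeping the three checks separate matches the split proposed above: only
check (3) corresponds to the new request timeout in this KIP, while (1)
reuses the existing metadata timeout for batches that have no leader.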
