Agreed, we also need to change the code in Sender.java to indicate that
it represents the REPLICATION_TIMEOUT and not the request timeout.
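
Something like this is what I have in mind (just a sketch; the exact constant
and field names are illustrative, not a final proposal):

    // In ProducerConfig: make it explicit that this value is the broker-side
    // replication/ack timeout rather than a request timeout.
    public static final String REPLICATION_TIMEOUT_CONFIG = "replication.timeout.ms";

    // In Sender: rename the corresponding field accordingly, since the value it
    // carries is sent to the broker as the ack timeout for produce requests.
    private final int replicationTimeoutMs;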

Thanks,

Mayuresh

On Thu, Apr 16, 2015 at 1:08 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Guozhang,
>
> By implicit timeout for close() and flush(), I meant that currently we
> don't have an explicit timeout for close() or flush() when a broker is
> down, so they can take a very long time, up to the TCP timeout, which can
> be hours as you mentioned. With the client-side request timeout, the
> waiting time would be roughly bounded by the request timeout.
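>
> For example (a sketch, assuming the new client-side setting ends up named
> something like request.timeout.ms; producer and record are a configured
> KafkaProducer and a ProducerRecord):
>
>     producer.send(record);  // may leave a request in flight to a broker that stopped responding
>     producer.flush();       // today this can block until the OS-level TCP timeout
>     producer.close();       // same for close(); with a client-side request timeout both
>                             // become roughly bounded by the request timeout (times retries)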
>
> And I agree we’d better change the TIMEOUT_CONFIG to
> REPLICATION_TIMEOUT_CONFIG to avoid confusion.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 4/15/15, 10:38 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>
> >Thanks for the update Jiangjie,
> >
> >I think it is actually NOT expected that a hardware disconnection will be
> >detected by the selector; rather, it will only be revealed by the TCP
> >timeout, which could be hours.
> >
> >A couple of comments on the wiki:
> >
> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the
> >request
> >timeout as implict timeout." I am not very clear what does this mean?
> >
> >2. Currently the producer already has a "TIMEOUT_CONFIG" which should
> >really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add
> >"REQUEST_TIMEOUT_CONFIG", I suggest we also make this rename: admittedly
> >it will change the config names, but it will reduce confusion moving
> >forward.
> >
> >
> >Guozhang
> >
> >
> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Checked the code again. It seems that the disconnected channel is not
> >> detected by the selector as expected.
> >>
> >> Currently we are depending on the
> >> o.a.k.common.network.Selector.disconnected set to see if we need to do
> >> something for a disconnected channel.
> >> However, the Selector.disconnected set is only updated when:
> >> 1. A write/read/connect to the channel fails.
> >> 2. A key is cancelled.
> >> So when a broker goes down before it sends back the response, the client
> >> does not seem to be able to detect this failure.
> >>
> >> I did a simple test below:
> >> 1. Run a selector on one machine and an echo server on another machine.
> >> Connect the selector to the echo server.
> >> 2. Send a message to the echo server using the selector, then let the
> >> selector poll() every 10 seconds.
> >> 3. After the server received the message, unplug the cable on the echo
> >> server.
> >> 4. After waiting for 45 minutes, the selector still had not detected the
> >> network failure.
> >> lsof on the selector machine shows that the TCP connection is still
> >> considered ESTABLISHED.
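> >>
> >> For reference, the java.nio-level equivalent of that test looks roughly
> >> like this (a sketch; the echo server host and port are placeholders):
> >>
> >>     import java.net.InetSocketAddress;
> >>     import java.nio.ByteBuffer;
> >>     import java.nio.channels.*;
> >>
> >>     public class HangTest {
> >>         public static void main(String[] args) throws Exception {
> >>             // Connect to the echo server, then switch to non-blocking mode
> >>             SocketChannel ch = SocketChannel.open(new InetSocketAddress("echo-host", 7));
> >>             ch.configureBlocking(false);
> >>             Selector selector = Selector.open();
> >>             ch.register(selector, SelectionKey.OP_READ);
> >>             ch.write(ByteBuffer.wrap("hello".getBytes("UTF-8")));
> >>             while (true) {
> >>                 // Poll every 10 seconds; after the cable is unplugged this keeps
> >>                 // returning 0 ready keys and the channel still looks connected.
> >>                 System.out.println("ready=" + selector.select(10000)
> >>                         + " connected=" + ch.isConnected());
> >>             }
> >>         }
> >>     }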
> >>
> >> I'm not sure what we should expect from java.nio.channels.Selector in
> >> this case. According to the documentation, the selector does not verify
> >> the status of the associated channel. In my test case it looks even
> >> worse: the OS did not even consider the socket disconnected.
> >>
> >> Anyway, it seems adding the client-side request timeout is necessary.
> >> I've updated the KIP page to clarify the problem we want to solve
> >> according to Ewen's comments.
> >>
> >> Thanks.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >>
> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> >wrote:
> >> >
> >> >> Hi Ewen, thanks for the comments. Very good points! Please see
> >> >> replies inline.
> >> >>
> >> >>
> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> >> >> wrote:
> >> >>
> >> >> >Jiangjie,
> >> >> >
> >> >> >Great start. I have a couple of comments.
> >> >> >
> >> >> >Under the motivation section, is it really true that the request
> >> >> >will never be completed? Presumably if the broker goes down the
> >> >> >connection will be severed, at worst by a TCP timeout, which should
> >> >> >clean up the connection and any outstanding requests, right? I think
> >> >> >the real reason we need a different timeout is that the default TCP
> >> >> >timeouts are ridiculously long in this context.
> >> >> Yes, when the broker is completely down the request should be cleared
> >> >> as you said. The case we encountered looks like the broker was just
> >> >> not responding, but the TCP connection was still alive.
> >> >>
> >> >
> >> >Ok, that makes sense.
> >> >
> >> >
> >> >>
> >> >> >
> >> >> >My second question is about whether this is the right level to
> >> >> >tackle the issue/what user-facing changes need to be made. A related
> >> >> >problem came up in https://issues.apache.org/jira/browse/KAFKA-1788
> >> >> >where producer records get stuck indefinitely because there's no
> >> >> >client-side timeout. This KIP wouldn't fix that problem or any
> >> >> >problems caused by lack of connectivity since this would only apply
> >> >> >to in flight requests, which by definition must have been sent on an
> >> >> >active connection.
> >> >> >
> >> >> >I suspect both types of problems probably need to be addressed
> >> >> >separately by introducing explicit timeouts. However, because the
> >> >> >settings introduced here are very much about the internal
> >> >> >implementations of the clients, I'm wondering if this even needs to
> >> >> >be a user-facing setting, especially if we have to add other timeouts
> >> >> >anyway. For example, would a fixed, generous value that's still much
> >> >> >shorter than a TCP timeout, say 15s, be good enough? If other
> >> >> >timeouts would allow, for example, the clients to properly exit even
> >> >> >if requests have not hit their timeout, then what's the benefit of
> >> >> >being able to configure the request-level timeout?
> >> >> That is a very good point. We have three places where we might be
> >> >> able to enforce a timeout for a message send:
> >> >> 1. Before append to the accumulator - handled by the metadata timeout
> >> >> on a per-message level.
> >> >> 2. Batch of messages inside the accumulator - no timeout mechanism now.
> >> >> 3. Request of batches after messages leave the accumulator - we have a
> >> >> broker-side timeout but no client-side timeout for now.
> >> >> My current proposal only addresses (3) but not (2).
> >> >> Honestly I do not have a very clear idea about what we should do with
> >> >> (2) right now. But I am with you that we should not expose too many
> >> >> configurations to users. What I am thinking now to handle (2) is: when
> >> >> the user calls send(), if we know that a partition is offline, we
> >> >> should throw an exception immediately instead of putting the record
> >> >> into the accumulator (see the sketch below). This would prevent
> >> >> further memory consumption. We might also want to fail all the batches
> >> >> in the deque once we find a partition is offline. That said, I feel a
> >> >> timeout might not be quite applicable to (2).
> >> >> Do you have any suggestion on this?
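> >> >>
> >> >> Roughly what I mean, as a sketch (partitionIsOffline() and
> >> >> failBatchesFor() are made-up helper names, not existing methods;
> >> >> record and partition come from the send() arguments):
> >> >>
> >> >>     // In KafkaProducer.send(), before appending to the accumulator:
> >> >>     if (partitionIsOffline(record.topic(), partition)) {
> >> >>         // fail fast instead of buffering more data for a leaderless partition
> >> >>         throw new KafkaException("Partition " + record.topic() + "-" + partition + " is offline");
> >> >>     }
> >> >>
> >> >>     // And when metadata shows a partition has gone offline, fail what is already queued:
> >> >>     // failBatchesFor(topicPartition, new KafkaException("Partition offline"));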
> >> >>
> >> >
> >> >Right, I didn't actually mean to solve 2 here, but was trying to figure
> >> >out if a solution to 2 would reduce what we needed to do to address 3.
> >> >(And depending on how they are implemented, fixing 1 might also address
> >> >2.) It sounds like you hit a hang that I wasn't really expecting. This
> >> >probably just means the KIP motivation needs to be a bit clearer about
> >> >what type of situation this addresses. The cause of the hang may also
> >> >be relevant: if it was something like a deadlock then that's something
> >> >that should just be fixed, but if it's something outside our control
> >> >then a timeout makes a lot more sense.
> >> >
> >> >
> >> >> >
> >> >> >I know we have a similar setting,
> >> >> >max.in.flight.requests.per.connection, exposed publicly (which I just
> >> >> >discovered is missing from the new producer configs documentation).
> >> >> >But it looks like the new consumer is not exposing that option, using
> >> >> >a fixed value instead. I think we should default to hiding these
> >> >> >implementation values unless there's a strong case for a scenario
> >> >> >that requires customization.
> >> >> For the producer, max.in.flight.requests.per.connection really
> >> >> matters. If people do not want messages to be reordered, they have to
> >> >> use max.in.flight.requests.per.connection=1. On the other hand, if
> >> >> throughput is more of a concern, it can be set higher. For the new
> >> >> consumer, I checked the value and I am not sure if the hard-coded
> >> >> max.in.flight.requests.per.connection=100 is the right value. Without
> >> >> the response to the previous request, what offsets should be put into
> >> >> the next fetch request? It seems to me the value will effectively be
> >> >> one regardless of the setting, unless we are sending fetch requests to
> >> >> different partitions, which does not look like the case.
> >> >> Anyway, it looks to be a separate issue, orthogonal to the request
> >> >> timeout.
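> >> >>
> >> >> Just to spell out the producer trade-off (a sketch of existing config,
> >> >> not a new API; props is a java.util.Properties passed to the
> >> >> KafkaProducer constructor):
> >> >>
> >> >>     Properties props = new Properties();
> >> >>     // One in-flight request per connection: retries cannot reorder messages,
> >> >>     // at the cost of pipelining/throughput.
> >> >>     props.put("max.in.flight.requests.per.connection", "1");
> >> >>     // A larger value pipelines more requests for throughput, but a retried
> >> >>     // batch can then land behind a newer one and reorder messages.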
> >> >>
> >> >
> >> >
> >> >>
> >> >> >In other words, since the only user-facing change was the addition
> >> >> >of the setting, I'm wondering if we can avoid the KIP altogether by
> >> >> >just choosing a good default value for the timeout.
> >> >> The problem is that we have a server-side request timeout exposed as
> >> >> a public configuration. We cannot set the client timeout smaller than
> >> >> that value, so a hard-coded value probably won't work here.
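> >> >>
> >> >> In config terms (a sketch; assuming the new client-side setting ends
> >> >> up being called request.timeout.ms, which is not final), the
> >> >> constraint is roughly:
> >> >>
> >> >>     // "timeout.ms" (ProducerConfig.TIMEOUT_CONFIG) is the existing public,
> >> >>     // server-side replication/ack timeout. The client-side timeout must not
> >> >>     // be smaller, or the client could expire a request the broker is still
> >> >>     // legitimately working on.
> >> >>     props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");  // broker waits up to 30s for acks
> >> >>     props.put("request.timeout.ms", "40000");           // proposed client-side bound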
> >> >>
> >> >
> >> >That makes sense, although it's worth keeping in mind that even if you
> >> >use "correct" values, they could still be violated due to, e.g., a GC
> >> >pause that causes the broker to process a request after it is supposed
> >> >to have expired.
> >> >
> >> >-Ewen
> >> >
> >> >
> >> >
> >> >> >
> >> >> >-Ewen
> >> >> >
> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
> >> >> ><j...@linkedin.com.invalid>
> >> >> >wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >>
> >> >> >> I just created a KIP to add a request timeout to NetworkClient for
> >> >> >> new Kafka clients.
> >> >> >>
> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >> >> >>
> >> >> >> Comments and suggestions are welcome!
> >> >> >>
> >> >> >> Thanks.
> >> >> >>
> >> >> >> Jiangjie (Becket) Qin
> >> >> >>
> >> >> >>
> >> >> >
> >> >> >
> >> >> >--
> >> >> >Thanks,
> >> >> >Ewen
> >> >>
> >> >>
> >> >
> >> >
> >> >--
> >> >Thanks,
> >> >Ewen
> >>
> >>
> >
> >
> >--
> >-- Guozhang
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
