Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be
detected by the selector, but rather will only be revealed upon TCP
timeout, which could be hours.

A couple of comments on the wiki:

1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request
timeout as implict timeout." I am not very clear what does this mean?

2. Currently the producer already has a "TIMEOUT_CONFIG" which should
really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add "
REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming: admittedly
it will change the config names but will reduce confusions moving forward.


Guozhang


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Checked the code again. It seems that the disconnected channel is not
> detected by selector as expected.
>
> Currently we are depending on the
> o.a.k.common.network.Selector.disconnected set to see if we need to do
> something for a disconnected channel.
> However Selector.disconnected set is only updated when:
> 1. A write/read/connect to channel failed.
> 2. A Key is canceled
> However when a broker is down before it sends back the response, the
> client seems not be able to detect this failure.
>
> I did a simple test below:
> 1. Run a selector on one machine and an echo server on another machine.
> Connect a selector to an echo server
> 2. Send a message to echo server using selector, then let the selector
> poll() every 10 seconds.
> 3. After the sever received the message, unplug cable on the echo server.
> 4. After waiting for 45 min. The selector still did not detected the
> network failure.
> Lsof on selector machine shows that the TCP connection is still considered
> ESTABLISHED.
>
> I’m not sure in this case what should we expect from the
> java.nio.channels.Selector. According to the document, the selector does
> not verify the status of the associated channel. In my test case it looks
> even worse that OS did not think of the socket has been disconnected.
>
> Anyway. It seems adding the client side request timeout is necessary. I’ve
> updated the KIP page to clarify the problem we want to solve according to
> Ewen’s comments.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>
> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Ewen, thanks for the comments. Very good points! Please see replies
> >> inline.
> >>
> >>
> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> wrote:
> >>
> >> >Jiangjie,
> >> >
> >> >Great start. I have a couple of comments.
> >> >
> >> >Under the motivation section, is it really true that the request will
> >> >never
> >> >be completed? Presumably if the broker goes down the connection will be
> >> >severed, at worst by a TCP timeout, which should clean up the
> >>connection
> >> >and any outstanding requests, right? I think the real reason we need a
> >> >different timeout is that the default TCP timeouts are ridiculously
> >>long
> >> >in
> >> >this context.
> >> Yes, when broker is completely down the request should be cleared as you
> >> said. The case we encountered looks like the broker was just not
> >> responding but TCP connection was still alive though.
> >>
> >
> >Ok, that makes sense.
> >
> >
> >>
> >> >
> >> >My second question is about whether this is the right level to tackle
> >>the
> >> >issue/what user-facing changes need to be made. A related problem came
> >>up
> >> >in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
> >> records
> >> >get stuck indefinitely because there's no client-side timeout. This KIP
> >> >wouldn't fix that problem or any problems caused by lack of
> >>connectivity
> >> >since this would only apply to in flight requests, which by definition
> >> >must
> >> >have been sent on an active connection.
> >> >
> >> >I suspect both types of problems probably need to be addressed
> >>separately
> >> >by introducing explicit timeouts. However, because the settings
> >>introduced
> >> >here are very much about the internal implementations of the clients,
> >>I'm
> >> >wondering if this even needs to be a user-facing setting, especially
> >>if we
> >> >have to add other timeouts anyway. For example, would a fixed, generous
> >> >value that's still much shorter than a TCP timeout, say 15s, be good
> >> >enough? If other timeouts would allow, for example, the clients to
> >> >properly
> >> >exit even if requests have not hit their timeout, then what's the
> >>benefit
> >> >of being able to configure the request-level timeout?
> >> That is a very good point. We have three places that we might be able to
> >> enforce timeout for a message send:
> >> 1. Before append to accumulator - handled by metadata timeout on per
> >> message level.
> >> 2. Batch of messages inside accumulator - no timeout mechanism now.
> >> 3. Request of batches after messages leave the accumulator - we have a
> >> broker side timeout but no client side timeout for now.
> >> My current proposal only address (3) but not (2).
> >> Honestly I do not have a very clear idea about what should we do with
> >>(2)
> >> right now. But I am with you that we should not expose too many
> >> configurations to users. What I am thinking now to handle (2) is when
> >>user
> >> call send, if we know that a partition is offline, we should throw
> >> exception immediately instead of putting it into accumulator. This would
> >> protect further memory consumption. We might also want to fail all the
> >> batches in the dequeue once we found a partition is offline.  That
> >>said, I
> >> feel timeout might not be quite applicable to (2).
> >> Do you have any suggestion on this?
> >>
> >
> >Right, I didn't actually mean to solve 2 here, but was trying to figure
> >out
> >if a solution to 2 would reduce what we needed to do to address 3. (And
> >depending on how they are implemented, fixing 1 might also address 2). It
> >sounds like you hit hang that I wasn't really expecting. This probably
> >just
> >means the KIP motivation needs to be a bit clearer about what type of
> >situation this addresses. The cause of the hang may also be relevant -- if
> >it was something like a deadlock then that's something that should just be
> >fixed, but if it's something outside our control then a timeout makes a
> >lot
> >more sense.
> >
> >
> >> >
> >> >I know we have a similar setting,
> >>max.in.flights.requests.per.connection,
> >> >exposed publicly (which I just discovered is missing from the new
> >>producer
> >> >configs documentation). But it looks like the new consumer is not
> >>exposing
> >> >that option, using a fixed value instead. I think we should default to
> >> >hiding these implementation values unless there's a strong case for a
> >> >scenario that requires customization.
> >> For producer, max.in.flight.requests.per.connection really matters. If
> >> people do not want to have reorder of messages, they have to use
> >> max.in.flight.requests.per.connection=1. On the other hand, if
> >>throughput
> >> is more of a concern, it could be set to higher. For the new consumer, I
> >> checked the value and I am not sure if the hard coded
> >> max.in.flight.requests.per.connection=100 is the right value. Without
> >>the
> >> response to the previous request, what offsets should be put into the
> >>next
> >> fetch request? It seems to me the value will be one natively regardless
> >>of
> >> the setting unless we are sending fetch request to different partitions,
> >> which does not look like the case.
> >> Anyway, it looks to be a separate issue orthogonal to the request
> >>timeout.
> >>
> >
> >
> >>
> >> >In other words, since the only user-facing change was the addition of
> >>the
> >> >setting, I'm wondering if we can avoid the KIP altogether by just
> >>choosing
> >> >a good default value for the timeout.
> >> The problem is that we have a server side request timeout exposed as a
> >> public configuration. We cannot set the client timeout smaller than that
> >> value, so a hard coded value probably won¹t work here.
> >>
> >
> >That makes sense, although it's worth keeping in mind that even if you use
> >"correct" values, they could still be violated due to, e.g., a  GC pause
> >that causes the broker to process a request after it is supposed to have
> >expired.
> >
> >-Ewen
> >
> >
> >
> >> >
> >> >-Ewen
> >> >
> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
> >><j...@linkedin.com.invalid>
> >> >wrote:
> >> >
> >> >> Hi,
> >> >>
> >> >> I just created a KIP to add a request timeout to NetworkClient for
> >>new
> >> >> Kafka clients.
> >> >>
> >> >>
> >> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+
> >> >>timeout+to+NetworkClient
> >> >>
> >> >> Comments and suggestions are welcome!
> >> >>
> >> >> Thanks.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >>
> >> >
> >> >
> >> >--
> >> >Thanks,
> >> >Ewen
> >>
> >>
> >
> >
> >--
> >Thanks,
> >Ewen
>
>


-- 
-- Guozhang

Reply via email to