Checked the code again. It seems that a disconnected channel is not
detected by the selector as expected.

Currently we depend on the o.a.k.common.network.Selector.disconnected set
to see if we need to do something for a disconnected channel.
However, the Selector.disconnected set is only updated when:
1. A write/read/connect to the channel fails.
2. A key is cancelled.
However, when a broker goes down before it sends back the response, the
client does not seem to be able to detect this failure.
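To illustrate the pattern we rely on today (a rough sketch, not the actual
NetworkClient code; handleDisconnect() and pollTimeoutMs are made-up names,
and disconnected() returning node ids is my reading of the current Selector
API):

    // Disconnects only surface through the selector's disconnected list,
    // which is populated on I/O failures or cancelled keys.
    selector.poll(pollTimeoutMs);
    for (String nodeId : selector.disconnected()) {
        // fail any in-flight requests to this node
        handleDisconnect(nodeId);
    }
    // If the broker dies without the OS tearing down the connection,
    // disconnected() stays empty and in-flight requests are never failed.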

I did a simple test:
1. Run a selector on one machine and an echo server on another machine,
and connect the selector to the echo server.
2. Send a message to the echo server using the selector, then let the
selector poll() every 10 seconds.
3. After the server has received the message, unplug the network cable on
the echo server machine.
4. After waiting for 45 minutes, the selector still had not detected the
network failure.
lsof on the selector machine shows that the TCP connection is still
considered ESTABLISHED.
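For reference, here is a minimal java.nio sketch that approximates the
test (host name and port are placeholders; the real test used
o.a.k.common.network.Selector, but the observed behavior is the same):

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    public class EchoProbe {
        public static void main(String[] args) throws Exception {
            // blocking connect and send, then switch to non-blocking and poll
            SocketChannel channel = SocketChannel.open(
                new InetSocketAddress("echo-server-host", 7)); // placeholders
            channel.write(
                ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            channel.configureBlocking(false);

            Selector selector = Selector.open();
            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
            ByteBuffer buf = ByteBuffer.allocate(1024);

            while (true) {
                int ready = selector.select(10_000); // poll every 10 seconds
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isReadable()) {
                        buf.clear();
                        // drain the echo; -1 here would mean an orderly close
                        System.out.println("read " + channel.read(buf) + " bytes");
                    }
                }
                selector.selectedKeys().clear();
                System.out.println("ready=" + ready
                    + " keyValid=" + key.isValid()
                    + " connected=" + channel.isConnected());
                // After the cable is unplugged on the echo server, select()
                // keeps returning 0, the key stays valid and isConnected()
                // stays true, because nothing is in flight so no TCP
                // retransmission timeout ever fires.
            }
        }
    }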

I'm not sure what we should expect from java.nio.channels.Selector in this
case. According to the documentation, the selector does not verify the
status of the associated channel. In my test it looks even worse: the OS
itself did not consider the socket to be disconnected.

Anyway, it seems that adding a client-side request timeout is necessary.
I've updated the KIP page to clarify the problem we want to solve, per
Ewen's comments.
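As a sketch of the direction (purely illustrative, not what is on the KIP
page; InFlightRequest, sendTimeMs, requestTimeoutMs, destination and
handleDisconnect() are made-up names), the check could run on every
NetworkClient poll:

    // Expire requests that have been in flight longer than the client-side
    // request timeout, even though the TCP connection still looks healthy
    // to the OS.
    long now = System.currentTimeMillis();
    for (InFlightRequest request : inFlightRequests.all()) {
        if (now - request.sendTimeMs > requestTimeoutMs) {
            // Close the channel so the selector reports the node as
            // disconnected, then let the normal disconnect handling fail the
            // in-flight requests and trigger a metadata refresh / reconnect.
            selector.close(request.destination);   // method name assumed
            handleDisconnect(request.destination);
        }
    }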

Thanks.

Jiangjie (Becket) Qin

On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:

>On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> Hi Ewen, thanks for the comments. Very good points! Please see replies
>> inline.
>>
>>
>> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>
>> >Jiangjie,
>> >
>> >Great start. I have a couple of comments.
>> >
>> >Under the motivation section, is it really true that the request will
>> >never
>> >be completed? Presumably if the broker goes down the connection will be
>> >severed, at worst by a TCP timeout, which should clean up the
>>connection
>> >and any outstanding requests, right? I think the real reason we need a
>> >different timeout is that the default TCP timeouts are ridiculously
>>long
>> >in
>> >this context.
>> Yes, when broker is completely down the request should be cleared as you
>> said. The case we encountered looks like the broker was just not
>> responding but TCP connection was still alive though.
>>
>
>Ok, that makes sense.
>
>
>>
>> >
>> >My second question is about whether this is the right level to tackle
>>the
>> >issue/what user-facing changes need to be made. A related problem came
>>up
>> >in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
>> records
>> >get stuck indefinitely because there's no client-side timeout. This KIP
>> >wouldn't fix that problem or any problems caused by lack of
>>connectivity
>> >since this would only apply to in flight requests, which by definition
>> >must
>> >have been sent on an active connection.
>> >
>> >I suspect both types of problems probably need to be addressed
>>separately
>> >by introducing explicit timeouts. However, because the settings
>>introduced
>> >here are very much about the internal implementations of the clients,
>>I'm
>> >wondering if this even needs to be a user-facing setting, especially
>>if we
>> >have to add other timeouts anyway. For example, would a fixed, generous
>> >value that's still much shorter than a TCP timeout, say 15s, be good
>> >enough? If other timeouts would allow, for example, the clients to
>> >properly
>> >exit even if requests have not hit their timeout, then what's the
>>benefit
>> >of being able to configure the request-level timeout?
>> That is a very good point. We have three places that we might be able to
>> enforce timeout for a message send:
>> 1. Before append to accumulator - handled by metadata timeout on per
>> message level.
>> 2. Batch of messages inside accumulator - no timeout mechanism now.
>> 3. Request of batches after messages leave the accumulator - we have a
>> broker side timeout but no client side timeout for now.
>> My current proposal only address (3) but not (2).
>> Honestly I do not have a very clear idea about what should we do with
>>(2)
>> right now. But I am with you that we should not expose too many
>> configurations to users. What I am thinking now to handle (2) is when
>>user
>> call send, if we know that a partition is offline, we should throw
>> exception immediately instead of putting it into accumulator. This would
>> protect further memory consumption. We might also want to fail all the
>> batches in the dequeue once we found a partition is offline.  That
>>said, I
>> feel timeout might not be quite applicable to (2).
>> Do you have any suggestion on this?
>>
>
>Right, I didn't actually mean to solve 2 here, but was trying to figure
>out
>if a solution to 2 would reduce what we needed to do to address 3. (And
>depending on how they are implemented, fixing 1 might also address 2). It
>sounds like you hit hang that I wasn't really expecting. This probably
>just
>means the KIP motivation needs to be a bit clearer about what type of
>situation this addresses. The cause of the hang may also be relevant -- if
>it was something like a deadlock then that's something that should just be
>fixed, but if it's something outside our control then a timeout makes a
>lot
>more sense.
>
>
>> >
>> >I know we have a similar setting,
>>max.in.flights.requests.per.connection,
>> >exposed publicly (which I just discovered is missing from the new
>>producer
>> >configs documentation). But it looks like the new consumer is not
>>exposing
>> >that option, using a fixed value instead. I think we should default to
>> >hiding these implementation values unless there's a strong case for a
>> >scenario that requires customization.
>> For producer, max.in.flight.requests.per.connection really matters. If
>> people do not want to have reorder of messages, they have to use
>> max.in.flight.requests.per.connection=1. On the other hand, if
>>throughput
>> is more of a concern, it could be set to higher. For the new consumer, I
>> checked the value and I am not sure if the hard coded
>> max.in.flight.requests.per.connection=100 is the right value. Without
>>the
>> response to the previous request, what offsets should be put into the
>>next
>> fetch request? It seems to me the value will be one natively regardless
>>of
>> the setting unless we are sending fetch request to different partitions,
>> which does not look like the case.
>> Anyway, it looks to be a separate issue orthogonal to the request
>>timeout.
>>
>
>
>>
>> >In other words, since the only user-facing change was the addition of
>>the
>> >setting, I'm wondering if we can avoid the KIP altogether by just
>>choosing
>> >a good default value for the timeout.
>> The problem is that we have a server side request timeout exposed as a
>> public configuration. We cannot set the client timeout smaller than that
>> value, so a hard coded value probably won't work here.
>>
>
>That makes sense, although it's worth keeping in mind that even if you use
>"correct" values, they could still be violated due to, e.g., a  GC pause
>that causes the broker to process a request after it is supposed to have
>expired.
>
>-Ewen
>
>
>
>> >
>> >-Ewen
>> >
>> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
>><j...@linkedin.com.invalid>
>> >wrote:
>> >
>> >> Hi,
>> >>
>> >> I just created a KIP to add a request timeout to NetworkClient for
>>new
>> >> Kafka clients.
>> >>
>> >>
>> >>
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+
>> >>timeout+to+NetworkClient
>> >>
>> >> Comments and suggestions are welcome!
>> >>
>> >> Thanks.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >>
>> >
>> >
>> >--
>> >Thanks,
>> >Ewen
>>
>>
>
>
>-- 
>Thanks,
>Ewen
