Guozhang and Jiangjie,
Isn’t this work being covered in
https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review
the patch there?
Thanks,
Harsha


On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:

Thanks for the update Jiangjie,  

I think it is actually NOT expected that hardware disconnection will be  
detected by the selector, but rather will only be revealed upon TCP  
timeout, which could be hours.  
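
A minimal sketch of what a client-side request timeout could look like, assuming responses on one connection come back in send order. The class InFlightTimeouts and its methods are hypothetical and are not part of NetworkClient or the KIP; the idea is only that the client tracks send times itself instead of waiting for the selector or TCP to report a dead connection.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not Kafka code): tracks send times of in-flight requests per node. */
class InFlightTimeouts {
    private final long requestTimeoutMs;
    // node id -> send times (ms) of unanswered requests, oldest first
    private final Map<String, Deque<Long>> sendTimes = new LinkedHashMap<>();

    InFlightTimeouts(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Record that a request was written to the given node at nowMs. */
    void onSend(String node, long nowMs) {
        sendTimes.computeIfAbsent(node, k -> new ArrayDeque<>()).addLast(nowMs);
    }

    /** Record a response; assumes it answers the oldest outstanding request. */
    void onResponse(String node) {
        Deque<Long> queue = sendTimes.get(node);
        if (queue != null) {
            queue.pollFirst();
            if (queue.isEmpty()) {
                sendTimes.remove(node);
            }
        }
    }

    /** Nodes whose oldest unanswered request has waited longer than the timeout. */
    List<String> expiredNodes(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Deque<Long>> entry : sendTimes.entrySet()) {
            Long oldest = entry.getValue().peekFirst();
            if (oldest != null && nowMs - oldest > requestTimeoutMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

A caller would invoke expiredNodes(now) after each poll(), then close those connections and fail their outstanding requests, regardless of what the selector reports.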

A couple of comments on the wiki:  

1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request
timeout as implicit timeout." I am not clear on what this means.

2. Currently the producer already has a "TIMEOUT_CONFIG" which should
really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add
"REQUEST_TIMEOUT_CONFIG", I suggest we also do this renaming: admittedly
it will change the config names, but it will reduce confusion going forward.
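
To illustrate how the two settings would relate, here is a small sketch. The key request.timeout.ms is an assumed name for the proposed client-side setting, timeout.ms is assumed to be the key behind TIMEOUT_CONFIG, and the default values and the class TimeoutConfigCheck are made up for illustration. It only encodes the constraint raised earlier in the thread that the client timeout should not be smaller than the broker-side one.

```java
import java.util.Properties;

/** Hypothetical check (not Kafka code) relating the two timeout settings. */
class TimeoutConfigCheck {
    // Assumed key for the existing broker-side (replication) timeout.
    static final String REPLICATION_TIMEOUT_KEY = "timeout.ms";
    // Assumed key for the proposed client-side request timeout.
    static final String REQUEST_TIMEOUT_KEY = "request.timeout.ms";

    /** Returns true when the client-side timeout is at least the broker-side one. */
    static boolean isValid(Properties props) {
        // Defaults below are illustrative only.
        long replicationTimeoutMs =
            Long.parseLong(props.getProperty(REPLICATION_TIMEOUT_KEY, "30000"));
        long requestTimeoutMs =
            Long.parseLong(props.getProperty(REQUEST_TIMEOUT_KEY, "40000"));
        return requestTimeoutMs >= replicationTimeoutMs;
    }
}
```

With this check, a client request timeout of 10000 ms against a replication timeout of 30000 ms would be rejected, since the client could give up before the broker has had a chance to answer.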


Guozhang  


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid>  
wrote:  

> Checked the code again. It seems that the disconnected channel is not  
> detected by selector as expected.  
>  
> Currently we are depending on the  
> o.a.k.common.network.Selector.disconnected set to see if we need to do  
> something for a disconnected channel.  
> However, the Selector.disconnected set is only updated when:
> 1. A write/read/connect to the channel fails.
> 2. A key is canceled.
> However, when a broker goes down before it sends back the response, the
> client does not seem to be able to detect this failure.
>  
> I did a simple test:
> 1. Run a selector on one machine and an echo server on another machine.
> Connect the selector to the echo server.
> 2. Send a message to the echo server using the selector, then let the
> selector poll() every 10 seconds.
> 3. After the server has received the message, unplug the cable on the
> echo server.
> 4. After waiting for 45 minutes, the selector still had not detected the
> network failure.
> lsof on the selector machine shows that the TCP connection is still
> considered ESTABLISHED.
>  
> I’m not sure what we should expect from java.nio.channels.Selector in
> this case. According to the documentation, the selector does not verify
> the status of the associated channel. In my test case it looks even
> worse: the OS did not consider the socket disconnected either.
>
> Anyway, it seems that adding the client-side request timeout is
> necessary. I’ve updated the KIP page to clarify the problem we want to
> solve according to Ewen’s comments.
>  
> Thanks.  
>  
> Jiangjie (Becket) Qin  
>  
> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:  
>  
> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid>  
> >wrote:  
> >  
> >> Hi Ewen, thanks for the comments. Very good points! Please see replies  
> >> inline.  
> >>  
> >>  
> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >>  
> >> >Jiangjie,  
> >> >  
> >> >Great start. I have a couple of comments.  
> >> >  
> >> >Under the motivation section, is it really true that the request will
> >> >never be completed? Presumably if the broker goes down the connection
> >> >will be severed, at worst by a TCP timeout, which should clean up the
> >> >connection and any outstanding requests, right? I think the real reason
> >> >we need a different timeout is that the default TCP timeouts are
> >> >ridiculously long in this context.
> >> Yes, when the broker is completely down the request should be cleared as
> >> you said. In the case we encountered, it looks like the broker was just
> >> not responding while the TCP connection was still alive.
> >>  
> >  
> >Ok, that makes sense.  
> >  
> >  
> >>  
> >> >  
> >> >My second question is about whether this is the right level to tackle
> >> >the issue/what user-facing changes need to be made. A related problem
> >> >came up in https://issues.apache.org/jira/browse/KAFKA-1788 where
> >> >producer records get stuck indefinitely because there's no client-side
> >> >timeout. This KIP wouldn't fix that problem or any problems caused by
> >> >lack of connectivity since this would only apply to in flight requests,
> >> >which by definition must have been sent on an active connection.
> >> >
> >> >I suspect both types of problems probably need to be addressed
> >> >separately by introducing explicit timeouts. However, because the
> >> >settings introduced here are very much about the internal
> >> >implementations of the clients, I'm wondering if this even needs to be
> >> >a user-facing setting, especially if we have to add other timeouts
> >> >anyway. For example, would a fixed, generous value that's still much
> >> >shorter than a TCP timeout, say 15s, be good enough? If other timeouts
> >> >would allow, for example, the clients to properly exit even if requests
> >> >have not hit their timeout, then what's the benefit of being able to
> >> >configure the request-level timeout?
> >> That is a very good point. We have three places where we might be able
> >> to enforce a timeout for a message send:
> >> 1. Before append to accumulator - handled by metadata timeout on a per
> >> message level.
> >> 2. Batch of messages inside accumulator - no timeout mechanism now.
> >> 3. Request of batches after messages leave the accumulator - we have a
> >> broker side timeout but no client side timeout for now.
> >> My current proposal only addresses (3) but not (2).
> >> Honestly I do not have a very clear idea about what we should do with
> >> (2) right now. But I am with you that we should not expose too many
> >> configurations to users. What I am thinking now to handle (2) is that
> >> when the user calls send, if we know that a partition is offline, we
> >> should throw an exception immediately instead of putting the message
> >> into the accumulator. This would prevent further memory consumption. We
> >> might also want to fail all the batches in the deque once we find that
> >> a partition is offline. That said, I feel a timeout might not be quite
> >> applicable to (2).
> >> Do you have any suggestions on this?
> >>  
> >  
> >Right, I didn't actually mean to solve 2 here, but was trying to figure
> >out if a solution to 2 would reduce what we needed to do to address 3.
> >(And depending on how they are implemented, fixing 1 might also address
> >2.) It sounds like you hit a hang that I wasn't really expecting. This
> >probably just means the KIP motivation needs to be a bit clearer about
> >what type of situation this addresses. The cause of the hang may also be
> >relevant -- if it was something like a deadlock then that's something
> >that should just be fixed, but if it's something outside our control
> >then a timeout makes a lot more sense.
> >  
> >  
> >> >  
> >> >I know we have a similar setting,
> >> >max.in.flight.requests.per.connection, exposed publicly (which I just
> >> >discovered is missing from the new producer configs documentation). But
> >> >it looks like the new consumer is not exposing that option, using a
> >> >fixed value instead. I think we should default to hiding these
> >> >implementation values unless there's a strong case for a scenario that
> >> >requires customization.
> >> For the producer, max.in.flight.requests.per.connection really matters.
> >> If people do not want messages to be reordered, they have to use
> >> max.in.flight.requests.per.connection=1. On the other hand, if
> >> throughput is more of a concern, it could be set higher. For the new
> >> consumer, I checked the value and I am not sure if the hard coded
> >> max.in.flight.requests.per.connection=100 is the right value. Without
> >> the response to the previous request, what offsets should be put into
> >> the next fetch request? It seems to me the value will effectively be
> >> one regardless of the setting unless we are sending fetch requests to
> >> different partitions, which does not look like the case.
> >> Anyway, it looks to be a separate issue orthogonal to the request
> >> timeout.
> >>  
> >  
> >  
> >>  
> >> >In other words, since the only user-facing change was the addition of
> >> >the setting, I'm wondering if we can avoid the KIP altogether by just
> >> >choosing a good default value for the timeout.
> >> The problem is that we have a server side request timeout exposed as a
> >> public configuration. We cannot set the client timeout smaller than that
> >> value, so a hard coded value probably won't work here.
> >>  
> >  
> >That makes sense, although it's worth keeping in mind that even if you use  
> >"correct" values, they could still be violated due to, e.g., a GC pause  
> >that causes the broker to process a request after it is supposed to have  
> >expired.  
> >  
> >-Ewen  
> >  
> >  
> >  
> >> >  
> >> >-Ewen  
> >> >  
> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> >wrote:
> >> >  
> >> >> Hi,  
> >> >>  
> >> >> I just created a KIP to add a request timeout to NetworkClient for
> >> >> new Kafka clients.
> >> >>
> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >> >>  
> >> >> Comments and suggestions are welcome!  
> >> >>  
> >> >> Thanks.  
> >> >>  
> >> >> Jiangjie (Becket) Qin  
> >> >>  
> >> >>  
> >> >  
> >> >  
> >> >--  
> >> >Thanks,  
> >> >Ewen  
> >>  
> >>  
> >  
> >  
> >--  
> >Thanks,  
> >Ewen  
>  
>  


--  
-- Guozhang  
