Re: Review Request 34965: Patch for KAFKA-2241

2015-07-13 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
---

(Updated July 13, 2015, 9:52 p.m.)


Review request for kafka.


Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241


Repository: kafka


Description
---

KAFKA-2241; AbstractFetcherThread.shutdown() should not block


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 

Diff: https://reviews.apache.org/r/34965/diff/


Testing
---


Thanks,

Dong Lin



Re: Review Request 34965: Patch for KAFKA-2241

2015-07-13 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
---

(Updated July 13, 2015, 8:30 p.m.)


Review request for kafka.


Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241


Repository: kafka


Description (updated)
---

KAFKA-2241; AbstractFetcherThread.shutdown() should not block


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 

Diff: https://reviews.apache.org/r/34965/diff/


Testing
---


Thanks,

Dong Lin



Re: Review Request 34965: Patch for KAFKA-2241

2015-07-10 Thread Joel Koshy


> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > 
> >
> > You could get around the above by retaining this call to 
> > simpleConsumer.close (although it would be mostly redundant). However this 
> > is still not ideal, since it is a caveat that the user of the (public) 
> > forceClose API needs to be aware of.
> 
> Dong Lin wrote:
> I agree. I have updated the code and comments to hopefully avoid 
> confusing users.
> 
> Joel Koshy wrote:
> Would it work to just modify what you had before in `forceClose` to:
> ```
> disconnect();
> close();
> ```
> 
> Dong Lin wrote:
> I think that won't work. The event sequence you described will still 
> cause a problem.
> 
> The following sequence of events may happen:
> 
> - forceClose() and close() are both executed by thread 1
> - thread 2 calls sendRequest(). blockingChannel.send(request) will throw 
> ClosedChannelException which triggers reconnect().
> 
> It is possible to make this work by changing the way sendRequest() 
> handles ClosedChannelException. But I find the API in the second patch is 
> better.
> 
> Which solution do you prefer?

True - that won't work. Another option may be to change `connect` to throw a 
`KafkaException` if `isClosed` is true. Your latest patch may be better though, 
since it avoids modifying the existing API (and only adds to it) - 
although I think naming it `interruptConsumer` may be better. The javadoc can 
clearly state that it actually disconnects the consumer due to the JVM bug (and 
link to the Stack Overflow question).
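
The `connect`-guard option can be sketched as follows. This is a minimal 
illustration, not the SimpleConsumer code: `GuardedConsumer`, 
`ClosedConsumerException` (a local stand-in for `KafkaException`) and the 
boolean `connected` flag are all hypothetical names chosen so the sketch 
compiles without Kafka on the classpath.

```scala
import java.nio.channels.ClosedChannelException

// Local stand-in for KafkaException (assumption: keeps the sketch dependency-free).
class ClosedConsumerException(msg: String) extends RuntimeException(msg)

// Hypothetical toy consumer; only models the open/closed state of the connection.
class GuardedConsumer {
  private val lock = new Object
  @volatile private var closed = false
  @volatile private var connected = false

  def isConnected: Boolean = connected

  // The guard: once the consumer is closed, connect() refuses to open a connection.
  def connect(): Unit = lock.synchronized {
    if (closed) throw new ClosedConsumerException("consumer is closed")
    connected = true
  }

  def close(): Unit = lock.synchronized {
    connected = false
    closed = true
  }

  // Retries once after a ClosedChannelException by reconnecting.
  def sendRequest(request: String): Unit = lock.synchronized {
    try send(request)
    catch {
      case _: ClosedChannelException =>
        connect() // fails with ClosedConsumerException after close()
        send(request)
    }
  }

  private def send(request: String): Unit =
    if (!connected) throw new ClosedChannelException
}

object GuardedReplay {
  // Replays close() followed by a late sendRequest() and reports
  // whether a connection is left open afterwards.
  def connectionSurvivesClose(): Boolean = {
    val c = new GuardedConsumer
    c.connect()
    c.close()
    try c.sendRequest("fetch")
    catch { case _: ClosedConsumerException => () }
    c.isConnected
  }
}
```

In this toy model the late `sendRequest` fails instead of silently recreating 
the connection, so nothing is left open after `close()`.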


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---


On July 9, 2015, 10:35 p.m., Dong Lin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34965/
> ---
> 
> (Updated July 9, 2015, 10:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2241
> https://issues.apache.org/jira/browse/KAFKA-2241
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
> ReadableByteChannel.read(buffer)
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> c16f7edd322709060e54c77eb505c44cbd77a4ec 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
> 
> Diff: https://reviews.apache.org/r/34965/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Dong Lin
> 
>



Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Dong Lin


> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > 
> >
> > You could get around the above by retaining this call to 
> > simpleConsumer.close (although it would be mostly redundant). However this 
> > is still not ideal, since it is a caveat that the user of the (public) 
> > forceClose API needs to be aware of.
> 
> Dong Lin wrote:
> I agree. I have updated the code and comments to hopefully avoid 
> confusing users.
> 
> Joel Koshy wrote:
> Would it work to just modify what you had before in `forceClose` to:
> ```
> disconnect();
> close();
> ```

I think that won't work. The event sequence you described will still cause 
a problem.

The following sequence of events may happen:

- forceClose() and close() are both executed by thread 1
- thread 2 calls sendRequest(). blockingChannel.send(request) will throw 
ClosedChannelException which triggers reconnect().
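
The two steps above can be replayed deterministically on a single thread. 
The sketch below is only a model of the described behaviour: `ToyChannel` 
and `ToyConsumer` are hypothetical stand-ins, not Kafka's BlockingChannel or 
SimpleConsumer, and `sendRequest` is assumed to reconnect and retry once on 
ClosedChannelException without checking `isClosed`.

```scala
import java.nio.channels.ClosedChannelException

// Hypothetical stand-in for a blocking channel; tracks only connected/disconnected.
class ToyChannel {
  @volatile var connected = false
  def connect(): Unit = { connected = true }
  def disconnect(): Unit = { connected = false }
  def send(request: String): Unit =
    if (!connected) throw new ClosedChannelException
}

// Hypothetical stand-in for the consumer; models only what the sequence needs.
class ToyConsumer {
  val channel = new ToyChannel
  @volatile var isClosed = false
  private val lock = new Object

  def close(): Unit = lock.synchronized {
    channel.disconnect()
    isClosed = true
  }

  // forceClose as proposed in the review: disconnect(); close();
  def forceClose(): Unit = {
    channel.disconnect()
    close()
  }

  // Assumed behaviour: a ClosedChannelException triggers a reconnect and one retry.
  def sendRequest(request: String): Unit = lock.synchronized {
    try channel.send(request)
    catch {
      case _: ClosedChannelException =>
        channel.connect() // reconnect, even though isClosed is already true
        channel.send(request)
    }
  }
}

object RaceReplay {
  // Returns (isClosed, connected) after replaying the two steps in order.
  def replay(): (Boolean, Boolean) = {
    val c = new ToyConsumer
    c.channel.connect()
    c.forceClose()         // step 1: what thread 1 does
    c.sendRequest("fetch") // step 2: what thread 2 does afterwards
    (c.isClosed, c.channel.connected)
  }
}
```

In this model `RaceReplay.replay()` ends with the consumer marked closed 
while its channel is connected again, which is the leaked connection.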

It is possible to make this work by changing the way sendRequest() handles 
ClosedChannelException. But I find the API in the second patch is better.

Which solution do you prefer?


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---





Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Joel Koshy


> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > 
> >
> > You could get around the above by retaining this call to 
> > simpleConsumer.close (although it would be mostly redundant). However this 
> > is still not ideal, since it is a caveat that the user of the (public) 
> > forceClose API needs to be aware of.
> 
> Dong Lin wrote:
> I agree. I have updated the code and comments to hopefully avoid 
> confusing users.

Would it work to just modify what you had before in `forceClose` to:
```
disconnect();
close();
```


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---





Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Dong Lin


> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 71
> > 
> >
> > As you probably noticed, synchronization in the 
> > AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. 
> > Since the simple consumer is force-closed without the SimpleConsumer’s lock 
> > consider the following sequence:
> > - You call forceClose
> > - In the meantime (before isClosed is set to true) an ongoing call to 
> > sendRequest recreates the connection
> > - The fetcher thread will subsequently exit (since the 
> > ShutdownableThread’s isRunning flag is false)
> > - So even though the SimpleConsumer is _closed_ at that point, the 
> > connection will remain
> > 
> > Can you verify or is it a non-issue?

Thanks for the catch! Yes this is an issue. After looking through the code 
carefully I think we have to keep the simpleConsumer.close() to avoid this 
problem.

I have also changed the function name and added comments to document the use of 
the new API.


> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > 
> >
> > You could get around the above by retaining this call to 
> > simpleConsumer.close (although it would be mostly redundant). However this 
> > is still not ideal, since it is a caveat that the user of the (public) 
> > forceClose API needs to be aware of.

I agree. I have updated the code and comments to hopefully avoid confusing 
users.


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---





Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
---

(Updated July 9, 2015, 10:35 p.m.)


Review request for kafka.


Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241


Repository: kafka


Description
---

KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
ReadableByteChannel.read(buffer)


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 

Diff: https://reviews.apache.org/r/34965/diff/


Testing
---


Thanks,

Dong Lin



Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---



core/src/main/scala/kafka/server/AbstractFetcherThread.scala (line 71)


As you probably noticed, synchronization in the 
AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. 
Since the simple consumer is force-closed without the SimpleConsumer’s lock 
consider the following sequence:
- You call forceClose
- In the meantime (before isClosed is set to true) an ongoing call to 
sendRequest recreates the connection
- The fetcher thread will subsequently exit (since the ShutdownableThread’s 
isRunning flag is false)
- So even though the SimpleConsumer is _closed_ at that point, the 
connection will remain

Can you verify or is it a non-issue?



core/src/main/scala/kafka/server/AbstractFetcherThread.scala 


You could get around the above by retaining this call to 
simpleConsumer.close (although it would be mostly redundant). However this is 
still not ideal, since it is a caveat that the user of the (public) forceClose 
API needs to be aware of.


- Joel Koshy


On June 3, 2015, 10:30 p.m., Dong Lin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34965/
> ---
> 
> (Updated June 3, 2015, 10:30 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2241
> https://issues.apache.org/jira/browse/KAFKA-2241
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
> ReadableByteChannel.read(buffer)
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> 31a2639477bf66f9a05d2b9b07794572d7ec393b 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
> 
> Diff: https://reviews.apache.org/r/34965/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Dong Lin
> 
>



Re: Review Request 34965: Patch for KAFKA-2241

2015-06-03 Thread Dong Lin


> On June 3, 2015, 6:53 p.m., Aditya Auradkar wrote:
> > core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 61
> > 
> >
> > Why not simply modify the close method to disconnect outside the 
> > synchronized block? Not that I feel very strongly, I'm curious.

Sure. I will think about this after finishing other work.


> On June 3, 2015, 6:53 p.m., Aditya Auradkar wrote:
> > core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 63
> > 
> >
> > I think this needs to be volatile or AtomicBoolean

You are right. Fixed.

Thanks.


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review86460
---





Re: Review Request 34965: Patch for KAFKA-2241

2015-06-03 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
---

(Updated June 3, 2015, 10:30 p.m.)


Review request for kafka.


Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241


Repository: kafka


Description
---

KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
ReadableByteChannel.read(buffer)


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
31a2639477bf66f9a05d2b9b07794572d7ec393b 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 

Diff: https://reviews.apache.org/r/34965/diff/


Testing
---


Thanks,

Dong Lin



Re: Review Request 34965: Patch for KAFKA-2241

2015-06-03 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review86460
---


This may be hard to do in a unit test, but can you check if it's feasible to 
write a test case?


core/src/main/scala/kafka/consumer/SimpleConsumer.scala


Why not simply modify the close method to disconnect outside the 
synchronized block? Not that I feel very strongly, I'm curious.



core/src/main/scala/kafka/consumer/SimpleConsumer.scala


I think this needs to be volatile or AtomicBoolean
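
For illustration, the two declarations mentioned here look like this in 
Scala. The sketch assumes a hypothetical `FlagHolder` rather than the actual 
field in SimpleConsumer, and it shows only the pattern: a flag written by 
one thread and polled by another. It does not try to reproduce the 
visibility failure that can occur without `@volatile`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ClosedFlagDemo {
  // Hypothetical holder showing both options side by side.
  class FlagHolder {
    // Option 1: @volatile makes writes visible to other threads' later reads.
    @volatile var isClosedVolatile = false
    // Option 2: AtomicBoolean gives the same visibility plus atomic updates.
    val isClosedAtomic = new AtomicBoolean(false)
  }

  // Starts a worker that polls both flags, sets them from the calling thread,
  // and reports whether the worker observed the change and exited.
  def workerStops(): Boolean = {
    val flags = new FlagHolder
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        while (!flags.isClosedVolatile || !flags.isClosedAtomic.get()) {
          Thread.sleep(1)
        }
      }
    })
    worker.start()
    flags.isClosedVolatile = true
    flags.isClosedAtomic.set(true)
    worker.join(5000) // wait up to 5 seconds for the worker to exit
    !worker.isAlive
  }
}
```

Either declaration is enough for a simple closed flag; AtomicBoolean is 
mainly useful when a compare-and-set is also needed.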


- Aditya Auradkar


On June 2, 2015, 11:54 p.m., Dong Lin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34965/
> ---
> 
> (Updated June 2, 2015, 11:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2241
> https://issues.apache.org/jira/browse/KAFKA-2241
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
> ReadableByteChannel.read(buffer)
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> 31a2639477bf66f9a05d2b9b07794572d7ec393b 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
> 
> Diff: https://reviews.apache.org/r/34965/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Dong Lin
> 
>



Review Request 34965: Patch for KAFKA-2241

2015-06-02 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
---

Review request for kafka.


Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241


Repository: kafka


Description
---

KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
ReadableByteChannel.read(buffer)
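
As background, the sketch below shows the JVM behaviour that a non-blocking 
shutdown can rely on: closing a SocketChannel from a second thread makes a 
read that is blocked on it return with an exception. It is a standalone 
loopback demo under stated assumptions, not the patch itself: 
`CloseUnblocksRead` is a hypothetical name, and it reads from the 
SocketChannel directly rather than through Kafka's channel wrapper.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ClosedChannelException, ServerSocketChannel, SocketChannel}
import java.util.concurrent.{CountDownLatch, TimeUnit}

object CloseUnblocksRead {
  // Returns true if the reader thread left read() after another thread closed the channel.
  def demo(): Boolean = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0: the OS picks a free port
    val client = SocketChannel.open(
      new InetSocketAddress("127.0.0.1", server.socket().getLocalPort))
    val peer = server.accept() // the accepted side never writes anything
    val woken = new CountDownLatch(1)

    val reader = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          client.read(ByteBuffer.allocate(16)) // blocks: no data will arrive
          ()
        } catch {
          // AsynchronousCloseException is a subclass of ClosedChannelException.
          case _: ClosedChannelException => woken.countDown()
        }
      }
    })
    reader.start()
    Thread.sleep(200) // give the reader time to block inside read()

    client.close() // issued from this thread without waiting for the reader
    val result = woken.await(5, TimeUnit.SECONDS)
    peer.close()
    server.close()
    result
  }
}
```

The 200 ms pause only makes it likely that the reader is already blocked; if 
the close wins the race, the read fails immediately with 
ClosedChannelException and the result is the same.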


Diffs
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
31a2639477bf66f9a05d2b9b07794572d7ec393b 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 

Diff: https://reviews.apache.org/r/34965/diff/


Testing
---


Thanks,

Dong Lin