> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 155
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line155>
> >
> >     This scheme is clever but non-obvious; is there a simpler way?
> 
> Jiangjie Qin wrote:
>     I'm not sure there is a simpler way. Maybe we can review the current 
> approach again and see if we can simplify it.
>     
>     The goals we want to achieve here are:
>     1. When abortIncompleteBatches() finishes, no more messages should be 
> appended. 
>     2. Make sure that when hasUnsent() returns false, it has not missed 
> any batch.
>     
>     The current solutions for both depend on setting the close flag first.
>     To achieve (1), the implementation sets the close flag first and 
> waits for any in-flight appends to finish.
>     To achieve (2), the implementation synchronizes on the deque. When an 
> append grabs the deque lock, it first checks whether the close flag is set. 
> If it is, hasUnsent() might have already checked this deque, so it is no 
> longer safe to append a new batch; otherwise it is safe to do so.

Thought about this again, and I went back to using the ReadWriteLock, with a 
small modification: tryLock. Hope this makes things cleaner. The idea is 
exactly the same as in the current approach, but the code is less confusing. 
I put some reasoning below; please let me know if you have any suggestions.

Essentially, we want to make sure no messages or batches are left behind after 
1. abortIncompleteBatches() is called, or 
2. hasUnsent() has returned false (hasUnsent() only cares about batches, not 
messages). 
This means we need to make sure append() cannot proceed after either of these 
two events.

We set the close flag before these two events to reject further appends. The 
key issue to solve is how to deal with threads that are already in the middle 
of an append.

To detect whether there are appending threads, we need either an explicit 
flag or an exclusive lock (the ReadWriteLock, as in the previous patch); a 
minimal sketch of the lock-based detection follows the two options below. 
If appends are in progress, we have two options:
A. Fail the appends.
B. Wait until the appends are done.
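For concreteness, here is roughly the lock-based shape I have in mind. This 
is a simplified sketch with made-up names (AppendGuard etc.), not the actual 
patch: every append holds the read lock for its duration, so acquiring the 
write lock proves that no append is in flight.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class AppendGuard {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean closed = false;

    public void append(byte[] record) {
        lock.readLock().lock();      // many appends can hold this at once
        try {
            if (closed)
                throw new IllegalStateException("accumulator is closed");
            // ... find the partition's deque and append the record ...
        } finally {
            lock.readLock().unlock();
        }
    }

    public void close() {
        closed = true;               // reject appends that start after this
        lock.writeLock().lock();     // blocks until in-flight appends finish
        lock.writeLock().unlock();   // from here on, no append is in progress
    }
}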

It is a bit difficult to fail the appends (option A), because it is hard to 
know which step an append is in. hasUnsent() uses the deque lock to make sure 
no new batch can be added to a deque after hasUnsent() has checked it. That is 
why append() has to re-check the close flag after grabbing the deque lock; a 
sketch of that handshake follows.
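Something like this (again a simplified sketch with hypothetical names, not 
the actual patch; the batch type is elided as a type parameter):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccumulatorSketch<B> {
    private final Map<String, Deque<B>> batches = new ConcurrentHashMap<>();
    private volatile boolean closed = false;  // set by close(), as in the
                                              // previous sketch

    /** Returns false if the accumulator closed before we got the deque lock. */
    public boolean tryAppend(String partition, B batch) {
        Deque<B> deque =
            batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (deque) {
            if (closed)
                return false;  // hasUnsent() may already have scanned this deque
            deque.addLast(batch);
            return true;
        }
    }

    /** Trustworthy once the close flag is set, since append() re-checks it. */
    public boolean hasUnsent() {
        for (Deque<B> deque : batches.values()) {
            synchronized (deque) {
                if (!deque.isEmpty())
                    return true;
            }
        }
        return false;
    }
}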

For abortIncompleteBatches(), we currently use option B. One tricky thing is 
that an appending thread might be blocked on a full buffer while 
abortIncompleteBatches() is waiting. If abortIncompleteBatches() is called 
from the sender thread, this deadlocks: the sender thread releases no memory 
while it waits for the append, and the append is waiting for memory. So the 
sender thread has to keep releasing memory. To solve this, instead of 
blocking on the write lock we use tryLock: whenever it returns false, we keep 
failing batches to release memory, and once we grab the write lock we abort 
batches one last time.
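Sketched out, the loop looks something like this (simplified, hypothetical 
names, not the actual patch):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class AbortSketch {
    private final ReentrantReadWriteLock appendLock =
        new ReentrantReadWriteLock();

    public void abortIncompleteBatches() {
        // Keep aborting (and thereby freeing memory, which unblocks any
        // appender stuck on a full buffer) until no append is in flight.
        do {
            abortBatches();
        } while (!appendLock.writeLock().tryLock());
        try {
            // Final pass under the write lock: no append can race with us now.
            abortBatches();
        } finally {
            appendLock.writeLock().unlock();
        }
    }

    private void abortBatches() {
        // ... fail every incomplete batch and deallocate its buffer ...
    }
}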

What do you think about this approach?


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 539
> > <https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line539>
> >
> >     It wouldn't block forever; that isn't correct. It would just block 
> > for the period of time they specified.
> 
> Jiangjie Qin wrote:
>     We are saying we will call close(0) instead of having the sender thread 
> call close(timeout). And we do this to *avoid* blocking forever.

I removed the comment to avoid confusion.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79813
-----------------------------------------------------------


On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 16, 2015, 6:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
> c2fdc23239bd2196cd912c3d121b591f21393eab 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
