[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance

2021-01-05 Thread huangyiming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangyiming updated KAFKA-10903:

Description: 
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}
and  most of the time,the producerBatch the producerBatch in deque have the 
sequence, and i think    if we can compare the last producerBatch in the 
deque,if the new batch more than the last producerBatch,just add the new Batch 
to the last in the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}
and if some producerBatch do not have sequence,we just judgment the last 
sequence is also right,no problem

  was:
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,ju

[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance

2021-01-05 Thread huangyiming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangyiming updated KAFKA-10903:

Description: 
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,just add the new Batch to the last in 
the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}

  was:
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) { List 
orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && 
deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < 
batch.baseSequence()) orderedBatches.add(deque.pollFirst()); 
log.debug("Reordered incoming batch with sequence {} for partition {}. It was 
placed in the queue at " + "position {}", batch.baseSequence(), 
batch.topicPartition, orderedBatches.size()); // Either we have reached a point 
where there are batches without a sequence (ie. never been drained // and are 
hence in order by default), or the batch at the front of the queue has a 
sequence greater // than the incoming batch. This is the right place to add the 
incoming batch. deque.addFirst(batch); // Now we have to re insert the 
previously queued batches in the right order. for (int i = 
orderedBatches.size() - 1; i >= 0; --i) { 
deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch 
has been queued in the correct place according to its sequence. }{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,just add the new Batch to the last in 
the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){

[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance

2021-01-05 Thread huangyiming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangyiming updated KAFKA-10903:

Description: 
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) { List 
orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && 
deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < 
batch.baseSequence()) orderedBatches.add(deque.pollFirst()); 
log.debug("Reordered incoming batch with sequence {} for partition {}. It was 
placed in the queue at " + "position {}", batch.baseSequence(), 
batch.topicPartition, orderedBatches.size()); // Either we have reached a point 
where there are batches without a sequence (ie. never been drained // and are 
hence in order by default), or the batch at the front of the queue has a 
sequence greater // than the incoming batch. This is the right place to add the 
incoming batch. deque.addFirst(batch); // Now we have to re insert the 
previously queued batches in the right order. for (int i = 
orderedBatches.size() - 1; i >= 0; --i) { 
deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch 
has been queued in the correct place according to its sequence. }{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,just add the new Batch to the last in 
the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}

  was:
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) { ProducerBatch 
lastBatchInQueue = deque.peekLast(); List orderedBatches = new 
ArrayList<>(); while (deque.peekFirst() != null && 
deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < 
batch.baseSequence()) orderedBatches.add(deque.pollFirst()); 
log.debug("Reordered incoming batch with sequence {} for partition {}. It was 
placed in the queue at " + "position {}", batch.baseSequence(), 
batch.topicPartition, orderedBatches.size()); // Either we have reached a point 
where there are batches without a sequence (ie. never been drained // and are 
hence in order by default), or the batch at the front of the queue has a 
sequence greater // than the incoming batch. This is the right place to add the 
incoming batch. deque.addFirst(batch); // Now we have to re insert the 
previously queued batches in the right order. for (int i = 
orderedBatches.size() - 1; i >= 0; --i) { 
deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch 
has been queued in the correct place according to its sequence. } }{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,just add the new Batch to the last in 
the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLa

[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance

2021-01-05 Thread huangyiming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huangyiming updated KAFKA-10903:

Description: 
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) { ProducerBatch 
lastBatchInQueue = deque.peekLast(); List orderedBatches = new 
ArrayList<>(); while (deque.peekFirst() != null && 
deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < 
batch.baseSequence()) orderedBatches.add(deque.pollFirst()); 
log.debug("Reordered incoming batch with sequence {} for partition {}. It was 
placed in the queue at " + "position {}", batch.baseSequence(), 
batch.topicPartition, orderedBatches.size()); // Either we have reached a point 
where there are batches without a sequence (ie. never been drained // and are 
hence in order by default), or the batch at the front of the queue has a 
sequence greater // than the incoming batch. This is the right place to add the 
incoming batch. deque.addFirst(batch); // Now we have to re insert the 
previously queued batches in the right order. for (int i = 
orderedBatches.size() - 1; i >= 0; --i) { 
deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch 
has been queued in the correct place according to its sequence. } }{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,just add the new Batch to the last in 
the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}

  was:
if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
   
ProducerBatch lastBatchInQueue = deque.peekLast();

List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && 
deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < 
batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It was 
placed in the queue at " +
"position {}", batch.baseSequence(), 
batch.topicPartition, orderedBatches.size());
// Either we have reached a point where there are batches 
without a sequence (ie. never been drained
// and are hence in order by default), or the batch at the 
front of the queue has a sequence greater
// than the incoming batch. This is the right place to add the 
incoming batch.
deque.addFirst(batch);// Now we have to re 
insert the previously queued batches in the right order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}// At this point, the incoming batch has been 
queued in the correct place according to its sequence.
}
}{code}
and i think    if we can compare the last producerBatch in the deque

[jira] [Created] (KAFKA-10903) Optimize producerBatch order performance

2021-01-05 Thread huangyiming (Jira)
huangyiming created KAFKA-10903:
---

 Summary: Optimize producerBatch order performance
 Key: KAFKA-10903
 URL: https://issues.apache.org/jira/browse/KAFKA-10903
 Project: Kafka
  Issue Type: Improvement
Reporter: huangyiming


if we need sort  the producerBatch by sequence,now we use the new batch compare 
with the first batch in deque, and if the first batch in deque is less than the 
new batch,we will loop the deque and let the new batch insert the right 
position.

like this :
{code:java}
// code placeholder
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && 
firstBatchInQueue.baseSequence() < batch.baseSequence()) {
   
ProducerBatch lastBatchInQueue = deque.peekLast();

List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && 
deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < 
batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It was 
placed in the queue at " +
"position {}", batch.baseSequence(), 
batch.topicPartition, orderedBatches.size());
// Either we have reached a point where there are batches 
without a sequence (ie. never been drained
// and are hence in order by default), or the batch at the 
front of the queue has a sequence greater
// than the incoming batch. This is the right place to add the 
incoming batch.
deque.addFirst(batch);// Now we have to re 
insert the previously queued batches in the right order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}// At this point, the incoming batch has been 
queued in the correct place according to its sequence.
}
}{code}
and i think    if we can compare the last producerBatch in the deque,if the new 
batch more than the last producerBatch,just add the new Batch to the last in 
the deque,

like this:
{code:java}
// code placeholder
ProducerBatch lastBatchInQueue = deque.peekLast();
if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && 
lastBatchInQueue.baseSequence() <= batch.baseSequence()){
deque.addLast(batch);
} else {
List orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && 
deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());

log.debug("Reordered incoming batch with sequence {} for partition {}. It 
was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, 
orderedBatches.size());
// Either we have reached a point where there are batches without a 
sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the 
queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming 
batch.
deque.addFirst(batch);

// Now we have to re insert the previously queued batches in the right 
order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}

// At this point, the incoming batch has been queued in the correct place 
according to its sequence.
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10502) Threadlocal may can not set null,because it may create a memory leak

2020-09-18 Thread huangyiming (Jira)
huangyiming created KAFKA-10502:
---

 Summary: Threadlocal  may can not set null,because it may create a 
memory leak
 Key: KAFKA-10502
 URL: https://issues.apache.org/jira/browse/KAFKA-10502
 Project: Kafka
  Issue Type: Bug
Reporter: huangyiming


Threadlocal  may can not set null,because it may create a memory leak, you can 
see the link:
[https://stackoverflow.com/questions/12424838/threadlocal-remove],so I think 
weather can invoke thread local.remove instead 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8175) The broker block some minutes may occur expired error message for a period of time

2019-03-29 Thread huangyiming (JIRA)
huangyiming created KAFKA-8175:
--

 Summary: The broker block some minutes may occur expired error 
message for a period of time  
 Key: KAFKA-8175
 URL: https://issues.apache.org/jira/browse/KAFKA-8175
 Project: Kafka
  Issue Type: Improvement
Reporter: huangyiming


when the broker block some minutes, the producer may occur expired error 
message for a period of time,that may continued for a period of time. if the 
broker cluster's ip is 100,101,102,and the controller is the 100,when the 101 
block 2minutes(you can use gdb simulation,and attach the pid for 2 minutes,last 
quit it),  if the controller can not find the machine 101 offline in time(for 
example the controller found 101 offline  only 60 seconds later ),and the 
controller send leaderAndIsr only 60 seconds later,and in the

RecordAccumulator's batches may occur much   deliveryTimeout. and the 
topicAndParttion'leader in 101 may occur expired error,and can not send update 
metadata to another 100 or 102,because the record in 101 can not send,and can 
not trigger timeout to update the metadata.

so i use 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary

2018-12-11 Thread huangyiming (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420
 ] 

huangyiming edited comment on KAFKA-7707 at 12/12/18 3:19 AM:
--

Hi  [~sliebau] ,thank you review, in the lock area,don't have another thread  
add Condition to the waiters. the waiters only  have one condition
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws 
InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);

{code}
and finally we have remove the Condition:
{code:java}
} finally { // When this loop was not able to successfully terminate don't 
loose available memory this.nonPooledAvailableMemory += accumulated; 
this.waiters.remove(moreMemory); }
{code}
so i think  in the last finally wo don't need this.waiters.peekFirst() any more.
{code:java}
} finally { // signal any additional waiters if there is more memory left // 
over for them try { if (!(this.nonPooledAvailableMemory == 0 && 
this.free.isEmpty()) && !this.waiters.isEmpty()) 
this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise 
find bugs complains lock.unlock(); } }
{code}
can modify like this:
{code:java}
} finally {  lock.unlock();  }
{code}


was (Author: huangyimingha...@163.com):
Hi  [~sliebau] ,thank you review, in the lock area,don't have another thread  
add Condition to the waiters. the waiters only  have one condition
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws 
InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);

{code}
and finally we have remove the Condition:
{code:java}
} finally { // When this loop was not able to successfully terminate don't 
loose available memory this.nonPooledAvailableMemory += accumulated; 
this.waiters.remove(moreMemory); }
{code}
so i think  in the last finally:
{code:java}
} finally { // signal any additional waiters if there is more memory left // 
over for them try { if (!(this.nonPooledAvailableMemory == 0 && 
this.free.isEmpty()) && !this.waiters.isEmpty()) 
this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise 
find bugs complains lock.unlock(); } }
{code}
can modify like this:
{code:java}
} finally {  lock.unlock();  }
{code}

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
>
> In the trunk branch in 
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
>  i think the code can clean,is not necessary,it will never execute
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
> !

[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary

2018-12-11 Thread huangyiming (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420
 ] 

huangyiming edited comment on KAFKA-7707 at 12/12/18 3:16 AM:
--

Hi  [~sliebau] ,thank you review, in the lock area,don't have another thread  
add Condition to the waiters. the waiters only  have one condition
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws 
InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);

{code}
and finally we have remove the Condition:
{code:java}
} finally { // When this loop was not able to successfully terminate don't 
loose available memory this.nonPooledAvailableMemory += accumulated; 
this.waiters.remove(moreMemory); }
{code}
so i think  in the last finally:
{code:java}
} finally { // signal any additional waiters if there is more memory left // 
over for them try { if (!(this.nonPooledAvailableMemory == 0 && 
this.free.isEmpty()) && !this.waiters.isEmpty()) 
this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise 
find bugs complains lock.unlock(); } }
{code}
can modify like this:
{code:java}
} finally {  lock.unlock();  }
{code}


was (Author: huangyimingha...@163.com):
Hi  [~sliebau] ,thank you review, in the lock area,don't have another thread  
add Condition to the waiters. the waiters only  have one condition
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws 
InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);

{code}
and finally we have remove the Condition:
{code:java}
} finally { // When this loop was not able to successfully terminate don't 
loose available memory this.nonPooledAvailableMemory += accumulated; 
this.waiters.remove(moreMemory); }
{code}
so i think  in the last finally:
{code:java}
} finally { // signal any additional waiters if there is more memory left // 
over for them try { if (!(this.nonPooledAvailableMemory == 0 && 
this.free.isEmpty()) && !this.waiters.isEmpty()) 
this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise 
find bugs complains lock.unlock(); } }
{code}
can modify like this:
{code:java}
} finally {  lock.unlock();  }
{code}

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
>
> In the trunk branch in 
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
>  i think the code can clean,is not necessary,it will never execute
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
> !this.waiters.isEmpty())
> this.waiters.peekF

[jira] [Commented] (KAFKA-7707) Some code is not necessary

2018-12-11 Thread huangyiming (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420
 ] 

huangyiming commented on KAFKA-7707:


Hi  [~sliebau] ,thank you review, in the lock area,don't have another thread  
add Condition to the waiters. the waiters only  have one condition
{code:java}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws 
InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);

{code}
and finally we have remove the Condition:
{code:java}
} finally { // When this loop was not able to successfully terminate don't 
loose available memory this.nonPooledAvailableMemory += accumulated; 
this.waiters.remove(moreMemory); }
{code}
so i think  in the last finally:
{code:java}
} finally { // signal any additional waiters if there is more memory left // 
over for them try { if (!(this.nonPooledAvailableMemory == 0 && 
this.free.isEmpty()) && !this.waiters.isEmpty()) 
this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise 
find bugs complains lock.unlock(); } }
{code}
can modify like this:
{code:java}
} finally {  lock.unlock();  }
{code}

> Some code is not necessary
> --
>
> Key: KAFKA-7707
> URL: https://issues.apache.org/jira/browse/KAFKA-7707
> Project: Kafka
>  Issue Type: Improvement
>Reporter: huangyiming
>Priority: Minor
> Attachments: image-2018-12-05-18-01-46-886.png
>
>
> In the trunk branch in 
> [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174],
>  i think the code can clean,is not necessary,it will never execute
> {code:java}
> if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
> !this.waiters.isEmpty())
> this.waiters.peekFirst().signal();
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7707) Some code is not necessary

2018-12-05 Thread huangyiming (JIRA)
huangyiming created KAFKA-7707:
--

 Summary: Some code is not necessary
 Key: KAFKA-7707
 URL: https://issues.apache.org/jira/browse/KAFKA-7707
 Project: Kafka
  Issue Type: Improvement
Reporter: huangyiming
 Attachments: image-2018-12-05-18-01-46-886.png

!image-2018-12-05-18-01-46-886.png!

in the trunk branch,i think the code can clean,is not necessary,it will never 
execute
{code:java}
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && 
!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)