[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance
[ https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huangyiming updated KAFKA-10903: Description: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} and most of the time,the producerBatch the producerBatch in deque have the sequence, and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){ deque.addLast(batch); } else { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} and if some producerBatch do not have sequence,we just judgment the last sequence is also right,no problem was: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,ju
[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance
[ https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huangyiming updated KAFKA-10903: Description: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){ deque.addLast(batch); } else { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} was: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. }{code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){
[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance
[ https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huangyiming updated KAFKA-10903: Description: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. }{code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){ deque.addLast(batch); } else { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} was: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { ProducerBatch lastBatchInQueue = deque.peekLast(); List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } }{code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){ deque.addLa
[jira] [Updated] (KAFKA-10903) Optimize producerBatch order performance
[ https://issues.apache.org/jira/browse/KAFKA-10903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huangyiming updated KAFKA-10903: Description: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { ProducerBatch lastBatchInQueue = deque.peekLast(); List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } }{code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){ deque.addLast(batch); } else { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} was: if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { ProducerBatch lastBatchInQueue = deque.peekLast(); List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch);// Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); }// At this point, the incoming batch has been queued in the correct place according to its sequence. } }{code} and i think if we can compare the last producerBatch in the deque
[jira] [Created] (KAFKA-10903) Optimize producerBatch order performance
huangyiming created KAFKA-10903: --- Summary: Optimize producerBatch order performance Key: KAFKA-10903 URL: https://issues.apache.org/jira/browse/KAFKA-10903 Project: Kafka Issue Type: Improvement Reporter: huangyiming if we need sort the producerBatch by sequence,now we use the new batch compare with the first batch in deque, and if the first batch in deque is less than the new batch,we will loop the deque and let the new batch insert the right position. like this : {code:java} // code placeholder if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { ProducerBatch lastBatchInQueue = deque.peekLast(); List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch);// Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); }// At this point, the incoming batch has been queued in the correct place according to its sequence. } }{code} and i think if we can compare the last producerBatch in the deque,if the new batch more than the last producerBatch,just add the new Batch to the last in the deque, like this: {code:java} // code placeholder ProducerBatch lastBatchInQueue = deque.peekLast(); if(lastBatchInQueue !=null && lastBatchInQueue.hasSequence() && lastBatchInQueue.baseSequence() <= batch.baseSequence()){ deque.addLast(batch); } else { List orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. deque.addFirst(batch); // Now we have to re insert the previously queued batches in the right order. for (int i = orderedBatches.size() - 1; i >= 0; --i) { deque.addFirst(orderedBatches.get(i)); } // At this point, the incoming batch has been queued in the correct place according to its sequence. } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10502) Threadlocal may can not set null,because it may create a memory leak
huangyiming created KAFKA-10502: --- Summary: Threadlocal may can not set null,because it may create a memory leak Key: KAFKA-10502 URL: https://issues.apache.org/jira/browse/KAFKA-10502 Project: Kafka Issue Type: Bug Reporter: huangyiming Threadlocal may can not set null,because it may create a memory leak, you can see the link: [https://stackoverflow.com/questions/12424838/threadlocal-remove],so I think weather can invoke thread local.remove instead -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8175) The broker block some minutes may occur expired error message for a period of time
huangyiming created KAFKA-8175: -- Summary: The broker block some minutes may occur expired error message for a period of time Key: KAFKA-8175 URL: https://issues.apache.org/jira/browse/KAFKA-8175 Project: Kafka Issue Type: Improvement Reporter: huangyiming when the broker block some minutes, the producer may occur expired error message for a period of time,that may continued for a period of time. if the broker cluster's ip is 100,101,102,and the controller is the 100,when the 101 block 2minutes(you can use gdb simulation,and attach the pid for 2 minutes,last quit it), if the controller can not find the machine 101 offline in time(for example the controller found 101 offline only 60 seconds later ),and the controller send leaderAndIsr only 60 seconds later,and in the RecordAccumulator's batches may occur much deliveryTimeout. and the topicAndParttion'leader in 101 may occur expired error,and can not send update metadata to another 100 or 102,because the record in 101 can not send,and can not trigger timeout to update the metadata. so i use -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary
[ https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420 ] huangyiming edited comment on KAFKA-7707 at 12/12/18 3:19 AM: -- Hi [~sliebau] ,thank you review, in the lock area,don't have another thread add Condition to the waiters. the waiters only have one condition {code:java} public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size; } else { // we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); {code} and finally we have remove the Condition: {code:java} } finally { // When this loop was not able to successfully terminate don't loose available memory this.nonPooledAvailableMemory += accumulated; this.waiters.remove(moreMemory); } {code} so i think in the last finally wo don't need this.waiters.peekFirst() any more. {code:java} } finally { // signal any additional waiters if there is more memory left // over for them try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } {code} can modify like this: {code:java} } finally { lock.unlock(); } {code} was (Author: huangyimingha...@163.com): Hi [~sliebau] ,thank you review, in the lock area,don't have another thread add Condition to the waiters. the waiters only have one condition {code:java} public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size; } else { // we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); {code} and finally we have remove the Condition: {code:java} } finally { // When this loop was not able to successfully terminate don't loose available memory this.nonPooledAvailableMemory += accumulated; this.waiters.remove(moreMemory); } {code} so i think in the last finally: {code:java} } finally { // signal any additional waiters if there is more memory left // over for them try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } {code} can modify like this: {code:java} } finally { lock.unlock(); } {code} > Some code is not necessary > -- > > Key: KAFKA-7707 > URL: https://issues.apache.org/jira/browse/KAFKA-7707 > Project: Kafka > Issue Type: Improvement >Reporter: huangyiming >Priority: Minor > Attachments: image-2018-12-05-18-01-46-886.png > > > In the trunk branch in > [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174], > i think the code can clean,is not necessary,it will never execute > {code:java} > if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && > !
[jira] [Comment Edited] (KAFKA-7707) Some code is not necessary
[ https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420 ] huangyiming edited comment on KAFKA-7707 at 12/12/18 3:16 AM: -- Hi [~sliebau] ,thank you review, in the lock area,don't have another thread add Condition to the waiters. the waiters only have one condition {code:java} public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size; } else { // we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); {code} and finally we have remove the Condition: {code:java} } finally { // When this loop was not able to successfully terminate don't loose available memory this.nonPooledAvailableMemory += accumulated; this.waiters.remove(moreMemory); } {code} so i think in the last finally: {code:java} } finally { // signal any additional waiters if there is more memory left // over for them try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } {code} can modify like this: {code:java} } finally { lock.unlock(); } {code} was (Author: huangyimingha...@163.com): Hi [~sliebau] ,thank you review, in the lock area,don't have another thread add Condition to the waiters. the waiters only have one condition {code:java} public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size; } else { // we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); {code} and finally we have remove the Condition: {code:java} } finally { // When this loop was not able to successfully terminate don't loose available memory this.nonPooledAvailableMemory += accumulated; this.waiters.remove(moreMemory); } {code} so i think in the last finally: {code:java} } finally { // signal any additional waiters if there is more memory left // over for them try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } {code} can modify like this: {code:java} } finally { lock.unlock(); } {code} > Some code is not necessary > -- > > Key: KAFKA-7707 > URL: https://issues.apache.org/jira/browse/KAFKA-7707 > Project: Kafka > Issue Type: Improvement >Reporter: huangyiming >Priority: Minor > Attachments: image-2018-12-05-18-01-46-886.png > > > In the trunk branch in > [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174], > i think the code can clean,is not necessary,it will never execute > {code:java} > if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && > !this.waiters.isEmpty()) > this.waiters.peekF
[jira] [Commented] (KAFKA-7707) Some code is not necessary
[ https://issues.apache.org/jira/browse/KAFKA-7707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718420#comment-16718420 ] huangyiming commented on KAFKA-7707: Hi [~sliebau] ,thank you review, in the lock area,don't have another thread add Condition to the waiters. the waiters only have one condition {code:java} public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer freeUp(size); this.nonPooledAvailableMemory -= size; } else { // we are out of memory and will have to block int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); {code} and finally we have remove the Condition: {code:java} } finally { // When this loop was not able to successfully terminate don't loose available memory this.nonPooledAvailableMemory += accumulated; this.waiters.remove(moreMemory); } {code} so i think in the last finally: {code:java} } finally { // signal any additional waiters if there is more memory left // over for them try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } {code} can modify like this: {code:java} } finally { lock.unlock(); } {code} > Some code is not necessary > -- > > Key: KAFKA-7707 > URL: https://issues.apache.org/jira/browse/KAFKA-7707 > Project: Kafka > Issue Type: Improvement >Reporter: huangyiming >Priority: Minor > Attachments: image-2018-12-05-18-01-46-886.png > > > In the trunk branch in > [BufferPool.java|https://github.com/apache/kafka/blob/578205cadd0bf64d671c6c162229c4975081a9d6/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java#L174], > i think the code can clean,is not necessary,it will never execute > {code:java} > if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && > !this.waiters.isEmpty()) > this.waiters.peekFirst().signal(); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7707) Some code is not necessary
huangyiming created KAFKA-7707: -- Summary: Some code is not necessary Key: KAFKA-7707 URL: https://issues.apache.org/jira/browse/KAFKA-7707 Project: Kafka Issue Type: Improvement Reporter: huangyiming Attachments: image-2018-12-05-18-01-46-886.png !image-2018-12-05-18-01-46-886.png! in the trunk branch,i think the code can clean,is not necessary,it will never execute {code:java} if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)