This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 76c0939c09b8baa25924b52eb2078a6aa1112728
Author: lipenghui <peng...@apache.org>
AuthorDate: Thu Jul 9 12:58:50 2020 +0800

    Fix ArrayIndexOutOfBoundsException in batch index ack. (#7483)

    (cherry picked from commit beb9e3be60513bdfbd0e412a68747b97714af1d7)
---
 .../org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java | 9 +++++++--
 .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +-
 .../persistent/PersistentDispatcherSingleActiveConsumer.java | 2 +-
 .../PersistentStickyKeyDispatcherMultipleConsumers.java | 2 +-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
index e41a2909..175f5ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -53,8 +53,13 @@ public class EntryBatchIndexesAcks {
         handle.recycle(this);
     }
 
-    public static EntryBatchIndexesAcks get() {
-        return RECYCLER.get();
+    public static EntryBatchIndexesAcks get(int entriesListSize) {
+        EntryBatchIndexesAcks ebi = RECYCLER.get();
+
+        if (ebi.indexesAcks.length < entriesListSize) {
+            ebi.indexesAcks = new Pair[entriesListSize];
+        }
+        return ebi;
     }
 
     private EntryBatchIndexesAcks(Recycler.Handle<EntryBatchIndexesAcks> handle) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b8255ba..c24e762 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -525,7 +525,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
                 filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 5bb7629..58a63bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -237,7 +237,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         } else {
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
             int totalMessages = sendMessageInfo.getTotalMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 420552c..35d645a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -188,7 +188,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
                 SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
-                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
                 filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
                 consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),