This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1593dfad5ce1b9520c956fefa86fd9e2037302b1 Author: lipenghui <peng...@apache.org> AuthorDate: Thu Jun 11 08:43:54 2020 +0800 Fix hash range conflict issue in Key_Shared with sticky hash range (#7231) Fix hash range conflict issue in Key_Shared with sticky hash range (cherry picked from commit e1c04ef2026c5f6a9a7229a05cedee3ff9dbf7ee) --- ...ashRangeExclusiveStickyKeyConsumerSelector.java | 9 ++- .../client/api/KeySharedSubscriptionTest.java | 64 ++++++++++++++++++++++ .../pulsar/client/api/PulsarClientException.java | 19 +++++++ .../org/apache/pulsar/client/impl/ClientCnx.java | 2 + 4 files changed, 93 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index dc96fbb..0276f42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -106,7 +106,14 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon } if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) { - throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue()); + PulsarApi.KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta(); + for (PulsarApi.IntRange range : keySharedMeta.getHashRangesList()) { + int start = Math.max(intRange.getStart(), range.getStart()); + int end = Math.min(intRange.getEnd(), range.getEnd()); + if (end >= start) { + throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue()); + } + } } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 8c6c033..511f8c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -655,6 +655,70 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { } } + @Test + public void testHashRangeConflict() throws PulsarClientException { + this.conf.setSubscriptionKeySharedEnable(true); + final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString(); + final String sub = "test"; + + Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535)); + Assert.assertTrue(consumer1.isConnected()); + + Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399)); + Assert.assertTrue(consumer2.isConnected()); + + try { + createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535)); + Assert.fail("Should failed with conflict range."); + } catch (PulsarClientException.ConsumerAssignException ignore) { + } + + try { + createFixedHashRangesConsumer(topic, sub, Range.of(1,1)); + Assert.fail("Should failed with conflict range."); + } catch (PulsarClientException.ConsumerAssignException ignore) { + } + + consumer1.close(); + + try { + createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535)); + Assert.fail("Should failed with conflict range."); + } catch (PulsarClientException.ConsumerAssignException ignore) { + } + + try { + createFixedHashRangesConsumer(topic, sub, Range.of(50,100)); + Assert.fail("Should failed with conflict range."); + } catch (PulsarClientException.ConsumerAssignException ignore) { + } + + try { + createFixedHashRangesConsumer(topic, sub, Range.of(399,500)); + Assert.fail("Should failed with conflict range."); + } catch (PulsarClientException.ConsumerAssignException ignore) { + } + + Consumer<String> consumer3 = createFixedHashRangesConsumer(topic, sub, Range.of(400,600)); + Assert.assertTrue(consumer3.isConnected()); + + Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99)); + Assert.assertTrue(consumer4.isConnected()); + + consumer2.close(); + consumer3.close(); + consumer4.close(); + } + + private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException { + return pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subscription) + .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(ranges)) + .subscribe(); + } + private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException { Producer<Integer> producer = null; if (enableBatch) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index b6e327c..6a6e42a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -732,6 +732,20 @@ public class PulsarClientException extends IOException { } } + /** + * Consumer assign exception thrown by Pulsar client. + */ + public static class ConsumerAssignException extends PulsarClientException { + + /** + * Constructs an {@code ConsumerAssignException} with the specified detail message. + * @param msg The detail message. + */ + public ConsumerAssignException(String msg) { + super(msg); + } + } + // wrap an exception to enriching more info messages. public static Throwable wrap(Throwable t, String msg) { msg += "\n" + t.getMessage(); @@ -786,6 +800,8 @@ public class PulsarClientException extends IOException { return new ChecksumException(msg); } else if (t instanceof CryptoException) { return new CryptoException(msg); + } else if (t instanceof ConsumerAssignException) { + return new ConsumerAssignException(msg); } else if (t instanceof PulsarClientException) { return new PulsarClientException(msg); } else if (t instanceof CompletionException) { @@ -867,6 +883,8 @@ public class PulsarClientException extends IOException { return new ChecksumException(msg); } else if (cause instanceof CryptoException) { return new CryptoException(msg); + } else if (cause instanceof ConsumerAssignException) { + return new ConsumerAssignException(msg); } else if (cause instanceof TopicDoesNotExistException) { return new TopicDoesNotExistException(msg); } else { @@ -895,6 +913,7 @@ public class PulsarClientException extends IOException { || t instanceof NotSupportedException || t instanceof ChecksumException || t instanceof CryptoException + || t instanceof ConsumerAssignException || t instanceof ProducerBusyException || t instanceof ConsumerBusyException) { return false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 6dde157..d63cbc1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1001,6 +1001,8 @@ public class ClientCnx extends PulsarHandler { return new PulsarClientException.IncompatibleSchemaException(errorMsg); case TopicNotFound: return new PulsarClientException.TopicDoesNotExistException(errorMsg); + case ConsumerAssignError: + return new PulsarClientException.ConsumerAssignException(errorMsg); case UnknownError: default: return new PulsarClientException(errorMsg);