This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1593dfad5ce1b9520c956fefa86fd9e2037302b1
Author: lipenghui <peng...@apache.org>
AuthorDate: Thu Jun 11 08:43:54 2020 +0800

    Fix hash range conflict issue in Key_Shared with sticky hash range (#7231)
    
    Fix hash range conflict issue in Key_Shared with sticky hash range
    
    (cherry picked from commit e1c04ef2026c5f6a9a7229a05cedee3ff9dbf7ee)
---
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |  9 ++-
 .../client/api/KeySharedSubscriptionTest.java      | 64 ++++++++++++++++++++++
 .../pulsar/client/api/PulsarClientException.java   | 19 +++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 +
 4 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index dc96fbb..0276f42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -106,7 +106,14 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
             }
 
             if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) {
-                throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
+                PulsarApi.KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta();
+                for (PulsarApi.IntRange range : keySharedMeta.getHashRangesList()) {
+                    int start = Math.max(intRange.getStart(), range.getStart());
+                    int end = Math.min(intRange.getEnd(), range.getEnd());
+                    if (end >= start) {
+                        throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
+                    }
+                }
             }
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 8c6c033..511f8c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -655,6 +655,70 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testHashRangeConflict() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString();
+        final String sub = "test";
+
+        Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535));
+        Assert.assertTrue(consumer1.isConnected());
+
+        Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399));
+        Assert.assertTrue(consumer2.isConnected());
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(1,1));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        consumer1.close();
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(50,100));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(399,500));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        Consumer<String> consumer3 = createFixedHashRangesConsumer(topic, sub, Range.of(400,600));
+        Assert.assertTrue(consumer3.isConnected());
+
+        Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99));
+        Assert.assertTrue(consumer4.isConnected());
+
+        consumer2.close();
+        consumer3.close();
+        consumer4.close();
+    }
+
+    private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
+        return pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subscription)
+                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(ranges))
+                .subscribe();
+    }
+
     private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
         Producer<Integer> producer = null;
         if (enableBatch) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b6e327c..6a6e42a 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -732,6 +732,20 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    /**
+     * Consumer assign exception thrown by Pulsar client.
+     */
+    public static class ConsumerAssignException extends PulsarClientException {
+
+        /**
+         * Constructs an {@code ConsumerAssignException} with the specified detail message.
+         * @param msg The detail message.
+         */
+        public ConsumerAssignException(String msg) {
+            super(msg);
+        }
+    }
+
     // wrap an exception to enriching more info messages.
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
@@ -786,6 +800,8 @@ public class PulsarClientException extends IOException {
             return new ChecksumException(msg);
         } else if (t instanceof CryptoException) {
             return new CryptoException(msg);
+        } else if (t instanceof ConsumerAssignException) {
+            return new ConsumerAssignException(msg);
         } else if (t instanceof PulsarClientException) {
             return new PulsarClientException(msg);
         } else if (t instanceof CompletionException) {
@@ -867,6 +883,8 @@ public class PulsarClientException extends IOException {
             return new ChecksumException(msg);
         } else if (cause instanceof CryptoException) {
             return new CryptoException(msg);
+        } else if (cause instanceof ConsumerAssignException) {
+            return new ConsumerAssignException(msg);
         } else if (cause instanceof TopicDoesNotExistException) {
             return new TopicDoesNotExistException(msg);
         } else {
@@ -895,6 +913,7 @@ public class PulsarClientException extends IOException {
                 || t instanceof NotSupportedException
                 || t instanceof ChecksumException
                 || t instanceof CryptoException
+                || t instanceof ConsumerAssignException
                 || t instanceof ProducerBusyException
                 || t instanceof ConsumerBusyException) {
             return false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6dde157..d63cbc1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1001,6 +1001,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.IncompatibleSchemaException(errorMsg);
         case TopicNotFound:
             return new PulsarClientException.TopicDoesNotExistException(errorMsg);
+        case ConsumerAssignError:
+            return new PulsarClientException.ConsumerAssignException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);

Reply via email to