codelipenghui commented on a change in pull request #5571: Add epoch for 
connection handler to handle create producer timeout.
URL: https://github.com/apache/pulsar/pull/5571#discussion_r344017172
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 ##########
 @@ -258,16 +258,59 @@ public void incrementPublishCount(int numOfMessages, 
long msgSizeInBytes) {
      @Override
     public void resetPublishCountAndEnableReadIfRequired() {
         if (this.publishRateLimiter.resetPublishCount()) {
-            enableProduerRead();
+            enableProducerRead();
         }
     }
 
      /**
      * it sets cnx auto-readable if producer's cnx is disabled due to 
publish-throttling
      */
-    protected void enableProduerRead() {
+    protected void enableProducerRead() {
         if (producers != null) {
-            producers.forEach(producer -> 
producer.getCnx().enableCnxAutoRead());
+            producers.values().forEach(producer -> 
producer.getCnx().enableCnxAutoRead());
+        }
+    }
+
+    protected void checkTopicFenced() throws BrokerServiceException {
+        if (isFenced) {
+            log.warn("[{}] Attempting to add producer to a fenced topic", 
topic);
+            throw new BrokerServiceException.TopicFencedException("Topic is 
temporarily unavailable");
+        }
+    }
+
+    protected void internalAddProducer(Producer producer) throws 
BrokerServiceException {
+        if (isProducersExceeded()) {
+            log.warn("[{}] Attempting to add producer to topic which reached 
max producers limit", topic);
+            throw new BrokerServiceException.ProducerBusyException("Topic 
reached max producers limit");
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] {} Got request to create producer ", topic, 
producer.getProducerName());
+        }
+
+        Producer existProducer = 
producers.putIfAbsent(producer.getProducerName(), producer);
+        if (existProducer != null) {
+            tryOverwriteOldProducer(existProducer, producer);
+        }
+    }
+
+    private void tryOverwriteOldProducer(Producer oldProducer, Producer 
newProducer)
+            throws BrokerServiceException {
+        boolean canOverwrite = false;
+        if (oldProducer.equals(newProducer) && 
!oldProducer.isUserProvidedProducerName()
+                && !newProducer.isUserProvidedProducerName() && 
newProducer.getEpoch() > oldProducer.getEpoch()) {
+            oldProducer.close();
+            canOverwrite = true;
+        }
+        if (canOverwrite) {
 
 Review comment:
   May be if can simplified with 
   ```
   if (!canOverwrite || !producers.replace(newProducer.getProducerName(), 
oldProducer, newProducer)) {
       throw new BrokerServiceException.NamingException(
                       "Producer with name '" + newProducer.getProducerName() + 
"' is already connected")
   }
   ```
   Right?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to