codelipenghui commented on a change in pull request #5571: Add epoch for connection handler to handle create producer timeout. URL: https://github.com/apache/pulsar/pull/5571#discussion_r344017172
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java ########## @@ -258,16 +258,59 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { @Override public void resetPublishCountAndEnableReadIfRequired() { if (this.publishRateLimiter.resetPublishCount()) { - enableProduerRead(); + enableProducerRead(); } } /** * it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling */ - protected void enableProduerRead() { + protected void enableProducerRead() { if (producers != null) { - producers.forEach(producer -> producer.getCnx().enableCnxAutoRead()); + producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead()); + } + } + + protected void checkTopicFenced() throws BrokerServiceException { + if (isFenced) { + log.warn("[{}] Attempting to add producer to a fenced topic", topic); + throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"); + } + } + + protected void internalAddProducer(Producer producer) throws BrokerServiceException { + if (isProducersExceeded()) { + log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); + throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); + } + + if (log.isDebugEnabled()) { + log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName()); + } + + Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer); + if (existProducer != null) { + tryOverwriteOldProducer(existProducer, producer); + } + } + + private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) + throws BrokerServiceException { + boolean canOverwrite = false; + if (oldProducer.equals(newProducer) && !oldProducer.isUserProvidedProducerName() + && !newProducer.isUserProvidedProducerName() && newProducer.getEpoch() > oldProducer.getEpoch()) { + oldProducer.close(); + canOverwrite = 
true; + } + if (canOverwrite) { Review comment: Maybe this `if` can be simplified to ``` if (!canOverwrite || !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { throw new BrokerServiceException.NamingException( "Producer with name '" + newProducer.getProducerName() + "' is already connected"); } ``` Right? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services