gaoran10 commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1562306732
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ########## @@ -188,58 +270,135 @@ protected CompletableFuture<Boolean> isLocalTopicActive() { }, brokerService.executor()); } - protected synchronized CompletableFuture<Void> closeProducerAsync() { - if (producer == null) { - STATE_UPDATER.set(this, State.Stopped); + /** + * This method only be used by {@link PersistentTopic#checkGC} now. + */ + public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { + long backlog = getNumberOfEntriesInBacklog(); + if (failIfHasBacklog && backlog > 0) { + CompletableFuture<Void> disconnectFuture = new CompletableFuture<>(); + disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); + if (log.isDebugEnabled()) { + log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); + } + return disconnectFuture; + } + log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, + getReplicatorReadPosition(), backlog); + return closeProducerAsync(closeTheStartingProducer); + } + + /** + * This method only be used by {@link PersistentTopic#checkGC} now. + */ + protected CompletableFuture<Void> closeProducerAsync(boolean closeTheStartingProducer) { + Pair<Boolean, State> setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting); + if (!setDisconnectingRes.getLeft()) { + if (setDisconnectingRes.getRight() == State.Starting) { + if (closeTheStartingProducer) { + /** + * Delay retry(wait for the start producer task is finish). + * Note: If the producer always start fail, the start producer task will always retry until the + * state changed to {@link State.Terminated}. + * Nit: The better solution is creating a {@link CompletableFuture} to trace the in-progress + * creation and call "inProgressCreationFuture.thenApply(closeProducer())". + */ + long waitTimeMs = backOff.next(); + brokerService.executor().schedule(() -> closeProducerAsync(true), + waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.info("[{}] Skip current producer closing since the previous producer has been closed," + + " and trying start a new one, state : {}", + replicatorId, setDisconnectingRes.getRight()); + } + } else if (setDisconnectingRes.getRight() == State.Disconnected + || setDisconnectingRes.getRight() == State.Disconnecting) { + log.info("[{}] Skip current producer closing since other thread did closing, state : {}", + replicatorId, setDisconnectingRes.getRight()); + } else if (setDisconnectingRes.getRight() == State.Terminating + || setDisconnectingRes.getRight() == State.Terminated) { + log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", + replicatorId, state); + } + log.info("[{}] Skip current termination since other thread is doing close producer or termination," + + " state : {}", replicatorId, state); return CompletableFuture.completedFuture(null); } - CompletableFuture<Void> future = producer.closeAsync(); + + // Close producer and update state. + return doCloseProducerAsync(producer, () -> { + Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { + this.producer = null; + // deactivate further read + disableReplicatorRead(); + return; + } + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectingRes.getRight() == State.Terminated) { + log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", + replicatorId, state); + } else { + // Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other task has change the state to terminated. so skipped current one task." + + " State is : {}", + replicatorId, state); + } + }); + } + + protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> producer, Runnable actionAfterClosed) { + CompletableFuture<Void> future = + producer == null ? CompletableFuture.completedFuture(null) : producer.closeAsync(); return future.thenRun(() -> { - STATE_UPDATER.set(this, State.Stopped); - this.producer = null; - // deactivate further read - disableReplicatorRead(); + actionAfterClosed.run(); }).exceptionally(ex -> { long waitTimeMs = backOff.next(); log.warn( - "[{}] Exception: '{}' occurred while trying to close the producer." - + " retrying again in {} s", - replicatorId, ex.getMessage(), waitTimeMs / 1000.0); + "[{}] Exception: '{}' occurred while trying to close the producer. Replicator state: {}." + + " Retrying again in {} s.", + replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(() -> doCloseProducerAsync(producer, actionAfterClosed), + waitTimeMs, TimeUnit.MILLISECONDS); return null; }); } + public CompletableFuture<Void> terminate() { + if (!tryChangeStatusToTerminating()) { Review Comment: The method `tryChangeStatusToTerminating` checks four states, `Starting`, `Started`, `Disconnecting`, `Disconnected`, maybe we can only check the states `Terminating` and `Terminated` here. Another potential problem is that if the state is `Terminating`, the method `terminate` will return a complete future object, but the terminate operation may not finish. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ########## @@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getProducerName()); - STATE_UPDATER.set(this, State.Stopped); + STATE_UPDATER.set(this, State.Disconnected); } protected abstract String getProducerName(); - protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer); + protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> producer); protected abstract Position getReplicatorReadPosition(); - protected abstract long getNumberOfEntriesInBacklog(); + public abstract long getNumberOfEntriesInBacklog(); protected abstract void disableReplicatorRead(); public String getRemoteCluster() { return remoteCluster; } - // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer - // the end result can be disconnect. - public synchronized void startProducer() { - if (STATE_UPDATER.get(this) == State.Stopping) { - long waitTimeMs = backOff.next(); - if (log.isDebugEnabled()) { - log.debug( - "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", - replicatorId, waitTimeMs / 1000.0); - } - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); - return; - } - State state = STATE_UPDATER.get(this); - if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { - if (state == State.Started) { - // Already running + public void startProducer() { + // Guarantee only one task call "producerBuilder.createAsync()". + Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); + if (!setStartingRes.getLeft()) { + if (setStartingRes.getRight() == State.Starting) { + log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", + replicatorId, state); + } else if (setStartingRes.getRight() == State.Started) { + // Since the method "startProducer" will be called even if it is started, only print debug-level log. + if (log.isDebugEnabled()) { + log.debug("[{}] Replicator was already running. state: {}", replicatorId, state); + } + } else if (setStartingRes.getRight() == State.Disconnecting) { if (log.isDebugEnabled()) { - log.debug("[{}] Replicator was already running", replicatorId); + log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)." + + " state: {}", replicatorId, state); } + delayStartProducerAfterDisconnected(); } else { - log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state); + /** {@link State.Terminating}, {@link State.Terminated}. **/ + log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); } - return; } log.info("[{}] Starting replicator", replicatorId); producerBuilder.createAsync().thenAccept(producer -> { - readEntries(producer); + setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { - log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectedRes.getRight() == State.Terminated) { + log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", + replicatorId, state); + } else { + /** {@link State.Disconnected}, {@link State.Starting}, {@link State.Started} **/ + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other thread will try to create the producer again. so skipped current one task." + + " State is : {}", + replicatorId, state); + } } return null; }); + } + /*** + * The producer is disconnecting, delay to start the producer. + * If we start a producer immediately, we will get a conflict producer(same name producer) registered error. + */ + protected void delayStartProducerAfterDisconnected() { + long waitTimeMs = backOff.next(); + if (log.isDebugEnabled()) { + log.debug( + "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", + replicatorId, waitTimeMs / 1000.0); + } + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } - protected void checkTopicActiveAndRetryStartProducer() { - isLocalTopicActive().thenAccept(isTopicActive -> { - if (isTopicActive) { - startProducer(); + protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { + brokerService.executor().schedule(() -> { + if (state == State.Terminating || state == State.Terminated) { + log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}", + replicatorId, state); + return; } - }).exceptionally(ex -> { - log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); - return null; - }); + CompletableFuture<Optional<Topic>> topicFuture = brokerService.getTopics().get(localTopicName); + if (topicFuture == null) { + // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully." + + " And trigger a terminate.", replicatorId); + terminate(); + return; + } + topicFuture.thenAccept(optional -> { + if (optional.isEmpty()) { + // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); + return; + } + if (optional.get() != localTopic) { + // Topic closed and created a new one, current replicator is outdated. + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); + return; + } + Replicator replicator = localTopic.getReplicators().get(remoteCluster); + if (replicator != AbstractReplicator.this) { + // Current replicator has been closed, and created a new one. + log.info("[{}] Skip scheduled to start the producer since a new replicator has instead current" + + " one. And trigger a terminate.", replicatorId); + terminate(); + return; + } + startProducer(); + }).exceptionally(ex -> { + log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" Review Comment: Maybe this log should be error level. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ########## @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override - protected void readEntries(Producer<byte[]> producer) { - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting + protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) { + // Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); - HAVE_PENDING_READ_UPDATER.set(this, FALSE); - this.producer = (ProducerImpl) producer; - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { - log.info("[{}] Created replicator producer", replicatorId); + /** + * 1. Try change state to {@link Started}. + * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value + * producer when the state is {@link Started}. + */ + Pair<Boolean, State> changeStateRes; + changeStateRes = compareSetAndGetState(Starting, Started); + if (changeStateRes.getLeft()) { + this.producer = (ProducerImpl) producer; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); + // Trigger a new read. + log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); backOff.reset(); - // activate cursor: so, entries can be cached + // activate cursor: so, entries can be cached. this.cursor.setActive(); // read entries readMoreEntries(); } else { - log.info( - "[{}] Replicator was stopped while creating the producer." - + " Closing it. Replicator state: {}", - replicatorId, STATE_UPDATER.get(this)); - STATE_UPDATER.set(this, State.Stopping); - closeProducerAsync(); + if (changeStateRes.getRight() == Started) { + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Replicator was already started by another thread while creating the producer." + + " Closing the producer newly created. Replicator state: {}", replicatorId, state); + } else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) { + log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}", Review Comment: This is a little confusing, if the replicator was terminating or terminated, do we need to close the producer again? ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ########## @@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getProducerName()); - STATE_UPDATER.set(this, State.Stopped); + STATE_UPDATER.set(this, State.Disconnected); } protected abstract String getProducerName(); - protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer); + protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> producer); protected abstract Position getReplicatorReadPosition(); - protected abstract long getNumberOfEntriesInBacklog(); + public abstract long getNumberOfEntriesInBacklog(); protected abstract void disableReplicatorRead(); public String getRemoteCluster() { return remoteCluster; } - // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer - // the end result can be disconnect. - public synchronized void startProducer() { - if (STATE_UPDATER.get(this) == State.Stopping) { - long waitTimeMs = backOff.next(); - if (log.isDebugEnabled()) { - log.debug( - "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", - replicatorId, waitTimeMs / 1000.0); - } - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); - return; - } - State state = STATE_UPDATER.get(this); - if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { - if (state == State.Started) { - // Already running + public void startProducer() { + // Guarantee only one task call "producerBuilder.createAsync()". + Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); + if (!setStartingRes.getLeft()) { + if (setStartingRes.getRight() == State.Starting) { + log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", + replicatorId, state); + } else if (setStartingRes.getRight() == State.Started) { + // Since the method "startProducer" will be called even if it is started, only print debug-level log. + if (log.isDebugEnabled()) { + log.debug("[{}] Replicator was already running. state: {}", replicatorId, state); + } + } else if (setStartingRes.getRight() == State.Disconnecting) { if (log.isDebugEnabled()) { - log.debug("[{}] Replicator was already running", replicatorId); + log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)." + + " state: {}", replicatorId, state); } + delayStartProducerAfterDisconnected(); } else { - log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state); + /** {@link State.Terminating}, {@link State.Terminated}. **/ + log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); } - return; } log.info("[{}] Starting replicator", replicatorId); producerBuilder.createAsync().thenAccept(producer -> { - readEntries(producer); + setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { - log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectedRes.getRight() == State.Terminated) { + log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", + replicatorId, state); + } else { + /** {@link State.Disconnected}, {@link State.Starting}, {@link State.Started} **/ + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other thread will try to create the producer again. so skipped current one task." + + " State is : {}", + replicatorId, state); + } } return null; }); + } + /*** + * The producer is disconnecting, delay to start the producer. + * If we start a producer immediately, we will get a conflict producer(same name producer) registered error. + */ + protected void delayStartProducerAfterDisconnected() { + long waitTimeMs = backOff.next(); + if (log.isDebugEnabled()) { + log.debug( + "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", + replicatorId, waitTimeMs / 1000.0); + } + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } - protected void checkTopicActiveAndRetryStartProducer() { - isLocalTopicActive().thenAccept(isTopicActive -> { - if (isTopicActive) { - startProducer(); + protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { + brokerService.executor().schedule(() -> { + if (state == State.Terminating || state == State.Terminated) { + log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}", + replicatorId, state); + return; } - }).exceptionally(ex -> { - log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); - return null; - }); + CompletableFuture<Optional<Topic>> topicFuture = brokerService.getTopics().get(localTopicName); + if (topicFuture == null) { + // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully." + + " And trigger a terminate.", replicatorId); + terminate(); + return; + } + topicFuture.thenAccept(optional -> { + if (optional.isEmpty()) { + // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); + return; + } + if (optional.get() != localTopic) { + // Topic closed and created a new one, current replicator is outdated. + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); + return; + } Review Comment: Could we merge these two cases? ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ########## @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override - protected void readEntries(Producer<byte[]> producer) { - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting + protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) { + // Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); - HAVE_PENDING_READ_UPDATER.set(this, FALSE); - this.producer = (ProducerImpl) producer; Review Comment: Why do we need to move these two lines? It seems that if we don't set producer, the operation `doCloseProducerAsync(producer, () -> {}); ` can't close the producer. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ########## @@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl< } } - @Override - public CompletableFuture<Void> disconnect() { - return disconnect(false); - } - - @Override - public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) { - final CompletableFuture<Void> future = new CompletableFuture<>(); - - super.disconnect(failIfHasBacklog).thenRun(() -> { - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); Review Comment: Don't forget to release the rate limiter resource. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ########## @@ -3870,6 +3879,27 @@ private void unfenceTopicToResume() { subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; + replicatorsResume(); + } + + private void replicatorsResume() { + removeTerminatingReplicators(replicators); + removeTerminatingReplicators(shadowReplicators); + checkReplication(); + checkShadowReplication(); + } + + private static void removeTerminatingReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) { Review Comment: Maybe this method doesn't need the `static` qualifier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org