guozhangwang commented on a change in pull request #11813: URL: https://github.com/apache/kafka/pull/11813#discussion_r815537908
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -100,7 +100,7 @@ public void start(final NamedTopology initialTopology) { /** * Start up Streams with a collection of initial NamedTopologies (may be empty) */ - public void start(final Collection<NamedTopology> initialTopologies) { + public synchronized void start(final Collection<NamedTopology> initialTopologies) { Review comment: I took some time to understand why we want to synchronize here, as at the first sight it looks a bit unclear to me: /* means caller -> callee */ inherited.start: synchronized, public -> addNamedTopology: unsynchronized, public, register topology metadata -> completedFutureForUnstartedApp: synchronized, private, check state removeNamedTopology: unsynchronized, public, unregister metadata topology -> completedFutureForUnstartedApp: synchronized, private, check state Register/unregister topology metadata is synchronized, and parent.start would modify state. I think I understand now that it's because `addNamedTopology` is not synchronized, plus when we have multiple named topology we want to keep the state unchanged while adding them one-by-one. Is that the case? If yes maybe it's better to add such reasoning in the javadoc above. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); - if (resetOffsets) { + if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", topologyToRemove, partitionsToReset ); - if (!partitionsToReset.isEmpty()) { - removeTopologyFuture.whenComplete((v, throwable) -> { - if (throwable != null) { - removeTopologyFuture.completeExceptionally(throwable); - } - DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; - while (deleteOffsetsResult == null) { - try { - deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); - deleteOffsetsResult.all().get(); - } catch (final InterruptedException ex) { + resetOffsets(removeTopologyFuture, partitionsToReset); + } + return new RemoveNamedTopologyResult(removeTopologyFuture); + } + + /** + * @return true iff the application is still in CREATED and the future was completed + */ + private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImpl<Void> updateTopologyFuture, + final String operation) { + if (state == State.CREATED && !updateTopologyFuture.isDone()) { + updateTopologyFuture.complete(null); + log.info("Completed {} since application has not been started", operation); + return true; + } else { + return false; + } + } + + private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture, Review comment: I'm assuming this is just extracting the inlined function and hence skipped and did not compare line by line :) ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); - if (resetOffsets) { + if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { Review comment: nit: how about put `resetOffsets` as the first condition so that if it's false, we would skip the synchronized function (not sure if JIT would really be able to optimize this way)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org