guozhangwang commented on a change in pull request #11813:
URL: https://github.com/apache/kafka/pull/11813#discussion_r815537908



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -100,7 +100,7 @@ public void start(final NamedTopology initialTopology) {
     /**
      * Start up Streams with a collection of initial NamedTopologies (may be 
empty)
      */
-    public void start(final Collection<NamedTopology> initialTopologies) {
+    public synchronized void start(final Collection<NamedTopology> 
initialTopologies) {

Review comment:
       I took some time to understand why we want to synchronize here, as at 
the first sight it looks a bit unclear to me:
   
   /* means caller -> callee */ 
   inherited.start: synchronized, public -> 
   addNamedTopology: unsynchronized, public, register topology metadata ->
   completedFutureForUnstartedApp: synchronized, private, check state
   
   removeNamedTopology: unsynchronized, public, unregister metadata topology ->
   completedFutureForUnstartedApp: synchronized, private, check state
   
   Register/unregister topology metadata is synchronized, and parent.start 
would modify state.
   
   I think I understand now that it's because `addNamedTopology` is not 
synchronized, plus when we have multiple named topology we want to keep the 
state unchanged while adding them one-by-one. Is that the case? If yes maybe 
it's better to add such reasoning in the javadoc above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult 
removeNamedTopology(final String topologyToRemo
 
         topologyMetadata.unregisterTopology(removeTopologyFuture, 
topologyToRemove);
 
-        if (resetOffsets) {
+        if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing 
topology") && resetOffsets) {
             log.info("Resetting offsets for the following partitions of {} 
removed NamedTopology {}: {}",
                      removeTopologyFuture.isCompletedExceptionally() ? 
"unsuccessfully" : "successfully",
                      topologyToRemove, partitionsToReset
             );
-            if (!partitionsToReset.isEmpty()) {
-                removeTopologyFuture.whenComplete((v, throwable) -> {
-                    if (throwable != null) {
-                        removeTopologyFuture.completeExceptionally(throwable);
-                    }
-                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = 
null;
-                    while (deleteOffsetsResult == null) {
-                        try {
-                            deleteOffsetsResult = 
adminClient.deleteConsumerGroupOffsets(
-                                
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), 
partitionsToReset);
-                            deleteOffsetsResult.all().get();
-                        } catch (final InterruptedException ex) {
+            resetOffsets(removeTopologyFuture, partitionsToReset);
+        }
+        return new RemoveNamedTopologyResult(removeTopologyFuture);
+    }
+
+    /**
+     * @return  true iff the application is still in CREATED and the future 
was completed
+     */
+    private synchronized boolean completedFutureForUnstartedApp(final 
KafkaFutureImpl<Void> updateTopologyFuture,
+                                                                final String 
operation) {
+        if (state == State.CREATED && !updateTopologyFuture.isDone()) {
+            updateTopologyFuture.complete(null);
+            log.info("Completed {} since application has not been started", 
operation);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> 
removeTopologyFuture,

Review comment:
       I'm assuming this is just extracting the inlined function and hence 
skipped and did not compare line by line :)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult 
removeNamedTopology(final String topologyToRemo
 
         topologyMetadata.unregisterTopology(removeTopologyFuture, 
topologyToRemove);
 
-        if (resetOffsets) {
+        if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing 
topology") && resetOffsets) {

Review comment:
       nit: how about put `resetOffsets` as the first condition so that if it's 
false, we would skip the synchronized function (not sure if JIT would really be 
able to optimize this way)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to