wcarlson5 commented on a change in pull request #11813:
URL: https://github.com/apache/kafka/pull/11813#discussion_r815315400



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo
 
         topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove);
 
-        if (resetOffsets) {
+        if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) {
             log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}",
                      removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully",
                      topologyToRemove, partitionsToReset
             );
-            if (!partitionsToReset.isEmpty()) {
-                removeTopologyFuture.whenComplete((v, throwable) -> {
-                    if (throwable != null) {
-                        removeTopologyFuture.completeExceptionally(throwable);
-                    }
-                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                    while (deleteOffsetsResult == null) {
-                        try {
-                            deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                                applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                            deleteOffsetsResult.all().get();
-                        } catch (final InterruptedException ex) {
+            resetOffsets(removeTopologyFuture, partitionsToReset);

Review comment:
       I think we need to do something with the result of the `resetOffsets` 
method?
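
   To illustrate the concern, here is a minimal, self-contained sketch. It does not use the real classes: `resetOffsets(boolean)`, `removeIgnoringResult` and `removeUsingResult` are hypothetical stand-ins, and I am assuming the helper returns some kind of future. The point is only that calling the helper as a bare statement drops whatever it reports back.

```java
import java.util.concurrent.CompletableFuture;

public class ResetResultSketch {

    // Hypothetical stand-in for the extracted helper (NOT the PR's signature):
    // returns a future that reflects whether the simulated reset succeeded.
    static CompletableFuture<Void> resetOffsets(final boolean fail) {
        final CompletableFuture<Void> resetFuture = new CompletableFuture<>();
        if (fail) {
            resetFuture.completeExceptionally(new IllegalStateException("offset reset failed"));
        } else {
            resetFuture.complete(null);
        }
        return resetFuture;
    }

    // Calls the helper as a bare statement, so its outcome never reaches the caller.
    static CompletableFuture<Void> removeIgnoringResult(final boolean fail) {
        resetOffsets(fail);
        return CompletableFuture.completedFuture(null);
    }

    // Hands the helper's result back, so a failed reset is visible to the caller.
    static CompletableFuture<Void> removeUsingResult(final boolean fail) {
        return resetOffsets(fail);
    }

    public static void main(final String[] args) {
        System.out.println(removeIgnoringResult(true).isCompletedExceptionally()); // prints "false"
        System.out.println(removeUsingResult(true).isCompletedExceptionally());    // prints "true"
    }
}
```

   Whether the result should be returned, chained onto `removeTopologyFuture`, or only logged is a design decision for the PR; the sketch just shows why it should not be silently dropped.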

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
##########
@@ -124,6 +124,17 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() {
         streams.start(asList(builder1.build(), builder2.build()));
     }
 
+    @Test
+    public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception {

Review comment:
       This is a good test, but there is one more that I would like to see. It 
may have to be an integration test, though. 
   
   If we add two topologies that have overlapping source topics to an 
un-started client, will we see an error right away? Or will that error not be 
discovered until after the streams object is started? I think it will be the 
latter. Either way, I think we should document this behavior and have a test 
for it so that we know when it changes. I could see this becoming a source of 
much confusion later on.
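
   Roughly the scenario I have in mind, written against a toy stand-in and not the real wrapper. `ToyClient`, its methods, and the use of `IllegalStateException` are all hypothetical, and the sketch hard-codes my guess that the overlap is only detected on start. The real test would need to confirm or refute that guess.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OverlapSketch {

    // Toy stand-in for the streams client; NOT KafkaStreamsNamedTopologyWrapper.
    static class ToyClient {
        private final List<Set<String>> sourceTopicsPerTopology = new ArrayList<>();

        // Assumption: adding to an un-started client only records the topology.
        void addTopology(final Set<String> sourceTopics) {
            sourceTopicsPerTopology.add(sourceTopics);
        }

        // Assumption: overlapping source topics are only checked here.
        void start() {
            final Set<String> seen = new HashSet<>();
            for (final Set<String> topics : sourceTopicsPerTopology) {
                for (final String topic : topics) {
                    if (!seen.add(topic)) {
                        throw new IllegalStateException(
                            "Topic " + topic + " is used by more than one topology");
                    }
                }
            }
        }
    }

    // Returns true if the overlap is reported by start() and not by addTopology().
    static boolean overlapDetectedOnStart() {
        final ToyClient client = new ToyClient();
        client.addTopology(new HashSet<>(Arrays.asList("input-a", "shared")));
        client.addTopology(new HashSet<>(Arrays.asList("shared", "input-b"))); // no error yet
        try {
            client.start();
            return false;
        } catch (final IllegalStateException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(overlapDetectedOnStart()); // prints "true"
    }
}
```

   The real test would make the same two assertions against the actual client: that adding the second topology before start does (or does not) throw, and that start does (or does not) surface the conflict.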





