wcarlson5 commented on a change in pull request #11813: URL: https://github.com/apache/kafka/pull/11813#discussion_r815315400
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo
         topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove);
-        if (resetOffsets) {
+        if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) {
             log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}",
                 removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully",
                 topologyToRemove,
                 partitionsToReset
             );
-            if (!partitionsToReset.isEmpty()) {
-                removeTopologyFuture.whenComplete((v, throwable) -> {
-                    if (throwable != null) {
-                        removeTopologyFuture.completeExceptionally(throwable);
-                    }
-                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                    while (deleteOffsetsResult == null) {
-                        try {
-                            deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                                applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                            deleteOffsetsResult.all().get();
-                        } catch (final InterruptedException ex) {
+            resetOffsets(removeTopologyFuture, partitionsToReset);

Review comment: I think we need to do something with the result of the `resetOffsets` method?

########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java ##########
@@ -124,6 +124,17 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() {
         streams.start(asList(builder1.build(), builder2.build()));
     }
+    @Test
+    public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception {

Review comment: This is a good test, but there is one more that I would like to see, though it may have to be an integration test. If we add two topologies that have overlapping source topics to an un-started client, will we see an error?
Or will that error not be discovered until after the streams object is started? I think it will be the latter; however, I think we should document this behavior and have a test for it so that we know when it changes. I could see this becoming a source of much confusion later on.
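To make the first comment (about using the result of `resetOffsets`) concrete, here is a minimal sketch of one possible shape. It uses only `java.util.concurrent.CompletableFuture`. Everything in it is an assumption for illustration: the class name `ResetResultSketch`, the boolean return value of the stand-in `resetOffsets`, and the `simulateFailure` flag are hypothetical and are not taken from the PR, whose actual `resetOffsets` signature and return type are not shown in the diff above.

```java
import java.util.concurrent.CompletableFuture;

public class ResetResultSketch {

    // Hypothetical stand-in for the PR's resetOffsets helper.
    // Assumption: it reports success as a boolean; the real return type may differ.
    static boolean resetOffsets(final boolean simulateFailure) {
        return !simulateFailure;
    }

    // Completes the removal future according to the reset outcome
    // instead of discarding the helper's result.
    static CompletableFuture<Void> removeTopology(final boolean simulateFailure) {
        final CompletableFuture<Void> removeTopologyFuture = new CompletableFuture<>();
        if (resetOffsets(simulateFailure)) {
            removeTopologyFuture.complete(null);
        } else {
            removeTopologyFuture.completeExceptionally(
                new IllegalStateException("offset reset failed"));
        }
        return removeTopologyFuture;
    }

    public static void main(final String[] args) {
        // Success path: the future completes normally.
        System.out.println(removeTopology(false).isCompletedExceptionally());
        // Failure path: the caller can observe the failure through the future.
        System.out.println(removeTopology(true).isCompletedExceptionally());
    }
}
```

The point of the sketch is only that the caller of `removeNamedTopology` would be able to observe a failed reset through the returned future; whether the PR should surface it this way, retry, or just log it is a design choice for the author.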