guozhangwang commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823130503
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
     private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
                                                    final Set<TopicPartition> partitionsToReset) {
-        if (!partitionsToReset.isEmpty()) {
-            removeTopologyFuture.whenComplete((v, throwable) -> {
-                if (throwable != null) {
-                    removeTopologyFuture.completeExceptionally(throwable);
-                }
-                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                while (deleteOffsetsResult == null) {
-                    try {
-                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                        deleteOffsetsResult.all().get();
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
+        final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
+        try {
+            removeTopologyFuture.get();

Review comment:
Why do we now have to wait on the first future before moving on to construct the second future? I thought the main fix was only in https://github.com/apache/kafka/pull/11868/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR1132 above, and that with it we would not need to change the behavior to block until the removal of the topology completes?
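
To make the distinction behind this question concrete, here is a minimal sketch of the two styles: blocking on the first future before creating the second, versus returning the second future immediately and completing it from a callback. This is not the PR's code. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFutureImpl`, and the class name, method names, and the "offsets reset" string are hypothetical placeholders for the admin-client call.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: CompletableFuture stands in for KafkaFutureImpl, and the
// "reset" step just produces a string instead of calling the admin client.
final class FutureChainingSketch {

    // Blocking style: the calling thread waits for the first future to finish
    // before the second future is completed and returned.
    static CompletableFuture<String> blocking(final CompletableFuture<Void> removeTopology) {
        final CompletableFuture<String> resetOffsets = new CompletableFuture<>();
        try {
            removeTopology.join(); // blocks here until the removal completes
            resetOffsets.complete("offsets reset");
        } catch (final RuntimeException e) {
            resetOffsets.completeExceptionally(e);
        }
        return resetOffsets;
    }

    // Chained style: the second future is returned right away and is completed
    // by a callback once the first future finishes, successfully or not.
    static CompletableFuture<String> chained(final CompletableFuture<Void> removeTopology) {
        final CompletableFuture<String> resetOffsets = new CompletableFuture<>();
        removeTopology.whenComplete((v, throwable) -> {
            if (throwable != null) {
                resetOffsets.completeExceptionally(throwable);
            } else {
                resetOffsets.complete("offsets reset");
            }
        });
        return resetOffsets;
    }

    public static void main(final String[] args) {
        final CompletableFuture<Void> removeTopology = new CompletableFuture<>();
        final CompletableFuture<String> reset = chained(removeTopology);
        System.out.println(reset.isDone()); // the caller was not blocked
        removeTopology.complete(null);
        System.out.println(reset.join());
    }
}
```

With `chained`, the caller gets the second future back while the removal is still pending. With `blocking`, the same call would not return until `removeTopology` completes, which is the behavior change being questioned.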