guozhangwang commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823130503



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
 
     private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
                                                    final Set<TopicPartition> partitionsToReset) {
-        if (!partitionsToReset.isEmpty()) {
-            removeTopologyFuture.whenComplete((v, throwable) -> {
-                if (throwable != null) {
-                    removeTopologyFuture.completeExceptionally(throwable);
-                }
-                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                while (deleteOffsetsResult == null) {
-                    try {
-                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                        deleteOffsetsResult.all().get();
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
+        final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
+        try {
+            removeTopologyFuture.get();

Review comment:
       Why do we now have to wait on the first future before moving on to
construct the second one? I thought the main fix was only in
https://github.com/apache/kafka/pull/11868/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR1132
 above, and that with it we would not need to change the behavior to block
until the topology removal completes.
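
       To make the distinction concrete, here is a minimal sketch of the two
styles in question. It is not the code in this PR: it uses
java.util.concurrent.CompletableFuture as a stand-in for KafkaFutureImpl, and
the class name, method names, and the "offsets reset" result are hypothetical
placeholders for the real offset-deletion call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration only; CompletableFuture stands in for KafkaFutureImpl.
public class FutureChainingSketch {

    // Blocking style: the calling thread waits for the first future
    // before the second future is completed and returned.
    static CompletableFuture<String> blocking(CompletableFuture<Void> removeTopology) {
        CompletableFuture<String> resetOffsets = new CompletableFuture<>();
        try {
            removeTopology.get(); // blocks until the first stage finishes
            resetOffsets.complete("offsets reset"); // placeholder for the real work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            resetOffsets.completeExceptionally(e);
        } catch (ExecutionException e) {
            resetOffsets.completeExceptionally(e.getCause());
        }
        return resetOffsets;
    }

    // Chained style: returns immediately; the second future is completed
    // from a callback once the first future finishes.
    static CompletableFuture<String> chained(CompletableFuture<Void> removeTopology) {
        CompletableFuture<String> resetOffsets = new CompletableFuture<>();
        removeTopology.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                resetOffsets.completeExceptionally(throwable);
            } else {
                resetOffsets.complete("offsets reset"); // placeholder for the real work
            }
        });
        return resetOffsets;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> removeTopology = new CompletableFuture<>();
        CompletableFuture<String> result = chained(removeTopology);
        System.out.println(result.isDone()); // first stage still pending
        removeTopology.complete(null);
        System.out.println(result.join());
    }
}
```

       With the chained form the caller gets the second future back right away
and can decide for itself whether to block on it; with the blocking form the
call does not return until the first future has completed.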





