ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823421009



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -264,48 +267,46 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
         }
     }
 
-    private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
-                                                   final Set<TopicPartition> partitionsToReset) {
-        if (!partitionsToReset.isEmpty()) {
-            removeTopologyFuture.whenComplete((v, throwable) -> {
-                if (throwable != null) {
-                    removeTopologyFuture.completeExceptionally(throwable);
-                }
-                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                while (deleteOffsetsResult == null) {
-                    try {
-                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                        deleteOffsetsResult.all().get();
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
-                        break;
-                    } catch (final ExecutionException ex) {
-                        if (ex.getCause() != null &&
-                            ex.getCause() instanceof GroupSubscribedToTopicException &&
-                            ex.getCause()
-                                .getMessage()
-                                .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) {
-                            ex.printStackTrace();
-                        } else if (ex.getCause() != null &&
-                            ex.getCause() instanceof GroupIdNotFoundException) {
-                            log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
-                            break;
-                        } else {
-                            removeTopologyFuture.completeExceptionally(ex);
-                        }
-                        deleteOffsetsResult = null;
-                    }
-                    try {
-                        Thread.sleep(100);
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
+    private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws StreamsException {

Review comment:
       Sorry for the large diff. It's mainly whitespace changes from having moved 
the `!partitionsToReset.isEmpty()` check, plus one small stylistic change: the 
retry now uses a `while (true)` loop with `break`s, because tracking the null 
status of `deleteOffsetsResult` was a bit confusing.
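
To illustrate the stylistic point, here is a minimal, self-contained sketch of a `while (true)` retry loop that exits via `break`. It is not the code from this PR: `RetryLoopSketch`, `Attempt`, `RetriableFailure`, and `AlreadyDone` are hypothetical names, with the two exceptions loosely playing the roles that `GroupSubscribedToTopicException` and `GroupIdNotFoundException` play in the diff. No Kafka classes are used.

```java
class RetryLoopSketch {
    /** Hypothetical stand-in for a failure that is worth retrying. */
    static class RetriableFailure extends RuntimeException { }

    /** Hypothetical stand-in for "the work was already done elsewhere". */
    static class AlreadyDone extends RuntimeException { }

    /** Hypothetical single attempt at the operation being retried. */
    interface Attempt {
        void run();
    }

    /**
     * Runs the attempt until it succeeds or is no longer needed.
     * Returns the number of attempts made. Any other unchecked
     * exception thrown by the attempt propagates to the caller.
     */
    static int runUntilDone(final Attempt attempt, final long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                break; // success, stop retrying
            } catch (final RetriableFailure e) {
                // fall through to the backoff below, then retry
            } catch (final AlreadyDone e) {
                break; // nothing left to do, stop retrying
            }
            try {
                Thread.sleep(backoffMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException("Interrupted while waiting to retry", e);
            }
        }
        return attempts;
    }
}
```

Each exit condition is visible at the `break` that implements it, so there is no result variable to reset to `null` in order to keep the loop going.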
   
   The real change though is that this method now just performs the offset 
resets directly, rather than directing whoever completes the 
`removeNamedTopology` future to perform the offset reset (which is non-trivial 
and thus not appropriate for the StreamThreads to do).
   
   We now invoke this directly when the user calls `get()` on the future 
returned from the RemoveNamedTopologyResult.
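
As a rough illustration of the threading difference, the sketch below runs the follow-up work on whichever thread calls `get()`, instead of registering it as a completion callback on the future. It is an assumption-laden stand-in, not the PR's implementation: `ResetOnGetSketch` and `RemovalResult` are hypothetical names, `java.util.concurrent.CompletableFuture` substitutes for Kafka's future type, and a plain `Runnable` substitutes for the offset reset.

```java
import java.util.concurrent.CompletableFuture;

class ResetOnGetSketch {
    /** Hypothetical result wrapper; stands in for the object handed back to the user. */
    static class RemovalResult {
        private final CompletableFuture<Void> removeFuture;
        private final Runnable resetOffsets;

        RemovalResult(final CompletableFuture<Void> removeFuture, final Runnable resetOffsets) {
            this.removeFuture = removeFuture;
            this.resetOffsets = resetOffsets;
        }

        /**
         * Blocks until the removal has completed, then performs the reset
         * on the calling thread. If the removal failed, join() throws and
         * the reset is skipped.
         */
        void get() {
            removeFuture.join();
            resetOffsets.run();
        }
    }
}
```

In this sketch, the thread that completes `removeFuture` only signals completion; the potentially slow reset is paid for by the caller that asked for the result.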
   
   This is the main change since the PR was approved, @wcarlson5 @guozhangwang.
   
   There's also the 



