ableegoldman commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823421009
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -264,48 +267,46 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
         }
     }
-    private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
-                                                   final Set<TopicPartition> partitionsToReset) {
-        if (!partitionsToReset.isEmpty()) {
-            removeTopologyFuture.whenComplete((v, throwable) -> {
-                if (throwable != null) {
-                    removeTopologyFuture.completeExceptionally(throwable);
-                }
-                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                while (deleteOffsetsResult == null) {
-                    try {
-                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                        deleteOffsetsResult.all().get();
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
-                        break;
-                    } catch (final ExecutionException ex) {
-                        if (ex.getCause() != null &&
-                            ex.getCause() instanceof GroupSubscribedToTopicException &&
-                            ex.getCause()
-                                .getMessage()
-                                .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) {
-                            ex.printStackTrace();
-                        } else if (ex.getCause() != null &&
-                            ex.getCause() instanceof GroupIdNotFoundException) {
-                            log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
-                            break;
-                        } else {
-                            removeTopologyFuture.completeExceptionally(ex);
-                        }
-                        deleteOffsetsResult = null;
-                    }
-                    try {
-                        Thread.sleep(100);
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
+    private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws StreamsException {

Review comment:
Sorry for the large diff -- it's mainly due to spacing from having moved the `!partitionsToReset.isEmpty()` check, plus one small stylistic change to use a `while (true)` loop with `break`s because following
the null status of the `deleteOffsetsResult` was a bit confusing. The real change, though, is that this method now just performs the offset reset directly, rather than directing whoever completes the `removeNamedTopology` future to perform it (the reset is non-trivial and thus not appropriate for the StreamThreads to do). We now invoke this method directly when the user calls `get()` on the future returned from the `RemoveNamedTopologyResult`. This is the main change since the PR was approved, @wcarlson5 @guozhangwang. There's also the
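
For readers skimming the thread, the shape of the change described above can be sketched roughly as follows. This is not the code from the PR: `OffsetDeleter`, `StillSubscribedException`, `GroupGoneException`, and `RemovalHandle` are hypothetical stand-ins for the admin client call, the two exception cases in the diff, and the result object. The use of `IllegalStateException` in place of `StreamsException`, and the choice to fail on interruption, are also assumptions made to keep the example self-contained.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class ResetOffsetsSketch {

    /** Hypothetical stand-in for the admin client call that deletes committed offsets. */
    interface OffsetDeleter {
        void deleteOffsets(Set<String> partitions) throws ExecutionException, InterruptedException;
    }

    /** Hypothetical retriable failure: the group is still subscribed to the topic. */
    static final class StillSubscribedException extends RuntimeException { }

    /** Hypothetical benign failure: the group or its offsets are already gone. */
    static final class GroupGoneException extends RuntimeException { }

    /** Runs the reset on the calling thread and returns the number of attempts made. */
    static int resetOffsets(final OffsetDeleter deleter, final Set<String> partitions, final long backoffMs) {
        int attempts = 0;
        if (partitions.isEmpty()) {
            return attempts;
        }
        while (true) {
            attempts++;
            try {
                deleter.deleteOffsets(partitions);
                break; // offsets deleted, nothing more to do
            } catch (final ExecutionException ex) {
                final Throwable cause = ex.getCause();
                if (cause instanceof GroupGoneException) {
                    break; // already reset elsewhere, no need to retry
                }
                if (!(cause instanceof StillSubscribedException)) {
                    throw new IllegalStateException("Offset reset failed", ex);
                }
                // Retriable: fall through to the backoff below and try again.
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted during offset reset", ex);
            }
            try {
                Thread.sleep(backoffMs);
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted during backoff", ex);
            }
        }
        return attempts;
    }

    /** Hypothetical result handle: the reset happens inside get(), on the caller's thread. */
    static final class RemovalHandle {
        private final CompletableFuture<Void> removeFuture;
        private final OffsetDeleter deleter;
        private final Set<String> partitions;

        RemovalHandle(final CompletableFuture<Void> removeFuture,
                      final OffsetDeleter deleter,
                      final Set<String> partitions) {
            this.removeFuture = removeFuture;
            this.deleter = deleter;
            this.partitions = partitions;
        }

        void get() {
            removeFuture.join();                   // wait until the topology is removed
            resetOffsets(deleter, partitions, 1L); // then reset on the thread that called get()
        }
    }
}
```

The point of the sketch is that whoever completes `removeFuture` does no extra work; the potentially slow retry loop runs only on the thread that asks for the result.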