ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823421009
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -264,48 +267,46 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
}
}
- private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
- final Set<TopicPartition> partitionsToReset) {
- if (!partitionsToReset.isEmpty()) {
- removeTopologyFuture.whenComplete((v, throwable) -> {
- if (throwable != null) {
- removeTopologyFuture.completeExceptionally(throwable);
- }
- DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
- while (deleteOffsetsResult == null) {
- try {
- deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
- applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
- deleteOffsetsResult.all().get();
- } catch (final InterruptedException ex) {
- ex.printStackTrace();
- break;
- } catch (final ExecutionException ex) {
- if (ex.getCause() != null &&
- ex.getCause() instanceof GroupSubscribedToTopicException &&
- ex.getCause()
- .getMessage()
- .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) {
- ex.printStackTrace();
- } else if (ex.getCause() != null &&
- ex.getCause() instanceof GroupIdNotFoundException) {
- log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
- break;
- } else {
- removeTopologyFuture.completeExceptionally(ex);
- }
- deleteOffsetsResult = null;
- }
- try {
- Thread.sleep(100);
- } catch (final InterruptedException ex) {
- ex.printStackTrace();
+ private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws StreamsException {
Review comment:
Sorry for the large diff -- it's mainly due to spacing from having moved
the `!partitionsToReset.isEmpty()` check, plus one small stylistic change to use a
`while (true)` loop with `break`s because following the null status of the
`deleteOffsetsResult` was a bit confusing.
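To illustrate the loop style only, here is a minimal sketch and not the code in this PR. `RetryLoopSketch`, `RetriableException`, `AlreadyDoneException`, and `runWithRetries` are hypothetical stand-ins (roughly for the "group still subscribed" and "group not found" cases); nothing here calls the real admin client.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a retry loop; none of these names come from Kafka.
final class RetryLoopSketch {

    // Assumed stand-in for an error that is worth retrying after a short backoff.
    static final class RetriableException extends RuntimeException { }

    // Assumed stand-in for "someone else already did the work", so retrying is pointless.
    static final class AlreadyDoneException extends RuntimeException { }

    /**
     * Runs the action until it succeeds or reports that the work is already done.
     * Every exit from the loop is an explicit break or a thrown exception, so there
     * is no nullable result variable to track. Returns the number of attempts made.
     */
    static int runWithRetries(final Callable<Void> action, final long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                action.call();
                break; // success
            } catch (final RetriableException e) {
                // fall through to the backoff below and try again
            } catch (final AlreadyDoneException e) {
                break; // nothing left to do
            } catch (final Exception e) {
                throw new IllegalStateException("Unrecoverable failure", e);
            }
            try {
                Thread.sleep(backoffMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                break;
            }
        }
        return attempts;
    }
}
```

In this sketch, an action that fails twice with `RetriableException` and then succeeds makes `runWithRetries` return after three attempts, while an `AlreadyDoneException` exits on the first attempt.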
The real change though is that this method now just performs the offset
resets directly, rather than directing whoever completes the
`removeNamedTopology` future to perform the offset reset (which is non-trivial
and thus not appropriate for the StreamThreads to do).
We now invoke this directly when the user calls `get()` on the future
returned from the RemoveNamedTopologyResult.
This is the main change since being approved @wcarlson5 @guozhangwang
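For anyone skimming, the idea of running the reset on the caller of `get()` rather than on whichever thread completes the future can be sketched roughly as below. This is an assumption-laden illustration, not the actual `RemoveNamedTopologyResult` implementation: `DeferredResetResultSketch`, its constructor, and the `Consumer` callback are hypothetical, and partitions are modelled as plain strings.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

// Hypothetical sketch; names are illustrative and not the real Kafka Streams API.
final class DeferredResetResultSketch {
    private final CompletableFuture<Void> removeFuture;
    private final Set<String> partitionsToReset;
    private final Consumer<Set<String>> resetOffsets;

    DeferredResetResultSketch(final CompletableFuture<Void> removeFuture,
                              final Set<String> partitionsToReset,
                              final Consumer<Set<String>> resetOffsets) {
        this.removeFuture = removeFuture;
        this.partitionsToReset = partitionsToReset;
        this.resetOffsets = resetOffsets;
    }

    // Blocks until the removal completes, then performs the reset on the
    // calling thread instead of on the thread that completed the future.
    void get() throws InterruptedException, ExecutionException {
        removeFuture.get();
        if (!partitionsToReset.isEmpty()) {
            resetOffsets.accept(partitionsToReset);
        }
    }
}
```

Had the reset been registered via `whenComplete` on the future instead, it would typically run on the thread that completes the future, which is the situation described above.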
There's also the