mjsax commented on code in PR #17521:
URL: https://github.com/apache/kafka/pull/17521#discussion_r1804093092
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -806,46 +805,34 @@ void maybeGetClientInstanceIds() {
}
}
- if
(!processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) &&
- !threadProducerInstanceIdFuture.isDone()) {
-
+ if (!producerInstanceIdFuture.isDone()) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
- threadProducerInstanceIdFuture.complete(
+ producerInstanceIdFuture.complete(
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
);
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow
the error,
// to allow returning a partial result for all other
clients
- threadProducerInstanceIdFuture.complete(null);
+ producerInstanceIdFuture.complete(null);
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
-
threadProducerInstanceIdFuture.completeExceptionally(error);
+ producerInstanceIdFuture.completeExceptionally(error);
}
} else {
- threadProducerInstanceIdFuture.completeExceptionally(
+ producerInstanceIdFuture.completeExceptionally(
new TimeoutException("Could not retrieve thread
producer client instance id.")
);
}
}
- maybeResetFetchDeadline();
Review Comment:
We only use this once, and inline this method with this PR (as the method
itself is simplified)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]