ableegoldman commented on a change in pull request #10444:
URL: https://github.com/apache/kafka/pull/10444#discussion_r604505465
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1013,28 +1016,34 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
*/
int commit(final Collection<Task> tasksToCommit) {
int committed = 0;
- if (rebalanceInProgress) {
- committed = -1;
- } else {
- final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
- try {
- committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
- } catch (final TimeoutException timeoutException) {
- consumedOffsetsAndMetadataPerTask
- .keySet()
- .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
- }
+
+ final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+ try {
+ committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
+ } catch (final TimeoutException timeoutException) {
+ consumedOffsetsAndMetadataPerTask
+ .keySet()
+ .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
+
return committed;
}
/**
+ * @throws TaskMigratedException if committing offsets failed (non-EOS)
+ * or if the task producer got fenced (EOS)
+ * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
+ * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
* @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
+ * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
*/
private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
- int committed = 0;
+ if (rebalanceInProgress) {
+ return -1;
+ }
Review comment:
This is the only logical change: I moved this check from
`TaskManager#commit` into this method. I also added a bunch of comments/docs
so we don't forget this again.
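
For readers skimming the diff, here is a minimal, standalone sketch of the pattern. It is not the actual `TaskManager` code. It assumes the motivation is that any caller of the shared helper should get the rebalance check for free. The class `CommitGuardSketch`, the second entry point `commitFromOtherPath`, and the use of task names as strings are all hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical stand-in for the real class; only the placement of the guard mirrors the diff.
public class CommitGuardSketch {
    private boolean rebalanceInProgress = false;

    void setRebalanceInProgress(final boolean inProgress) {
        this.rebalanceInProgress = inProgress;
    }

    // Entry point analogous to commit(): it no longer checks the flag itself.
    int commit(final Collection<String> tasksToCommit) {
        return commitInternal(tasksToCommit);
    }

    // A second, made-up entry point; it is covered by the guard without any extra code.
    int commitFromOtherPath(final Collection<String> tasksToCommit) {
        return commitInternal(tasksToCommit);
    }

    // The guard lives in the shared helper, so no caller can skip it.
    // Returns the number of tasks "committed", or -1 if a rebalance is in progress.
    private int commitInternal(final Collection<String> tasksToCommit) {
        if (rebalanceInProgress) {
            return -1;
        }
        return tasksToCommit.size(); // placeholder for the real commit work
    }

    public static void main(final String[] args) {
        final CommitGuardSketch manager = new CommitGuardSketch();
        System.out.println(manager.commit(Arrays.asList("0_0", "0_1")));
        manager.setRebalanceInProgress(true);
        System.out.println(manager.commit(Arrays.asList("0_0")));
        System.out.println(manager.commitFromOtherPath(Arrays.asList("0_0")));
    }
}
```

With the check in the helper, a newly added caller cannot bypass it by accident, which matches the "so we don't forget this again" intent stated above.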