[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1198026691

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
@lucasbru thanks for the input. I don't have an integration test for this because it was testable via unit tests. To be honest, this PR is pretty old, a lot has changed in the Streams runtime since then, and I'm not fully up to date with those changes. I'll need to revisit it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
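The guard that decides whether to commit before restoring new tasks can be isolated as a pure predicate. The following is a minimal sketch under stated assumptions: `ProcessingMode` is a hypothetical stand-in enum, a plain boolean replaces `threadProducer().transactionInFlight()`, and task ids are plain strings. None of this is the actual `TaskManager` code.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified model of the guard proposed in TaskManager#handleAssignment.
// ProcessingMode is an illustrative stand-in enum, not the real Kafka Streams type.
class PreRestorationCommitGuard {

    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA, EXACTLY_ONCE_V2 }

    // Commit before restoring new tasks only when all three conditions hold:
    // EOS-v2 (a single transactional producer per thread), an open transaction,
    // and at least one newly created active task that may need a long restoration.
    static boolean shouldCommitBeforeRestoration(final ProcessingMode mode,
                                                 final boolean transactionInFlight,
                                                 final Collection<String> newActiveTaskIds) {
        return mode == ProcessingMode.EXACTLY_ONCE_V2
            && transactionInFlight
            && !newActiveTaskIds.isEmpty();
    }

    public static void main(final String[] args) {
        System.out.println(shouldCommitBeforeRestoration(ProcessingMode.EXACTLY_ONCE_V2, true, List.of("0_1"))); // true
        System.out.println(shouldCommitBeforeRestoration(ProcessingMode.AT_LEAST_ONCE, true, List.of("0_1")));   // false
        System.out.println(shouldCommitBeforeRestoration(ProcessingMode.EXACTLY_ONCE_V2, true, List.of()));      // false
    }
}
```

Keeping the predicate free of side effects makes each combination easy to cover with a unit test.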
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185979279

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
I see. Either way, wherever I move the flag, one condition or another fails. I have asked Lucas in the thread above.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185978385

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
Thanks @lucasbru. Are you suggesting that with the new state-updater code, we won't run into the problems this PR is trying to solve? To be honest, I'm not familiar with the new state updater code path.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185975845

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
I used INFO because I thought this is something users should be able to see by default. I expect this case to be infrequent, so it should be OK to surface it by default.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185974966

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {

Review Comment:
Restricting this to EOS_V2 was suggested by Guozhang [here](https://github.com/apache/kafka/pull/11433#discussion_r1147064683). And yes, I think `!newActiveStatefulTasks.isEmpty()` is the right condition to add.
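Narrowing the condition to new active *stateful* tasks amounts to filtering the newly created tasks before the emptiness check. A minimal sketch, assuming a hypothetical `NewTask` record with `active` and `stateful` flags in place of the real `Task` interface; how the real code determines statefulness is not shown here.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: NewTask is an illustrative stand-in, not Kafka Streams' Task interface.
class StatefulTaskFilter {

    record NewTask(String id, boolean active, boolean stateful) { }

    // Keep only newly created tasks that are active and have state to restore.
    static List<NewTask> newActiveStatefulTasks(final Collection<NewTask> created) {
        return created.stream()
            .filter(NewTask::active)    // standby tasks are excluded
            .filter(NewTask::stateful)  // stateless tasks have nothing to restore
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<NewTask> created = List.of(
            new NewTask("0_0", true, false),  // active, stateless
            new NewTask("0_1", true, true),   // active, stateful
            new NewTask("1_0", false, true)); // standby, stateful
        final List<NewTask> candidates = newActiveStatefulTasks(created);
        // Only 0_1 qualifies, so the pre-restoration commit guard would see a non-empty collection.
        System.out.println(candidates.size());      // 1
        System.out.println(candidates.get(0).id()); // 0_1
    }
}
```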
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185973531

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);

Review Comment:
The call to `createNewTasks` was already in `handleAssignment` to create new tasks. I only changed its return type so that we can check whether any new active tasks were created and, if so, trigger the task commit and offset commit.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185970601

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;

Review Comment:
Yeah, this one was suggested by Guozhang. It still doesn't pass all the tests, though.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185968862

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina
             }
         }
+    private void commitActiveTasks(final Set activeTasksNeedCommit, final AtomicReference activeTasksCommitException) {

Review Comment:
Yeah, Guozhang pointed that out as well [here](https://github.com/apache/kafka/pull/11433#discussion_r1147063043), and I had already made the change. It looks like you were looking at an outdated version of this file.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1164262483

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
Yeah, that test didn't trigger `onAssignment` and invoked `handleRebalanceComplete` directly, which explains the problem. It means that for that test to work, we need to keep `rebalanceInProgress = false` in `handleRebalanceComplete` as well. I think it should be OK to have this line in both `handleAssignment` and `handleRebalanceComplete`, because the latter signifies the completion of a rebalance anyway.

But a couple of other things now seem to break because of this new condition (sorry :( ). One of them is `StreamThread#shouldOnlyCompleteShutdownAfterRebalanceNotInProgress`. This one is interesting because it invokes `handleAssignment` and then `start` and `stop` on the StreamThread, and asserts that active tasks are still returned while a rebalance is in progress. Because we now explicitly set `rebalanceInProgress` to false in `handleAssignment` itself, this test fails.

Since this is an EOS-v1-related test, I moved `rebalanceInProgress = false` inside the EOS-v2 check, and the test passes. But then, because of `rebalanceInProgress = false` within `handleAssignment`, `EosV2UpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosV2` fails when clients are closed cleanly. That test passes as soon as I remove the line `rebalanceInProgress = false` from `handleAssignment`. All in all, this one line seems to be causing side effects in several places!
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1158249466

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
Thanks @guozhangwang for clarifying. I moved the `rebalanceInProgress = false` assignment from `handleRebalanceComplete` into `handleAssignment`, right before `createNewTasks`. It appears to have a side effect. `StreamThreadTest#shouldNotEnforceRebalanceWhenCurrentlyRebalancing` is a good indicator: it expects a clean shutdown and a state transition to DEAD, but that never happens.

Because `rebalanceInProgress` is never reset to false, the `runLoop` in `StreamThread` loops forever, since this condition is never met:

```
if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {
    log.info("Triggering the followup rebalance scheduled for {}.", Utils.toLogDateTimeFormat(nextProbingRebalanceMs.get()));
    mainConsumer.enforceRebalance("triggered followup rebalance scheduled for " + nextProbingRebalanceMs.get());
    nextProbingRebalanceMs.set(Long.MAX_VALUE);
}
```

In the successful case (i.e., with `rebalanceInProgress = false;` retained in `handleRebalanceComplete`), I see this:

```
[2023-04-05 14:39:20,664] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1168)
[2023-04-05 14:39:22,442] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:239)
[2023-04-05 14:40:31,036] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Triggering the followup rebalance scheduled for 2023-04-05 14:39:14,762 +05:30. (org.apache.kafka.streams.processor.internals.StreamThread:621)
[2023-04-05 14:40:31,039] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Shutting down clean (org.apache.kafka.streams.processor.internals.StreamThread:1182)
[2023-04-05 14:40:31,060] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:239)
[2023-04-05 14:40:31,060] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1217)
```

In the other case, the follow-up rebalance is never triggered, which means the current rebalance never ends and the StreamThread can never shut down. The log looks like this:

```
[2023-04-05 14:43:11,201] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread:572)
[2023-04-05 14:43:11,205] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from CREATED to STARTING (org.apache.kafka.streams.processor.internals.StreamThread:239)
[2023-04-05 14:43:11,221] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1168)
[2023-04-05 14:43:11,221] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:239)
```
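The hang described above follows directly from the quoted `runLoop` condition: if the flag never returns to false, the follow-up rebalance can never be enforced. A minimal, hypothetical model of that single check; the class, its fields, and the counter standing in for `mainConsumer.enforceRebalance(...)` are illustrative assumptions, not `StreamThread` code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the follow-up-rebalance check quoted from StreamThread#runLoop.
// rebalanceInProgress stands in for taskManager.rebalanceInProgress(), and the counter
// stands in for mainConsumer.enforceRebalance(...).
class FollowupRebalanceModel {

    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    private boolean rebalanceInProgress = false;
    private int enforcedRebalances = 0;

    void scheduleProbingRebalance(final long atMs) {
        nextProbingRebalanceMs.set(atMs);
    }

    void setRebalanceInProgress(final boolean inProgress) {
        rebalanceInProgress = inProgress;
    }

    int enforcedRebalances() {
        return enforcedRebalances;
    }

    // One iteration of the check: fire only when no rebalance is in progress
    // and the scheduled time has passed.
    boolean maybeTriggerFollowup(final long nowMs) {
        if (!rebalanceInProgress && nextProbingRebalanceMs.get() < nowMs) {
            enforcedRebalances++;
            nextProbingRebalanceMs.set(Long.MAX_VALUE); // clear the schedule, as the quoted code does
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final FollowupRebalanceModel loop = new FollowupRebalanceModel();
        loop.scheduleProbingRebalance(100L);
        loop.setRebalanceInProgress(true);
        for (long now = 200L; now < 210L; now++) {
            loop.maybeTriggerFollowup(now); // never fires while the flag is stuck at true
        }
        System.out.println(loop.enforcedRebalances());       // 0
        loop.setRebalanceInProgress(false);                  // e.g. cleared by handleRebalanceComplete
        System.out.println(loop.maybeTriggerFollowup(210L)); // true
        System.out.println(loop.maybeTriggerFollowup(211L)); // false: the schedule was cleared
    }
}
```

The model captures only this one condition; the real shutdown path in `StreamThread` involves more state than is shown here.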
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156233120

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {
+                log.info("Couldn't commit any tasks since a rebalance is in progress");
+            } else {
+                log.info("Committed {} transactions", numCommitted);

Review Comment:
Done.
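The `-1` return value acts as a sentinel meaning "skipped because a rebalance is in progress", which keeps it distinct from a legitimate count of zero. A hypothetical sketch of that contract, assuming a simplified `TaskState` record and that only tasks reporting `commitNeeded()` are counted; the real `commitTasksAndMaybeUpdateCommittableOffsets` also prepares commits and collects consumed offsets, which is omitted here.

```java
import java.util.List;

// Hypothetical sketch of the "-1 means skipped" contract; TaskState is an illustrative stand-in.
class CommitSentinelSketch {

    static final int SKIPPED_REBALANCE_IN_PROGRESS = -1;

    record TaskState(String id, boolean commitNeeded) { }

    // Returns -1 if a rebalance is in progress, otherwise how many tasks needed (and got) a commit.
    static int commitTasks(final List<TaskState> tasks, final boolean rebalanceInProgress) {
        if (rebalanceInProgress) {
            return SKIPPED_REBALANCE_IN_PROGRESS;
        }
        int committed = 0;
        for (final TaskState task : tasks) {
            if (task.commitNeeded()) {
                committed++; // real code would prepare the commit and collect consumed offsets here
            }
        }
        return committed;
    }

    // Mirrors the two log branches in the diff, keeping "skipped" separate from "committed zero".
    static String describe(final int numCommitted) {
        return numCommitted == SKIPPED_REBALANCE_IN_PROGRESS
            ? "Couldn't commit any tasks since a rebalance is in progress"
            : "Committed " + numCommitted + " task(s)";
    }

    public static void main(final String[] args) {
        final List<TaskState> tasks = List.of(new TaskState("0_0", true), new TaskState("0_1", false));
        System.out.println(describe(commitTasks(tasks, true)));  // Couldn't commit any tasks since a rebalance is in progress
        System.out.println(describe(commitTasks(tasks, false))); // Committed 1 task(s)
    }
}
```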
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156232890

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
Oh, I see. I read through the ticket and now have a better idea. Thanks for the explanation. I have updated the code to pass `allTasks`.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156231874

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
Thanks for this comment. IIUC, the suggestion is to move the `rebalanceInProgress == true` check out of `commitTasksAndMaybeUpdateCommittableOffsets` to before the creation of new tasks, and to neither create nor try to commit any new tasks while a rebalance is in progress? Also, does the check in `commitTasksAndMaybeUpdateCommittableOffsets` still remain? I see that it is also called from a couple of other places, such as `handleCorruption` and `commit`.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1152708260

## streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java: ##
@@ -1291,7 +1291,6 @@ private static List> readRecords(final String topic,
                                                        final int maxMessages) {
         final List> consumerRecords;
         consumer.subscribe(singletonList(topic));
-        System.out.println("Got assignment:" + consumer.assignment());

Review Comment:
This was a stray `System.out.println` from one of my earlier PRs that ended up in trunk. I didn't want to create a separate PR just for this :D
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1152707931

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -342,7 +379,18 @@ public void handleAssignment(final Map> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Set activeTasksNeedCommit;
+        // Find new active tasks which need commit.
+        if (newActiveTasks == null || newActiveTasks.isEmpty()) {
+            activeTasksNeedCommit = new HashSet<>();
+        } else {
+            activeTasksNeedCommit = newActiveTasks.stream().filter(Task::commitNeeded).collect(Collectors.toSet());

Review Comment:
I see. I have modified this check accordingly: I now check whether there are newly created active tasks and an in-flight transaction, and if so, try to commit tasks. Since `threadProducer()` works only for EOS-v2, this fix implicitly applies only to that version.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1152707427

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -298,6 +299,42 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina
             }
         }
+    private void commitActiveTasks(final Set activeTasksNeedCommit, final AtomicReference activeTasksCommitException) {

Review Comment:
Done. I see that `commitTasksAndMaybeUpdateCommittableOffsets` already has the `task.commitNeeded()` check, among other things, so this makes all the more sense. Thanks for pointing it out.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1021579198

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ##
@@ -2470,7 +2474,7 @@ public void markChangelogAsCorrupted(final Collection partitions
         assertThat(unrevokedTaskChangelogMarkedAsCorrupted.get(), is(true));
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
-        assertThat(unrevokedActiveTask.state(), is(State.CREATED));
+        assertThat(unrevokedActiveTask.state(), is(State.RUNNING));

Review Comment:
@ableegoldman, this test has undergone some changes. Most of them seem cosmetic to me, but this one deviates from what the original test case asserted. The change seems reasonable to me; let me know what you think.
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858441929

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina
             }
         }
+    private void commitActiveTasks(final Set activeTasksNeedCommit, final AtomicReference activeTasksCommitException) {
+        final Map> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+        final Set dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING tate
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
Got it.
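The error handling in `commitActiveTasks` follows a common pattern: catch the specific corruption exception before the general `RuntimeException`, record only the first general failure via `AtomicReference.compareAndSet(null, e)`, and run the post-commit step only for tasks that were not marked dirty. A minimal sketch of that pattern, assuming string task ids and hypothetical stand-ins (`CorruptedTasksException` for `TaskCorruptedException`, `Committer` for `taskExecutor.commitOffsetsOrTransaction`); it simplifies the real code by treating all tasks passed in as dirty on a general failure.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the error-handling shape in commitActiveTasks, using task ids instead of Task objects.
class CommitActiveTasksSketch {

    // Stand-in for TaskCorruptedException: carries the ids of the corrupted tasks.
    static final class CorruptedTasksException extends RuntimeException {
        final Set<String> corruptedTaskIds;

        CorruptedTasksException(final Set<String> corruptedTaskIds) {
            super("Tasks corrupted: " + corruptedTaskIds);
            this.corruptedTaskIds = corruptedTaskIds;
        }
    }

    // Stand-in for taskExecutor.commitOffsetsOrTransaction(...).
    interface Committer {
        void commit(Set<String> taskIds);
    }

    // Returns the ids of tasks whose post-commit step ran, i.e. every task not marked dirty.
    static Set<String> commitActiveTasks(final Set<String> tasksNeedingCommit,
                                         final Committer committer,
                                         final AtomicReference<RuntimeException> firstException) {
        final Set<String> dirtyTasks = new HashSet<>();
        try {
            committer.commit(tasksNeedingCommit);
        } catch (final CorruptedTasksException e) {
            // Only the corrupted tasks become dirty; the real code closes them dirty and revives them.
            dirtyTasks.addAll(e.corruptedTaskIds);
        } catch (final RuntimeException e) {
            // Keep only the first failure seen; every task in this commit is treated as dirty.
            firstException.compareAndSet(null, e);
            dirtyTasks.addAll(tasksNeedingCommit);
        }
        final Set<String> postCommitted = new TreeSet<>();
        for (final String taskId : tasksNeedingCommit) {
            if (!dirtyTasks.contains(taskId)) {
                postCommitted.add(taskId); // stands in for task.postCommit(false)
            }
        }
        return postCommitted;
    }

    public static void main(final String[] args) {
        final Set<String> tasks = Set.of("0_0", "0_1", "0_2");
        final AtomicReference<RuntimeException> first = new AtomicReference<>();

        System.out.println(commitActiveTasks(tasks, ids -> { }, first)); // [0_0, 0_1, 0_2]
        System.out.println(commitActiveTasks(tasks, ids -> {
            throw new CorruptedTasksException(Set.of("0_1"));
        }, first)); // [0_0, 0_2]
        System.out.println(commitActiveTasks(tasks, ids -> {
            throw new IllegalStateException("commit failed");
        }, first)); // []
        System.out.println(first.get().getMessage()); // commit failed
    }
}
```

The catch order matters: because `CorruptedTasksException` extends `RuntimeException`, it must be caught first, or the compiler rejects the unreachable clause.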
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858432879

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:

Review Comment:
@showuon, can you please review this as well whenever you get the chance? It's been open for a while. Thanks!
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r851853450

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:

Review Comment:
Hi, bumping this thread again. Please review whenever you get the chance.