[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-18 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1198026691


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   @lucasbru thanks for the input. I don't have an IT for this because it was 
testable via unit tests. TBH, this PR is pretty old and a lot has changed in 
the Streams runtime since then, and I am not fully up to date with those 
changes. I will need to revisit it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185979279


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   I see. Either way, moving the flag around makes one condition or another 
fail. I have asked Lucas in the thread above.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185978385


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   Thanks @lucasbru. Are you suggesting that with the new state updater code, 
we won't get the problems which this PR is trying to solve? TBH, I am not 
familiar with the new state updater code path.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185975845


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");

Review Comment:
   I used INFO because I thought this is something users should be able to see 
by default. I am hoping this won't be a very frequent case and hence should be 
ok to notify by default.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185974966


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {

Review Comment:
   EOS_V2 was suggested by Guozhang 
[here](https://github.com/apache/kafka/pull/11433#discussion_r1147064683). 
Yeah, I think `!newActiveStatefulTasks.isEmpty()` seems like the right 
condition to add.
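
   A minimal sketch of what a `!newActiveStatefulTasks.isEmpty()` condition 
could look like. `TaskInfo`, its `stateful` flag, and `statefulOnly` are 
hypothetical names introduced for illustration only; they are not Kafka 
Streams types, and the PR may derive the stateful set differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative only: TaskInfo and its stateful flag are hypothetical, not Kafka Streams types. */
final class StatefulTaskFilterSketch {

    /** Hypothetical minimal view of a task: an id plus whether it has state to restore. */
    static final class TaskInfo {
        final String id;
        final boolean stateful;

        TaskInfo(final String id, final boolean stateful) {
            this.id = id;
            this.stateful = stateful;
        }
    }

    /** Keeps only the newly created active tasks that have state to restore. */
    static List<TaskInfo> statefulOnly(final List<TaskInfo> newActiveTasks) {
        return newActiveTasks.stream()
            .filter(task -> task.stateful)
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<TaskInfo> newActiveTasks = Arrays.asList(
            new TaskInfo("0_0", true),
            new TaskInfo("0_1", false));
        final List<TaskInfo> newActiveStatefulTasks = statefulOnly(newActiveTasks);
        // Only a non-empty stateful set implies a potentially long restoration.
        System.out.println("commit before restoring: " + !newActiveStatefulTasks.isEmpty());
    }
}
```

   Stateless tasks have nothing to restore, so filtering them out avoids 
committing when no long restoration can follow.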






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185973531


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);

Review Comment:
   The call to `createNewTasks` was already there in `handleAssignment` to 
create new tasks. I just changed its return type so that the caller can check 
whether any new active tasks were created and, if so, trigger the task commit 
and offset commit.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185970601


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;

Review Comment:
   Yeah, this one was suggested by Guozhang. It is still not passing all the 
tests though.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185968862


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set activeTasksNeedCommit, 
final AtomicReference activeTasksCommitException) {

Review Comment:
   Yeah, that was pointed out by Guozhang as well 
[here](https://github.com/apache/kafka/pull/11433#discussion_r1147063043) and I 
had already made the change. Looks like you looked at an outdated version of 
this file.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-12 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1164262483


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   Yeah, that test didn't trigger `onAssignment` and invoked 
`handleRebalanceComplete` directly, which explains the problem. It means that 
for that test to work, we need to retain the `rebalanceInProgress = false` in 
`handleRebalanceComplete` as well. I think it should be OK to have this line in 
both `handleAssignment` and `handleRebalanceComplete`, because the latter 
signifies a rebalance completion anyway.
   
   But there are a couple of other things which seem to break now (sorry :( ) 
because of this new condition. One of them is 
`StreamThread#shouldOnlyCompleteShutdownAfterRebalanceNotInProgress`. This one 
is interesting because it invokes `handleAssignment` and then `start` and 
`stop` on the StreamThread, and asserts that while a rebalance is in progress, 
active tasks are still returned. Because we are now explicitly setting 
`rebalanceInProgress` to false in `handleAssignment` itself, this test fails. 
Since this is an EOS-V1 related test, I moved the `rebalanceInProgress = false` 
inside the EOSv2 check, and this test passes.
   
   But then, because of `rebalanceInProgress = false` within 
`handleAssignment`, 
`EosV2UpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosV2` fails when 
clients are closed cleanly. This one passes the moment I remove the line 
`rebalanceInProgress = false` from `handleAssignment`.
   
   All in all, this one line seems to be causing side effects in various 
places!






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1158249466


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   Thanks @guozhangwang for clarifying. I moved the 
`rebalanceInProgress = false` assignment from `handleRebalanceComplete` into 
`handleAssignment`, right before `createNewTasks`. It appears to have a side 
effect. The test 
`StreamThreadTest#shouldNotEnforceRebalanceWhenCurrentlyRebalancing` is a good 
indicator: it expects a clean shutdown and a state transition to DEAD, but 
that never happens.
   
   Because `rebalanceInProgress` is never reset to false, the `runLoop` in 
`StreamThread` stays in an infinite loop, since this condition is never met:
   
   ```
   if (!taskManager.rebalanceInProgress() && nextProbingRebalanceMs.get() < time.milliseconds()) {
       log.info("Triggering the followup rebalance scheduled for {}.", Utils.toLogDateTimeFormat(nextProbingRebalanceMs.get()));
       mainConsumer.enforceRebalance("triggered followup rebalance scheduled for " + nextProbingRebalanceMs.get());
       nextProbingRebalanceMs.set(Long.MAX_VALUE);
   }
   ```
   In the successful case (i.e. with `rebalanceInProgress = false` retained in 
`handleRebalanceComplete`), I see this:
   
   ```
   [2023-04-05 14:39:20,664] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1168)
   [2023-04-05 14:39:22,442] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:239)
   [2023-04-05 14:40:31,036] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Triggering the followup rebalance scheduled for 2023-04-05 14:39:14,762 +05:30. (org.apache.kafka.streams.processor.internals.StreamThread:621)
   [2023-04-05 14:40:31,039] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Shutting down clean (org.apache.kafka.streams.processor.internals.StreamThread:1182)
   [2023-04-05 14:40:31,060] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread:239)
   [2023-04-05 14:40:31,060] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1217)
   ```
   
   In the other case, the follow-up rebalance is never triggered, which means 
the current one never ends and the StreamThread can never shut down. It looks 
like this:
   
   ```
   [2023-04-05 14:43:11,201] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread:572)
   [2023-04-05 14:43:11,205] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from CREATED to STARTING (org.apache.kafka.streams.processor.internals.StreamThread:239)
   [2023-04-05 14:43:11,221] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1168)
   [2023-04-05 14:43:11,221] INFO stream-thread [stream-thread-test-87bf53a8-54f2-485f-a4b6-acdbec0a8b3d-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:239)
   ```
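
   The interaction described above can be sketched in isolation. This is a 
simplified model, not the real `TaskManager` or `StreamThread`: the class name 
`RebalanceFlagSketch` and the methods `scheduleProbingRebalance` and 
`maybeTriggerFollowupRebalance` are hypothetical, and only the shape of the 
quoted `runLoop` condition is reproduced.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative model of the rebalance flag gating the follow-up rebalance; not Kafka code. */
final class RebalanceFlagSketch {
    private boolean rebalanceInProgress;
    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);

    void handleRebalanceStart() {
        rebalanceInProgress = true;
    }

    /** If this reset is removed or never reached, the flag stays true forever. */
    void handleRebalanceComplete() {
        rebalanceInProgress = false;
    }

    /** Hypothetical helper: records when the follow-up rebalance is due. */
    void scheduleProbingRebalance(final long atMs) {
        nextProbingRebalanceMs.set(atMs);
    }

    /** Same shape as the quoted runLoop condition; returns true if a follow-up rebalance fired. */
    boolean maybeTriggerFollowupRebalance(final long nowMs) {
        if (!rebalanceInProgress && nextProbingRebalanceMs.get() < nowMs) {
            nextProbingRebalanceMs.set(Long.MAX_VALUE);
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final RebalanceFlagSketch sketch = new RebalanceFlagSketch();
        sketch.handleRebalanceStart();
        sketch.scheduleProbingRebalance(100L);
        // The flag is still set, so the due rebalance is not triggered.
        System.out.println("while rebalancing: " + sketch.maybeTriggerFollowupRebalance(200L));
        sketch.handleRebalanceComplete();
        // Once the flag is reset, the same check fires.
        System.out.println("after completion: " + sketch.maybeTriggerFollowupRebalance(200L));
    }
}
```

   In this model, the follow-up rebalance fires only after the flag has been 
reset. Any code path that skips the reset leaves the check blocked 
indefinitely, which matches the hang seen in the second log.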




[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156233120


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {
+log.info("Couldn't commit any tasks since a rebalance is in 
progress");
+} else {
+log.info("Committed {} transactions", numCommitted);

Review Comment:
   Done.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156232890


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");

Review Comment:
   Oh, I see. I read through the ticket and got a better idea. Thanks for the 
explanation. I have updated the code to pass `allTasks`.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156231874


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   Thanks for this comment. IIUC, the suggestion is to move the 
`rebalanceInProgress == true` check out of 
`commitTasksAndMaybeUpdateCommittableOffsets` to a point before new tasks are 
created, and to neither create new tasks nor try to commit them if a rebalance 
is in progress? 
   Also, does the check in `commitTasksAndMaybeUpdateCommittableOffsets` still 
remain? I see it is also called from a couple of other places, such as 
`handleCorruption` and `commit`.
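
   A minimal sketch of the `-1` sentinel that the quoted hunk checks for. The 
class `CommitSentinelSketch`, the constant name `REBALANCE_IN_PROGRESS`, and 
both method signatures are hypothetical; the two messages are the ones logged 
in the diff quoted elsewhere in this thread.

```java
/** Illustrative stand-in for the commit path; not the real TaskManager. */
final class CommitSentinelSketch {

    /** Hypothetical constant name for the -1 checked in the diff above. */
    static final int REBALANCE_IN_PROGRESS = -1;

    /** Returns the sentinel while a rebalance is running, else the number of tasks committed. */
    static int commitTasks(final boolean rebalanceInProgress, final int tasksNeedingCommit) {
        if (rebalanceInProgress) {
            return REBALANCE_IN_PROGRESS;
        }
        return tasksNeedingCommit;
    }

    /** Maps the result to the two log messages used in the PR diff. */
    static String describe(final int numCommitted) {
        if (numCommitted == REBALANCE_IN_PROGRESS) {
            return "Couldn't commit any tasks since a rebalance is in progress";
        }
        return "Committed " + numCommitted + " transactions";
    }

    public static void main(final String[] args) {
        System.out.println(describe(commitTasks(true, 3)));
        System.out.println(describe(commitTasks(false, 3)));
    }
}
```

   Because other callers share the same method, keeping the check inside it 
protects them as well. Moving the check to the call site would only cover 
`handleAssignment`.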



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-03-29 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1152708260


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##
@@ -1291,7 +1291,6 @@ private static  List> 
readRecords(final String topic,
  final int 
maxMessages) {
 final List> consumerRecords;
 consumer.subscribe(singletonList(topic));
-System.out.println("Got assignment:" + consumer.assignment());

Review Comment:
   This was a stray `System.out.println` from one of my earlier PRs which 
ended up in trunk. Didn't want to create a separate PR just for this :D






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-03-29 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1152707931


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +379,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Set activeTasksNeedCommit;
+// Find new active tasks which need commit.
+if (newActiveTasks == null || newActiveTasks.isEmpty()) {
+activeTasksNeedCommit = new HashSet<>();
+} else {
+activeTasksNeedCommit = 
newActiveTasks.stream().filter(Task::commitNeeded).collect(Collectors.toSet());

Review Comment:
   I see. I have modified this check accordingly, i.e. I now check whether 
there are newly created active tasks and an in-flight txn, and only then try 
to commit tasks. Since `threadProducer()` works only for EOS-v2, this fix 
implicitly applies only for that version.
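
   The check described above can be sketched as a single predicate. The names 
`CommitGuardSketch`, `ProcessingMode`, and `shouldCommitBeforeRestoring` are 
hypothetical, and tasks are represented by plain strings. The explicit mode 
test is an assumption made here for clarity; the comment above relies on 
`threadProducer()` being EOS-v2-only instead.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

/** Illustrative predicate only; not the real TaskManager logic. */
final class CommitGuardSketch {

    /** Hypothetical stand-in for the processing guarantee in use. */
    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA, EXACTLY_ONCE_V2 }

    /** Commit only if EOS-v2 is on, a transaction is open, and new active tasks were created. */
    static boolean shouldCommitBeforeRestoring(final ProcessingMode mode,
                                               final boolean transactionInFlight,
                                               final Collection<String> newActiveTasks) {
        return mode == ProcessingMode.EXACTLY_ONCE_V2
            && transactionInFlight
            && !newActiveTasks.isEmpty();
    }

    public static void main(final String[] args) {
        System.out.println(shouldCommitBeforeRestoring(
            ProcessingMode.EXACTLY_ONCE_V2, true, Arrays.asList("0_0")));
        System.out.println(shouldCommitBeforeRestoring(
            ProcessingMode.EXACTLY_ONCE_V2, true, Collections.<String>emptyList()));
    }
}
```

   All three conditions must hold: without an open transaction there is 
nothing that could time out, and without new active tasks there is no 
restoration to wait for.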






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-03-29 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1152707427


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -298,6 +299,42 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set activeTasksNeedCommit, 
final AtomicReference activeTasksCommitException) {

Review Comment:
   Done. I see `commitTasksAndMaybeUpdateCommittableOffsets` already has the 
`task.commitNeeded()` check amongst other things so it makes all the more 
sense. Thanks for pointing this out.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-11-14 Thread GitBox


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1021579198


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2470,7 +2474,7 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
 
         assertThat(unrevokedTaskChangelogMarkedAsCorrupted.get(), is(true));
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
-        assertThat(unrevokedActiveTask.state(), is(State.CREATED));
+        assertThat(unrevokedActiveTask.state(), is(State.RUNNING));

Review Comment:
   @ableegoldman, this test has undergone some changes. Most of them seem 
cosmetic to me, but this one deviates from the original test case. The change 
seemed reasonable to me. Let me know what you think about it.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-04-26 Thread GitBox


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858441929


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
   Got it.
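
For readers following the hunk quoted above: `activeTasksCommitException.compareAndSet(null, e)` records an exception only if none has been recorded yet, so the first failure is the one that survives. The sketch below shows that pattern in isolation. The class name `FirstExceptionSketch` and the helper `runAll` are hypothetical and not part of Kafka; only `AtomicReference.compareAndSet` is the real JDK call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class FirstExceptionSketch {
    // Runs every action and remembers only the first RuntimeException that was thrown.
    static RuntimeException runAll(final List<Runnable> actions) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        for (final Runnable action : actions) {
            try {
                action.run();
            } catch (final RuntimeException e) {
                // Succeeds only while the reference is still null, so later failures are ignored.
                firstException.compareAndSet(null, e);
            }
        }
        return firstException.get();
    }

    public static void main(final String[] args) {
        final List<Runnable> actions = List.of(
            () -> { throw new IllegalStateException("first"); },
            () -> { },
            () -> { throw new IllegalArgumentException("second"); }
        );
        System.out.println(runAll(actions).getMessage()); // prints "first"
    }
}
```

The remaining actions still run after a failure, which matches the intent of collecting an exception and continuing with cleanup rather than aborting at the first error.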






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-04-26 Thread GitBox


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858432879


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
   @showuon, could you please review this as well whenever you get the chance? 
It's been open for a while. Thanks!






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-04-17 Thread GitBox


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r851853450


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
   Hi, bumping this thread again. Please review whenever you get the chance.


