Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-20 Thread via GitHub


gharris1727 merged PR #15305:
URL: https://github.com/apache/kafka/pull/15305


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-20 Thread via GitHub


gharris1727 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2121455690

   Test failures appear unrelated, and the tests pass for me locally. LGTM, and 
thanks @vamossagar12 for the fix!





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2117312634

   Thanks @gharris1727 for another round of review. I have made the changes. 
Please review when you have some time. 





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604768491


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +268,20 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+Stage currentStage = listener.onPollTimeoutExpiry();
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +

Review Comment:
   Got it, yeah, I changed the message accordingly. I tried to keep the log line 
similar to the one in the consumer's poll timeout, but I think it's fine to deviate 
in this case.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604767686


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
 );
 }
 
+@Test
+public void testPollTimeoutExpiry() throws Exception {
+// This is a fabricated test to ensure that a poll timeout expiry 
happens. The tick thread awaits on
+// task#stop method which is blocked. The timeouts have been set 
accordingly
+workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(20)));
+workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(40)));
+connect = connectBuilder
+.numBrokers(1)
+.numWorkers(1)
+.build();
+
+connect.start();
+
+connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not 
brought up in time");
+
+Map<String, String> connectorWithBlockingTaskStopConfig = new 
HashMap<>();
+connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSourceConnector.class.getName());
+connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+
connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG,
 Objects.requireNonNull(TASK_STOP));
+
+connect.configureConnector(CONNECTOR_NAME, 
connectorWithBlockingTaskStopConfig);
+
+connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+);
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+connect.restartTask(CONNECTOR_NAME, 0);
+TestUtils.waitForCondition(() -> 
logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("WARN")) &&

Review Comment:
   Yeah, I wanted to add the test to `BlockingConnectorTest` itself, but it would 
have meant a lot of changes in that class, because that test currently 
doesn't support setting worker-level properties or changing the number of 
workers. Being able to change the worker-level properties was how I could 
trigger the poll timeout expiry. 
   Moreover, the test I have added doesn't block for the entire stop 
method; it ends shortly after the task shutdown graceful timeout elapses because 
of the reset at the end of the test. Let me know if that makes sense.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604759527


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java:
##
@@ -36,4 +37,15 @@ public interface WorkerRebalanceListener {
  * or tasks might refer to all or some of the connectors and tasks running 
on the worker.
  */
 void onRevoked(String leader, Collection<String> connectors, 
Collection<ConnectorTaskId> tasks);
+
+
+/**
+ * Invoked when a worker experiences a poll timeout expiry. Invoking this 
method allows getting
+ * the stage which was currently being executed when the poll timeout 
happened. The default implementation
+ * returns null
+ * @return The current stage being executed. Could be null
+ */
+default Stage onPollTimeoutExpiry() {

Review Comment:
   I added the default bit to avoid changing the tests. Turns out it should be 
ok to modify them so I removed the default bit.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-17 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604757990


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection 
connectors, Collection

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596969230


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, 
RequestFuture<Void> future)
 }
 
 // visible for testing
-synchronized RequestFuture<Void> sendHeartbeatRequest() {
+public synchronized RequestFuture<Void> sendHeartbeatRequest() {

Review Comment:
   This change is no longer valid. I was trying out some things to make the 
`WorkerCoordinator` test work and this came through because of that. Will 
remove.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596961414


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
 );
 }
 
+@Test
+public void testPollTimeoutExpiry() throws Exception {
+// This is a fabricated test to ensure that a poll timeout expiry 
happens. The tick thread awaits on
+// task#stop method which is blocked. The timeouts have been set 
accordingly
+workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(20)));
+workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(40)));
+connect = connectBuilder
+.numBrokers(1)
+.numWorkers(1)
+.build();
+
+connect.start();
+
+connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not 
brought up in time");
+
+Map<String, String> connectorWithBlockingTaskStopConfig = new 
HashMap<>();
+connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSourceConnector.class.getName());
+connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+
connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG,
 Objects.requireNonNull(TASK_STOP));
+
+connect.configureConnector(CONNECTOR_NAME, 
connectorWithBlockingTaskStopConfig);
+
+connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+);
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+connect.restartTask(CONNECTOR_NAME, 0);
+TestUtils.waitForCondition(() -> 
logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("WARN")) &&

Review Comment:
   Could this assertion be added to an existing BlockingConnectorTest? The 
blocking plugins are inherently slow to use so we should avoid adding more 
instances of them.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, 
RequestFuture<Void> future)
 }
 
 // visible for testing
-synchronized RequestFuture<Void> sendHeartbeatRequest() {
+public synchronized RequestFuture<Void> sendHeartbeatRequest() {

Review Comment:
   I would respect the "visible for testing" comment above, and leave this 
package-local.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +268,20 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+Stage currentStage = listener.onPollTimeoutExpiry();
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +

Review Comment:
   From a user perspective, the class and method names are irrelevant, and 
bringing up irrelevant details in diagnostics can be misleading. "The last 
thing the worker was doing was: {} and may contribute to this timeout" is much 
more understandable and still gets the point across.
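
A minimal sketch of the wording suggested above. The `PollTimeoutWarningSketch` class, its `describe` helper, and the first sentence of the message are hypothetical and only illustrate the idea; a plain string stands in for the real `Stage`, and the final log text in the PR may differ.

```java
import java.util.Optional;

// Hypothetical helper, not part of Kafka: builds a user-facing warning that
// avoids internal class and method names.
final class PollTimeoutWarningSketch {

    // stageDescription stands in for whatever the worker reports it was last
    // doing; it may be null when no stage was recorded.
    static String describe(String stageDescription) {
        String base = "Worker poll timeout has expired. The worker did not check in "
                + "with the group within the configured rebalance.timeout.ms.";
        return Optional.ofNullable(stageDescription)
                .map(s -> base + " The last thing the worker was doing was: " + s
                        + " and may contribute to this timeout.")
                .orElse(base);
    }

    public static void main(String[] args) {
        System.out.println(describe("stopping task connector-xyz-0"));
        System.out.println(describe(null));
    }
}
```

When no stage is available the sketch falls back to the generic sentence rather than printing "null", which keeps the diagnostic free of misleading detail.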



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection 
connectors, Collection connectors, 
Collection tasks);
+
+
+/**
+ * Invoked when a worker experiences a poll timeout expiry. Invoking this 
method allows getting
+ * the stage which was currently being executed when the poll timeout 
happened. The default implementation
+ * returns null
+ * @return The current stage being executed. Could be null
+ */
+default Stage onPollTimeoutExpiry() {

Review Comment:
   This is an internal interface; unless this default method actually makes 
sense on its own, I wouldn't add it.
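
To illustrate the trade-off, here is a sketch with hypothetical stand-in types (not the real `WorkerRebalanceListener` or `Stage`). A `default` method lets existing implementers compile unchanged but silently reports nothing, while an abstract method forces every implementer, including test doubles, to decide what to return.

```java
// Stand-in types for illustration only; all names here are hypothetical.
final class ListenerDefaultSketch {

    interface ListenerWithDefault {
        // Existing implementers keep compiling, but silently report no stage.
        default String onPollTimeoutExpiry() {
            return null;
        }
    }

    interface ListenerWithoutDefault {
        // Every implementer must provide an answer explicitly.
        String onPollTimeoutExpiry();
    }

    public static void main(String[] args) {
        ListenerWithDefault legacy = new ListenerWithDefault() { };
        ListenerWithoutDefault explicit = () -> "starting connector xyz";
        System.out.println(legacy.onPollTimeoutExpiry());
        System.out.println(explicit.onPollTimeoutExpiry());
    }
}
```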






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596952044


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {

Review Comment:
   Yeah, the tick thread stage is best effort and can be misleading. However, 
if something has blocked long enough to cause a poll timeout, it's likely 
to keep blocking for the additional time the heartbeat thread needs to 
notice.
   
   The ideal use case I see is that this error pops up in a worker log 
3 or more times before an operator has a chance to remediate it, and if the 
majority of logs blame connector-xyz, the operator can STOP that connector.
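
The triage described above could be approximated as follows. This is a sketch only: the log line format and the `mostBlamed` helper are assumptions for illustration, not the actual output of this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical operator-side helper; not part of Kafka.
final class BlameTallySketch {

    // Assumed line format: "... was doing was: <stage> and may contribute ..."
    private static final Pattern STAGE =
            Pattern.compile("was doing was: (.+?) and may contribute");

    // Returns the stage named most often across the given warning lines, if any.
    static Optional<String> mostBlamed(List<String> warnLines) {
        Map<String, Long> counts = warnLines.stream()
                .map(STAGE::matcher)
                .filter(Matcher::find)
                .map(m -> m.group(1))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "The last thing the worker was doing was: stopping connector-xyz and may contribute to this timeout",
                "The last thing the worker was doing was: stopping connector-abc and may contribute to this timeout",
                "The last thing the worker was doing was: stopping connector-xyz and may contribute to this timeout");
        System.out.println(mostBlamed(lines).orElse("nothing blamed"));
    }
}
```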
   






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596621705


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java:
##
@@ -76,7 +76,7 @@ public void assertAtLeastNumWorkersAreUp(int numWorkers, 
String detailMessage) t
 }
 
 /**
- * Assert that at least the requested number of workers are up and running.
+ * Assert that the exact number of workers are up and running.

Review Comment:
   I took the liberty of editing this comment in this PR. Hope that's ok.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2104218899

   Thanks @gharris1727, I have made the changes you suggested. I also 
modified the printed warning so that it is based on the tick thread stage 
presented to the coordinator, and added a test to verify this. 
LMKWYT.





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596476664


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {

Review Comment:
   Thanks Greg, I think that makes sense. I have extended the 
`WorkerRebalanceListener` with another hook that the coordinator invokes upon 
poll timeout expiry. This way, it can access the tick thread stage 
that was being executed at that point in time. I am wondering whether there could 
be race conditions where the tick thread stage we get does not 
reflect the point where the tick thread is blocked (for example, it has moved on or 
become null by the time we invoke this), but I guess that should be ok because this can be 
treated as best effort? WDYT?
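
The best-effort behaviour discussed here can be sketched with a volatile field that one thread updates and another reads. The class and method names below are hypothetical stand-ins; the actual herder may track its stage differently.

```java
// Hypothetical stand-in for stage tracking; not the real herder code.
final class BestEffortStageSketch {

    // Written by the (simulated) tick thread, read by the (simulated) heartbeat thread.
    private volatile String currentStage;

    void enter(String stage) {
        currentStage = stage;
    }

    void exit() {
        currentStage = null;
    }

    // Copy the field into a local once so the null check and the use see the
    // same value, even if the tick thread moves on in between.
    String describeOnTimeout() {
        String snapshot = currentStage;
        return snapshot == null ? "no stage recorded" : "last stage: " + snapshot;
    }

    public static void main(String[] args) {
        BestEffortStageSketch tracker = new BestEffortStageSketch();
        tracker.enter("stopping task xyz-0");
        System.out.println(tracker.describeOnTimeout());
        tracker.exit();
        System.out.println(tracker.describeOnTimeout());
    }
}
```

The snapshot can still be stale by the time it is logged; the sketch only guarantees that a single read is internally consistent, which matches the best-effort framing.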






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1591278453


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +
+"If you see this happening consistently, then it can be addressed 
by either adding more workers " +
+"to the connect cluster or by increasing the rebalance.timeout.ms 
configuration value. Please note that " +

Review Comment:
   I think this is decent advice when requests are small and can be distributed 
around the cluster, but as REST requests are rather infrequent, I think this is 
the minority of cases.
   
   I think most often this timeout is going to be triggered by an excessively 
slow connector start, stop, or validation. In those cases, adding more workers 
does nothing but move the error to a different worker. I think we can keep the 
"adding more workers" comment, if we include another piece of advice for 
debugging excessively blocking tasks. If we don't have that other piece of 
advice, then advising users to add workers is misleading.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {

Review Comment:
   Since we (as maintainers) don't have good insight into what commonly causes 
the herder tick thread to block and the poll timeout to fire, we recently added 
https://issues.apache.org/jira/browse/KAFKA-15563 to help users debug it 
themselves.
   
   It would be nice to integrate with this system to have the heartbeat thread 
report what the herder tick thread was blocked on at the time that the poll 
timeout happened, as this would report stalling that isn't caused by REST 
requests.
   
   The integration is tricky though, because the WorkerCoordinator is (and 
should be) unaware of the DistributedHerder. And currently I think the 
WorkerCoordinator hides these internal disconnects and reconnects inside of the 
poll method. Perhaps we can extend the WorkerRebalanceListener or have a new 
error listener to allow the herder to be informed about these errors.
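
One way to picture the listener-based integration suggested here, using hypothetical stand-in classes rather than the real `WorkerCoordinator` and `DistributedHerder`: the coordinator knows only a small callback interface, and the herder supplies the implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in classes for illustration only; all names are hypothetical.
final class ListenerDecouplingSketch {

    // The only thing the coordinator stand-in knows about its owner.
    interface TimeoutListener {
        String onPollTimeoutExpiry();
    }

    static final class CoordinatorStandIn {
        private final TimeoutListener listener;
        final List<String> warnings = new ArrayList<>();

        CoordinatorStandIn(TimeoutListener listener) {
            this.listener = listener;
        }

        // In a real coordinator this would run on the heartbeat thread.
        void handlePollTimeoutExpiry() {
            warnings.add("poll timeout expired; last reported stage: "
                    + listener.onPollTimeoutExpiry());
        }
    }

    static final class HerderStandIn implements TimeoutListener {
        private final String stage;

        HerderStandIn(String stage) {
            this.stage = stage;
        }

        @Override
        public String onPollTimeoutExpiry() {
            return stage;
        }
    }

    public static void main(String[] args) {
        CoordinatorStandIn coordinator =
                new CoordinatorStandIn(new HerderStandIn("validating connector config"));
        coordinator.handlePollTimeoutExpiry();
        System.out.println(coordinator.warnings.get(0));
    }
}
```

The coordinator stand-in never references the herder type, so the dependency direction stays the same as before the hook was added.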






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-02 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1588778443


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +
+"If you see this happening consistently, then it can be addressed 
by either adding more workers " +
+"to the connect cluster or by increasing the rebalance.timeout.ms 
configuration value. Please note that " +
+"rebalance.timeout.ms also controls the maximum allowed time for 
each worker to join the group once a " +
+"rebalance has begun so the set value should not be very high");

Review Comment:
   Mickael/Greg could you please review the above message to see if it makes 
sense for connect? Thanks!






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-25 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1579057875


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   @showuon, actually I typed in the wrong message yesterday. I ran the tests 
multiple times and they pass as expected. The reason I tried the other 
approach was that in the other 
[comment](https://github.com/apache/kafka/pull/15305#discussion_r1570226078) 
you had mentioned that it is not normal for consumers to not send heartbeats 
(other than readability). So, to be closer to the real case, I had kept it.
   
   Nonetheless, I have reverted the code based on your suggestion and it works 
as well. Thanks!






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577316104


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   Yes, it should not throw session timeout anymore if you take my change 
above, because we only sleep once with `smallRebalanceTimeout + 1` ms, so the 
time will never exceed sessionTimeout. Does that make sense?
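
The arithmetic behind this suggestion, as a sketch. Only `smallRebalanceTimeout = 20` comes from the quoted test; the `sessionTimeoutMs` value, the `FakeClock` class, and the simplified `expired` check are assumptions for illustration and are not Kafka's `MockTime` or heartbeat logic.

```java
// Hypothetical illustration of a single clock jump; not the real test code.
final class SingleSleepSketch {

    // Minimal stand-in for a mock clock.
    static final class FakeClock {
        private long nowMs;

        void sleep(long ms) {
            nowMs += ms;
        }

        long milliseconds() {
            return nowMs;
        }
    }

    // Simplified expiry check: true once the elapsed time reaches the timeout.
    static boolean expired(long elapsedMs, long timeoutMs) {
        return elapsedMs >= timeoutMs;
    }

    public static void main(String[] args) {
        long smallRebalanceTimeout = 20; // from the quoted test
        long sessionTimeoutMs = 30;      // assumed value for illustration

        FakeClock clock = new FakeClock();
        clock.sleep(smallRebalanceTimeout + 1); // one jump, just past the rebalance timeout

        long elapsed = clock.milliseconds();
        System.out.println("poll timeout expired: " + expired(elapsed, smallRebalanceTimeout));
        System.out.println("session expired: " + expired(elapsed, sessionTimeoutMs));
    }
}
```

Because the clock advances exactly once, the elapsed time is fixed at `smallRebalanceTimeout + 1` and cannot drift past a larger session timeout the way repeated small sleeps inside a retry loop could.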
   






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577225484


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   I can try again to validate it






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577223693


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   I tried that, but it still hit a session timeout once in a while when I ran 
it 30 times. So, to be on the safe side, I added it.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577220797


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +
+"If you see this happening consistently, then it can be addressed 
by either adding more workers " +
+"to the connect cluster or by increasing the rebalance.timeout.ms 
configuration value. Please note that " +
+"rebalance.timeout.ms also controls the maximum allowed time for 
each worker to join the group once a " +
+"rebalance has begun so the set value should not be very high");

Review Comment:
   @gharris1727 @mimaison, could you help review this log output, since you're 
the Connect experts? Thanks.








Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-23 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1577219150


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   If you set `RebalanceTimeout < sessionTimeout`, why do we need the 
heartBeatRequest? You should be able to sleep (RebalanceTimeout + 1) directly 
to get the log. Ex:
   
   ```
   try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
       coordinator.ensureActiveGroup();
       coordinator.poll(0, () -> null);

       time.sleep(smallRebalanceTimeout + 1);

       // The heartbeat thread is running and keeps sending heartbeat requests.
       TestUtils.waitForCondition(() ->
           logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&
           logCaptureAppender.getEvents().stream().anyMatch(e -> e.getMessage().startsWith("worker poll timeout has expired")), ...)
   ```
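
   The log check in the suggestion above can be illustrated in isolation. The 
following is only a minimal sketch using `java.util.logging`; it is not Kafka's 
`LogCaptureAppender`. `LogCaptureSketch`, `CapturingHandler`, `sawWarning` and 
the logger name `sketch.worker` are hypothetical names made up for this 
illustration. Unlike the suggested predicate, it requires the level and the 
message prefix to match on the same record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch: captures log records in memory so a test can inspect them.
final class LogCaptureSketch {

    /** Collects every record published to the logger it is attached to. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** True if any captured record is a WARNING whose message starts with the prefix. */
    static boolean sawWarning(List<LogRecord> records, String prefix) {
        return records.stream().anyMatch(r ->
                r.getLevel().equals(Level.WARNING) && r.getMessage().startsWith(prefix));
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("sketch.worker"); // assumed logger name
        logger.setUseParentHandlers(false);                // keep the console quiet
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);
        try {
            // Stand-in for the code under test emitting its warning.
            logger.warning("worker poll timeout has expired. (rest of message omitted)");
            System.out.println(sawWarning(handler.records, "worker poll timeout has expired"));
        } finally {
            logger.removeHandler(handler); // mirrors the try-with-resources cleanup
        }
    }
}
```

   Matching on a message prefix rather than the full text keeps such a check 
stable if the tail of the warning is reworded during review.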






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-22 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2070326565

   @showuon, I made some changes to the test so that we just send heartbeats 
and don't have to rely on FindCoordinator requests. Let me know what you think. 
Thanks!





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571150850


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   This sleep value, though small, was the one that eventually worked after 
running the test 30 times in a row. Sleeping for 10s still kept failing 
because sometimes the coordinator's session timeout would happen even though it 
doesn't look like it should.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571147929


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+

Review Comment:
   For the purpose of this test, I create a new coordinator with a rebalance 
timeout smaller than the session timeout. As mentioned in the code comment, 
this may not happen in the real world, but it makes testing a lot easier. Let 
me know what you think.
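
   To make the timing argument concrete, here is a minimal sketch of two 
deadlines on a manually advanced clock. It is an illustration only and does not 
use Kafka's classes: `PollTimeoutSketch`, `ManualClock` and `Deadline` are 
hypothetical. The value 20 is `smallRebalanceTimeout` from the test, and 30 is 
the `sessionTimeoutMs` that appears in the JoinGroup request log elsewhere in 
this thread.

```java
// Hypothetical sketch: models a poll deadline and a session deadline on a manual clock.
final class PollTimeoutSketch {

    /** A clock that only moves when the test advances it. */
    static final class ManualClock {
        private long nowMs = 0;

        long nowMs() {
            return nowMs;
        }

        void sleep(long ms) {
            nowMs += ms;
        }
    }

    /** A deadline that counts as expired once the clock reaches start + timeout. */
    static final class Deadline {
        private final ManualClock clock;
        private final long timeoutMs;
        private long deadlineMs;

        Deadline(ManualClock clock, long timeoutMs) {
            this.clock = clock;
            this.timeoutMs = timeoutMs;
            reset();
        }

        /** Restart the countdown from the current clock value. */
        void reset() {
            deadlineMs = clock.nowMs() + timeoutMs;
        }

        boolean isExpired() {
            return clock.nowMs() >= deadlineMs;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        long sessionTimeoutMs = 30;   // value from the JoinGroup log in this thread
        long rebalanceTimeoutMs = 20; // smallRebalanceTimeout in the test
        Deadline session = new Deadline(clock, sessionTimeoutMs);
        Deadline poll = new Deadline(clock, rebalanceTimeoutMs);

        // No poll and no heartbeat: only the shorter deadline has passed.
        clock.sleep(rebalanceTimeoutMs + 1);
        System.out.println("poll expired: " + poll.isExpired());       // true
        System.out.println("session expired: " + session.isExpired()); // false
    }
}
```

   With the rebalance timeout below the session timeout, there is a window in 
which only the poll deadline has expired, which is the case the test targets.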






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571146638


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   Yeah, I got that. I now send heartbeat requests and prepare responses until 
the rebalance timeout happens.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570226078


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   What I mean is that, with your current change, this test works the same way 
as before: it sends no heartbeat responses at all and forces a session timeout 
directly. That works, but it is not readable and does not reflect what happens 
in normal operation.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570217037


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   Hmm.. I see. The reason is that we didn't make sure the heartbeat is sent 
after `time.sleep(sessionTimeoutMs - 1);`. Do you think we could make sure the 
heartbeat request or response is sent before the next sleep? With that, we can 
be sure the session timeout won't happen, because we send a heartbeat every 
sessionTimeoutMs - 1.
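
   As a worked example of the cadence described above: the number of 
`sessionTimeoutMs - 1` sleeps needed to get past the rebalance timeout is the 
smallest n with n * (sessionTimeoutMs - 1) > rebalanceTimeoutMs. The sketch 
below is only an illustration; `HeartbeatCountSketch` and 
`sleepsUntilRebalanceTimeout` are hypothetical names, and the values 30 and 60 
are taken from the JoinGroup request log elsewhere in this thread.

```java
// Hypothetical sketch: counts the sleeps (and so heartbeats) needed to pass the rebalance timeout.
final class HeartbeatCountSketch {

    /** Smallest n such that n * stepMs is strictly greater than rebalanceTimeoutMs. */
    static int sleepsUntilRebalanceTimeout(long stepMs, long rebalanceTimeoutMs) {
        if (stepMs <= 0) {
            throw new IllegalArgumentException("stepMs must be positive");
        }
        // floor(R / step) steps reach at most R; one more step goes past it.
        return (int) (rebalanceTimeoutMs / stepMs) + 1;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30;   // from the JoinGroup log in this thread
        long rebalanceTimeoutMs = 60; // from the JoinGroup log in this thread
        long stepMs = sessionTimeoutMs - 1;

        // 29 and 58 are still within 60 ms; 87 is past it, so three sleeps are needed.
        System.out.println(sleepsUntilRebalanceTimeout(stepMs, rebalanceTimeoutMs)); // 3
    }
}
```

   This matches the three prepared heartbeat responses in the earlier revision 
of the test, where (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs.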






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570111991


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
+// prepare 3 heartBeatResponses because we will trigger 3 heartBeat 
requests until rebalanceTimeout,
+// that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> {
+return null;
+});
+
+// We keep the heartbeat thread running behind the scenes and poll 
frequently so that eventually
+// the time goes past now + rebalanceTimeoutMs which triggers poll 
timeout expiry.
+TestUtils.waitForCondition(() -> {
+// sleep until sessionTimeoutMs to trigger a heartBeat request 
to avoid session timeout.
+// Not sure if this will be flaky in CI because the heartbeat 
thread might not send out the heartBeat request in time.

Review Comment:
   Should we remove this line?








Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-17 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2062936947

   @showuon, just checking: did you get a chance to look at the updated test?





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1559256180


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   If I don't have these 2 lines, the tests become flaky. With the 2 lines 
added, I ran all the tests in WorkerCoordinatorTest 30 times and all the tests 
pass. This is needed because sometimes in the test the connectivity with the 
coordinator goes away due to a session timeout and a ClassCastException gets 
thrown. Adding logs for reference:
   
   ```
   [2024-04-10 16:35:06,023] INFO Cluster ID: kafka-cluster (org.apache.kafka.clients.Metadata:349)
   [2024-04-10 16:35:06,028] DEBUG Sending FindCoordinator request to broker localhost:1969 (id: 0 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:904)
   [2024-04-10 16:35:06,029] DEBUG Received FindCoordinator response ClientResponse(receivedTimeMs=1712747106023, latencyMs=0, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=mockClientId, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test-group', nodeId=0, host='localhost', port=1969, errorCode=0, errorMessage='NONE')])) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:917)
   [2024-04-10 16:35:06,029] INFO Discovered group coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:936)
   [2024-04-10 16:35:06,030] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
   [2024-04-10 16:35:06,030] DEBUG Heartbeat thread started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1481)
   [2024-04-10 16:35:06,030] DEBUG Cooperative rebalance triggered. Keeping assignment null until it's explicitly revoked. (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:250)
   [2024-04-10 16:35:06,030] INFO (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
   [2024-04-10 16:35:06,031] DEBUG Sending JoinGroup (JoinGroupRequestData(groupId='test-group', sessionTimeoutMs=30, rebalanceTimeoutMs=60, memberId='', groupInstanceId=null, protocolType='connect', protocols=[JoinGroupRequestProtocol(name='compatible', metadata=[0, 1, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='default', metadata=[0, 0, 0, 14, 108, 101, 97, 100, 101, 114, 85, 114, 108, 58, 56, 48, 56, 51, 0, 0, 0, 0, 0, 0, 0, 4])], reason='')) to coordinator localhost:1969 (id: 2147483647 rack: null) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:617)
   [2024-04-10 16:35:06,031] DEBUG Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='default', leader='leader', skipAssignment=false, memberId='member', members=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:645)
   [2024-04-10 16:35:06,031] DEBUG Enabling heartbeat thread (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:1449)
   [2024-04-10 16:35:06,031] INFO Successfully joined group with generation Generation{generationId=1, memberId='member', protocol='default'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
   [2024-04-10 16:35:06,031] DEBUG Sending follower SyncGroup to coordinator localhost:1969 (id: 2147483647 rack: null): SyncGroupRequestData(groupId='test-group', generationId=1, memberId='member', groupInstanceId=null, protocolType='connect', protocolName='default', assignments=[]) (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:759)
   [2024-04-10 16:35:06,032] DEBUG Received successful SyncGr

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1559253060


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> {
+                return null;
+            });
+
+            long now = time.milliseconds();
+            // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
+            // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
+            TestUtils.waitForCondition(() -> {
+                time.sleep(heartbeatIntervalMs - 1);
+                return time.milliseconds() > now + rebalanceTimeoutMs;
+            }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms");
+            coordinator.poll(0, () -> {

Review Comment:
   Thanks @showuon for the suggestions. I have updated the test.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> {
+                return null;
+            });
+
+            long now = time.milliseconds();
+            // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
+            // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
+            TestUtils.waitForCondition(() -> {
+                time.sleep(heartbeatIntervalMs - 1);
+                return time.milliseconds() > now + rebalanceTimeoutMs;
+            }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms");
+            coordinator.poll(0, () -> {

Review Comment:
   1. You didn't prepare a HeartbeatResponse, so the test will hit a session timeout.
   2. `heartbeatIntervalMs` is the minimum interval the heartbeat thread waits between heartbeats; the real timeout for a heartbeat is the session timeout, so we can sleep for `sessionTimeoutMs - 1` to reach `rebalanceTimeoutMs` faster.
   3. The last poll doesn't make sense, because the poll timeout should already have been triggered by then. Why do we need it?
   
   What I would write is something like this, for your reference:
   ```
   public void testPollTimeoutExpiry() throws InterruptedException {
   
       when(configStorage.snapshot()).thenReturn(configState1);
   
       client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
       coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
   
       client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
       client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
           Collections.singletonList(taskId1x0), Errors.NONE));
   
       // prepare 3 heartBeatResponses because we will trigger 3 heartBeat requests until rebalanceTimeout,
       // that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
       client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
       client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
       client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
   
       try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
           coordinator.ensureActiveGroup();
           coordinator.poll(0, () -> {
               return null;
           });
   
           // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
           // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
           TestUtils.waitForCondition(() -> {
               // sleep until sessionTimeoutMs to trigger a heartBeat request to avoid session timeout.
               // Not sure if this will be flaky in CI because the heartbeat thread might not send out the heartBeat request in time.
               time.sleep(sessionTimeoutMs - 1);
               return logC

Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-08 Thread via GitHub


showuon commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2042586830

   Will check it this week.





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-04 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2036805236

   @showuon would you have time to review this? It's a smallish change. Thanks!





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-02-01 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1474605205


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }
 
+    @Override
+    protected void handlePollTimeoutExpiry() {
+        log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " +
+            "in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " +
+            "If you see this happening consistently, then it can be addressed by either adding more workers " +
+            "to the connect cluster or by increasing the rebalance.timeout.ms configuration value. Please note that " +

Review Comment:
   The bit about `then it can be addressed by either adding more workers` reflects cases that have been observed where there are too many tasks to start, which leads to a poll timeout. I can remove it if it's not deemed necessary here.
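
For readers following the thread, here is a minimal, self-contained sketch of the override pattern this hunk relies on: a base coordinator notices that the poll timeout has expired and calls a hook, and a worker-specific subclass replaces the generic message. Every class name, the `warnings` list, and the `checkPollTimeout` method are hypothetical stand-ins for illustration, not the Kafka classes or their API, and the message is only a shortened paraphrase of the one in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a generic group coordinator (NOT Kafka's class).
class SketchCoordinator {
    // Stands in for log.warn so the outcome can be inspected.
    protected final List<String> warnings = new ArrayList<>();
    private final long pollTimeoutMs;
    private long lastPollMs;

    SketchCoordinator(long pollTimeoutMs, long nowMs) {
        this.pollTimeoutMs = pollTimeoutMs;
        this.lastPollMs = nowMs;
    }

    // Records that the owner called poll() at the given time.
    void poll(long nowMs) {
        lastPollMs = nowMs;
    }

    // Assumed to be called periodically by some background thread.
    // Returns true if more than pollTimeoutMs has passed since the last poll().
    boolean checkPollTimeout(long nowMs) {
        if (nowMs - lastPollMs > pollTimeoutMs) {
            handlePollTimeoutExpiry();
            return true;
        }
        return false;
    }

    // Hook with a generic default message; subclasses may override it.
    protected void handlePollTimeoutExpiry() {
        warnings.add("poll timeout has expired");
    }
}

// Hypothetical worker-side subclass that names the config relevant to it.
class SketchWorkerCoordinator extends SketchCoordinator {
    SketchWorkerCoordinator(long rebalanceTimeoutMs, long nowMs) {
        super(rebalanceTimeoutMs, nowMs);
    }

    @Override
    protected void handlePollTimeoutExpiry() {
        warnings.add("worker poll timeout has expired; consider adding workers "
            + "or increasing rebalance.timeout.ms");
    }
}

class PollTimeoutHookDemo {
    public static void main(String[] args) {
        SketchWorkerCoordinator coordinator = new SketchWorkerCoordinator(60, 0);
        coordinator.poll(10);
        coordinator.checkPollTimeout(71); // 61 ms since the last poll, so the hook fires
        System.out.println(coordinator.warnings);
    }
}
```

The point of the hook is that the detection logic stays in one place while each subclass can mention the configuration names its own users actually set.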






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-02-01 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1474598172


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> {
+                return null;
+            });
+
+            long now = time.milliseconds();
+            // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
+            // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
+            TestUtils.waitForCondition(() -> {
+                time.sleep(heartbeatIntervalMs - 1);
+                return time.milliseconds() > now + rebalanceTimeoutMs;
+            }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms");
+            coordinator.poll(0, () -> {

Review Comment:
   Note that despite multiple permutations of the timeout configs, I still see 
one session timeout which seems hard to eliminate.






Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-02-01 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1474595936


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -93,9 +96,9 @@ public class WorkerCoordinatorTest {
     private final ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
 
     private final String groupId = "test-group";
-    private final int sessionTimeoutMs = 10;
+    private final int sessionTimeoutMs = 30;
     private final int rebalanceTimeoutMs = 60;
-    private final int heartbeatIntervalMs = 2;
+    private final int heartbeatIntervalMs = 7;

Review Comment:
   I increased `sessionTimeoutMs` and `heartbeatIntervalMs` slightly so that the test can simulate a poll timeout expiry without hitting a lot of session timeouts and losing connectivity with the group coordinator.
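
To make the timing concrete, here is a small arithmetic sketch in plain Java (no Kafka classes) of how many mock-clock advances are needed before elapsed time passes `rebalanceTimeoutMs`. The class and method names are hypothetical; only the three timeout values come from the diff above, and the two step sizes are the ones discussed in this PR (`heartbeatIntervalMs - 1` in the test, `sessionTimeoutMs - 1` as a faster alternative).

```java
// Hypothetical helper, not part of WorkerCoordinatorTest: counts how many
// fixed-size advances of a mock clock are needed to pass a timeout.
final class PollTimeoutSteps {

    // Smallest number of sleeps of stepMs after which the elapsed time is
    // strictly greater than timeoutMs, mirroring the test's condition
    // `time.milliseconds() > now + rebalanceTimeoutMs`.
    static int stepsToExceed(int stepMs, int timeoutMs) {
        if (stepMs <= 0) {
            throw new IllegalArgumentException("stepMs must be positive");
        }
        int elapsed = 0;
        int steps = 0;
        while (elapsed <= timeoutMs) {
            elapsed += stepMs;
            steps++;
        }
        return steps;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30;    // value from the diff
        int rebalanceTimeoutMs = 60;  // value from the diff
        int heartbeatIntervalMs = 7;  // value from the diff

        // Sleeping heartbeatIntervalMs - 1 = 6 ms per iteration: prints 11
        System.out.println(stepsToExceed(heartbeatIntervalMs - 1, rebalanceTimeoutMs));
        // Sleeping sessionTimeoutMs - 1 = 29 ms per iteration: prints 3
        System.out.println(stepsToExceed(sessionTimeoutMs - 1, rebalanceTimeoutMs));
    }
}
```

With these values, 6 ms steps need 11 iterations to get past 60 ms (ten steps land exactly on 60, which is not strictly greater), while 29 ms steps need only 3.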






[PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-02-01 Thread via GitHub


vamossagar12 opened a new pull request, #15305:
URL: https://github.com/apache/kafka/pull/15305

   Please refer to the description in 
[KAFKA-16197](https://issues.apache.org/jira/browse/KAFKA-16197). 

