Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-07-10 Thread via GitHub


ashoke-cube commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1672075592


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:
##
@@ -134,7 +134,30 @@ public void testInitializeFailure(ConnectorType connectorType) {
 
 verifyInitialize();
 verify(listener).onFailure(CONNECTOR, exception);
-verifyCleanShutdown(false);
+verifyCleanShutdown(true);
+}
+
+@ParameterizedTest
+@EnumSource(value = ConnectorType.class, names = {"SOURCE", "SINK"})
+public void testInitializeAndStopFailure() {

Review Comment:
   The test failures are not consistent across runs
[1](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16095/14/tests),
[2](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16095/13/tests),
[3](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16095/12/tests)
 and seem to be flaky. I have verified that the failures are not related to
this PR's changes. What course of action would you suggest in this case? 
@gharris1727 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-24 Thread via GitHub


ashoke-cube commented on PR #16095:
URL: https://github.com/apache/kafka/pull/16095#issuecomment-2186292726

   > > Hi @ashoke-cube could you fix the build? Thanks!
   > 
   > Hey @gharris1727, I looked into the build failure. It is a bit weird: it is
failing because it cannot find JUnit's `Test` class. The tests run fine
locally. It's not clear to me what the issue is here. Is there anything else
to be done after adding new test cases?
   > 
   > ```
   > Task :connect:runtime:compileTestJava
   > 
   > /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:140: error: cannot find symbol
   > 
   > @Test
   > ^
   > symbol:   class Test
   > location: class WorkerConnectorTest
   > 
   > 
   > /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:393: error: cannot find symbol
   > 
   > @Test
   > ^
   > symbol:   class Test
   > location: class WorkerConnectorTest
   > ```
   
   Figured it out. Fixing.





Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-23 Thread via GitHub


ashoke-cube commented on PR #16095:
URL: https://github.com/apache/kafka/pull/16095#issuecomment-2184736478

   > Hi @ashoke-cube could you fix the build? Thanks!
   
   Hey @gharris1727, I looked into the build failure. It is a bit weird: it is
failing because it cannot find JUnit's `Test` class, even though I am using the
same annotation as every other test in that class. The tests run fine locally. It's
not clear to me what the issue is here.
   ```
   Task :connect:runtime:compileTestJava
   
   /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:140: error: cannot find symbol
   
   @Test
   ^
   symbol:   class Test
   location: class WorkerConnectorTest
   
   
   /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:393: error: cannot find symbol
   
   @Test
   ^
   symbol:   class Test
   location: class WorkerConnectorTest
   
   ```





Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-18 Thread via GitHub


ashoke-cube commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1644974636


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -307,7 +307,7 @@ void doShutdown() {
 + " as the connector has been scheduled for shutdown"),
 null);
 }
-if (state == State.STARTED)
+if (state == State.STARTED || state == State.FAILED)
 connector.stop();

Review Comment:
   Done. Please take a look.
   
   There are possible state transitions from INIT to STOPPED/PAUSED, and when
suspending we don't call `stop()` if the state is not STARTED. A connector
shouldn't allocate any resources in `initialize()`, so this shouldn't be an
issue, but I wanted to confirm.
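A minimal sketch of the concern, assuming a simplified lifecycle in which suspending (to PAUSED or STOPPED) calls `stop()` only when the connector is STARTED. `SuspendSketch`, `connectorStop()` and `suspend()` are hypothetical stand-ins, not the real `WorkerConnector`. It shows that an INIT -> PAUSED transition never calls `stop()`, so anything a connector allocated in `initialize()` would stay allocated.

```java
// Hypothetical, simplified model of the suspend rule described above
// (stop() is only called when leaving STARTED). Not the real WorkerConnector.
class SuspendSketch {
    enum State { INIT, STARTED, PAUSED, STOPPED }

    State state = State.INIT;
    int stopCalls = 0;

    // Stand-in for Connector.stop(); it only counts invocations.
    void connectorStop() {
        stopCalls++;
    }

    void start() {
        state = State.STARTED;
    }

    // Suspending to PAUSED or STOPPED calls stop() only for a STARTED connector.
    void suspend(State target) {
        if (state == State.STARTED)
            connectorStop();
        state = target;
    }

    public static void main(String[] args) {
        SuspendSketch fromInit = new SuspendSketch();
        fromInit.suspend(State.PAUSED);      // INIT -> PAUSED: stop() is skipped

        SuspendSketch fromStarted = new SuspendSketch();
        fromStarted.start();
        fromStarted.suspend(State.STOPPED);  // STARTED -> STOPPED: stop() runs once

        System.out.println(fromInit.stopCalls + " " + fromStarted.stopCalls); // prints "0 1"
    }
}
```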






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-18 Thread via GitHub


gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1644895138


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -307,7 +307,7 @@ void doShutdown() {
 + " as the connector has been scheduled for shutdown"),
 null);
 }
-if (state == State.STARTED)
+if (state == State.STARTED || state == State.FAILED)
 connector.stop();

Review Comment:
   I think some of the tests in `WorkerConnectorTest` that exercise failed
connectors will need their `verifyCleanShutdown` assertions updated. A test
that throws exceptions in both `start()` and `stop()` would also be very helpful :)
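A sketch of the start()-and-stop() failure scenario, using a simplified in-memory model rather than the real `WorkerConnector` and the mocks in `WorkerConnectorTest`. It assumes the widened `doShutdown` guard from this diff and a shutdown path that routes `stop()` errors through a deduplicating `onFailure`, as discussed elsewhere in this thread. `StartAndStopFailureSketch`, `connectorStart()`, `connectorStop()` and `reportedFailures` are hypothetical. The sketch checks two things: `stop()` still runs for a connector whose `start()` failed, and the `start()` error remains the only reported cause.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the connector lifecycle discussed in this
// PR; it is not the real WorkerConnector and has no Kafka dependencies.
class StartAndStopFailureSketch {
    enum State { INIT, STARTED, FAILED, STOPPED }

    State state = State.INIT;
    int stopCalls = 0;
    final List<String> reportedFailures = new ArrayList<>();

    // Stand-ins for a connector whose start() and stop() both throw.
    void connectorStart() {
        throw new RuntimeException("start failed");
    }

    void connectorStop() {
        stopCalls++;
        throw new RuntimeException("stop failed");
    }

    void doStart() {
        try {
            connectorStart();
            state = State.STARTED;
        } catch (RuntimeException e) {
            onFailure(e);
        }
    }

    // Only the first failure is reported; later ones are ignored.
    void onFailure(Throwable t) {
        if (state == State.FAILED)
            return;
        state = State.FAILED;
        reportedFailures.add(t.getMessage());
    }

    // stop() also runs for FAILED connectors, matching the widened guard in the diff.
    void doShutdown() {
        try {
            if (state == State.STARTED || state == State.FAILED)
                connectorStop();
            state = State.STOPPED;
        } catch (RuntimeException e) {
            onFailure(e);
        }
    }

    public static void main(String[] args) {
        StartAndStopFailureSketch worker = new StartAndStopFailureSketch();
        worker.doStart();
        worker.doShutdown();
        // stop() was still invoked, and the start() error is the only one reported
        System.out.println(worker.stopCalls + " " + worker.reportedFailures); // prints "1 [start failed]"
    }
}
```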






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-18 Thread via GitHub


ashoke-cube commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1644889388


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -307,7 +307,7 @@ void doShutdown() {
 + " as the connector has been scheduled for shutdown"),
 null);
 }
-if (state == State.STARTED)
+if (state == State.STARTED || state == State.FAILED)
 connector.stop();

Review Comment:
   Yes, will do. Also, does this need a separate test?






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-18 Thread via GitHub


gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1644883959


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -307,7 +307,7 @@ void doShutdown() {
 + " as the connector has been scheduled for shutdown"),
 null);
 }
-if (state == State.STARTED)
+if (state == State.STARTED || state == State.FAILED)
 connector.stop();

Review Comment:
   `connector.stop()` can fail, and if that happens we don't transition to
STOPPED; we transition to FAILED again, but with a different error. I think
this will shadow the exception that caused the connector to fail in the first
place, which is almost certainly more confusing than helpful.
   
   In the catch clause, can you replace the direct state/statusListener updates
with a call to `onFailure`? It has failure-deduplication logic that seems useful here.
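A minimal sketch of the difference, assuming a connector that has already failed in `start()` and a `stop()` that throws. `CatchClauseSketch`, `shutdownDirect()` and `shutdownViaOnFailure()` are hypothetical names. The only detail taken from the real code is what the diff hunks in this thread show: `onFailure` returns early when the state is already FAILED.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of two catch-clause styles for a failing connector.stop();
// names are illustrative and do not come from the Kafka code base.
class CatchClauseSketch {
    enum State { STARTED, FAILED, STOPPED }

    // Start from a connector that already failed in start() and reported it.
    State state = State.FAILED;
    final List<String> statusUpdates = new ArrayList<>();

    CatchClauseSketch() {
        statusUpdates.add("FAILED: start failed");
    }

    // Stand-in for a Connector.stop() that throws.
    static void connectorStop() {
        throw new RuntimeException("stop failed");
    }

    // Style 1: the catch clause sets the state and notifies the listener itself.
    void shutdownDirect() {
        try {
            connectorStop();
            state = State.STOPPED;
        } catch (RuntimeException e) {
            state = State.FAILED;
            statusUpdates.add("FAILED: " + e.getMessage());
        }
    }

    // Style 2: the catch clause delegates to onFailure, which deduplicates.
    void shutdownViaOnFailure() {
        try {
            connectorStop();
            state = State.STOPPED;
        } catch (RuntimeException e) {
            onFailure(e);
        }
    }

    void onFailure(Throwable t) {
        if (state == State.FAILED)
            return; // already failed: keep the original cause
        state = State.FAILED;
        statusUpdates.add("FAILED: " + t.getMessage());
    }

    public static void main(String[] args) {
        CatchClauseSketch direct = new CatchClauseSketch();
        direct.shutdownDirect();
        CatchClauseSketch deduped = new CatchClauseSketch();
        deduped.shutdownViaOnFailure();
        System.out.println(direct.statusUpdates);  // [FAILED: start failed, FAILED: stop failed]
        System.out.println(deduped.statusUpdates); // [FAILED: start failed]
    }
}
```

With the direct style the latest status reports the `stop()` error; with `onFailure` the original `start()` cause is the only one reported.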






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-17 Thread via GitHub


gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1643426226


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   > I think leaving the resources allocated when the connector is in the 
FAILED state until a restart is fine.
   
   I created a follow-up for releasing resources early here: 
https://issues.apache.org/jira/browse/KAFKA-16987 so we can focus on the 
startup leak in this PR.






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-14 Thread via GitHub


ashoke-cube commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1640754632


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   @gharris1727 
   I agree on the assessment that having `connector.stop()` inside `onFailure` 
handler isn't right.
   
   Both proposed solutions involve modifying the `doShutdown()` method
to call `connector.stop()` when the state is FAILED. I will make that change.
   
   However, the proposed change to trigger doShutdown() early when 
transitioning to the FAILED state would move the connector to the STOPPED 
state. This is a shift, as it introduces a new state transition that was not 
previously possible. It could be misleading as well, as a connector in the 
STOPPED state might not clearly signal that a failure occurred, compared to a 
connector in the FAILED state. I am open to your suggestions here.
   
   I think leaving the resources allocated when the connector is in the FAILED 
state until a restart is fine.






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-10 Thread via GitHub


ashoke-cube commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1632856777


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   Hey @gharris1727, thank you for taking a look. Apologies for the late
response; I missed the review notification. I will take a look and get back to
you.






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-10 Thread via GitHub


ashoke-cube commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1632856777


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   Hey @gharris1727, apologies for the late response. I missed the review 
notification. I will take a look and get back to you.






Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-05-30 Thread via GitHub


gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   This is a potentially blocking call to the connector, and I don't think 
that's a good fit for this onFailure handler. This call would delay the 
statusListener call, which delays notifying the REST API of the FAILED status 
and updating the metrics. If it blocks indefinitely, the status and metrics are 
never updated.
   
   There is a connector.stop() call in doShutdown that could be changed to 
execute for the INIT and FAILED states. That would leave the resources 
allocated while the connector is waiting in the FAILED state, but would at 
least ensure they don't leak long-term.
   
   We may also change the control flow to make the transition to the FAILED 
state trigger doShutdown early, rather than having it wait() with all the 
resources still allocated.
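A minimal sketch comparing the two placements of `connector.stop()`, assuming a connector whose `start()` throws and a `stop()` that only records that it ran (a real one might block). `StopPlacementSketch` and its event strings are hypothetical. Option 1 mirrors the original diff (`stop()` inside `onFailure`); option 2 widens the `doShutdown` guard to FAILED, as in the later revision of this PR. The event order shows that option 1 holds back the FAILED status notification until `stop()` returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch comparing where connector.stop() runs after a start()
// failure; not the real WorkerConnector.
class StopPlacementSketch {
    enum State { INIT, STARTED, FAILED, STOPPED }

    final boolean stopInOnFailure;
    final List<String> events = new ArrayList<>();
    State state = State.INIT;

    StopPlacementSketch(boolean stopInOnFailure) {
        this.stopInOnFailure = stopInOnFailure;
    }

    void connectorStart() {
        throw new RuntimeException("start failed");
    }

    // A real connector's stop() may block; here it only records that it ran.
    void connectorStop() {
        events.add("connector.stop()");
    }

    void doStart() {
        try {
            connectorStart();
            state = State.STARTED;
        } catch (RuntimeException e) {
            onFailure(e);
        }
    }

    void onFailure(Throwable t) {
        if (state == State.FAILED)
            return;
        // Option 1: stop() inside onFailure delays the status notification below.
        if (stopInOnFailure && (state == State.STARTED || state == State.INIT))
            connectorStop();
        state = State.FAILED;
        events.add("statusListener.onFailure");
    }

    void doShutdown() {
        // Option 1 keeps the original guard; option 2 also stops FAILED connectors.
        boolean shouldStop = stopInOnFailure
                ? state == State.STARTED
                : state == State.STARTED || state == State.FAILED;
        if (shouldStop)
            connectorStop();
        state = State.STOPPED;
        events.add("stopped");
    }

    public static void main(String[] args) {
        StopPlacementSketch inOnFailure = new StopPlacementSketch(true);
        inOnFailure.doStart();
        inOnFailure.doShutdown();
        StopPlacementSketch inShutdown = new StopPlacementSketch(false);
        inShutdown.doStart();
        inShutdown.doShutdown();
        System.out.println(inOnFailure.events); // [connector.stop(), statusListener.onFailure, stopped]
        System.out.println(inShutdown.events);  // [statusListener.onFailure, connector.stop(), stopped]
    }
}
```

Both options call `stop()` exactly once, so neither leaks long-term; the difference is whether the REST status and metrics learn about the failure before or after a possibly blocking `stop()`.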


