[jira] [Assigned] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams

2024-06-30 Thread Wang Xiaoqing (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Xiaoqing reassigned KAFKA-16871:
-

Assignee: (was: Wang Xiaoqing)

> Clean up internal AssignmentConfigs class in Streams
> 
>
> Key: KAFKA-16871
> URL: https://issues.apache.org/jira/browse/KAFKA-16871
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, newbie++
>
> In KIP-924 we added a new public AssignmentConfigs class to hold all of the, 
> you guessed it, assignment-related configs.
> However, there is an existing config class with the same name and largely the 
> same contents that lives in an internal package, specifically inside the 
> AssignorConfiguration class.
> We should remove the old AssignmentConfigs class in AssignorConfiguration and 
> replace any usages of it with the new public AssignmentConfigs that we added 
> in KIP-924.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17057) Add "retry" option to ProductionExceptionHandler

2024-06-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-17057:

Fix Version/s: 3.9.0

> Add "retry" option to ProductionExceptionHandler
> 
>
> Key: KAFKA-17057
> URL: https://issues.apache.org/jira/browse/KAFKA-17057
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 3.9.0
>
>
> With KAFKA-16508 we changed the Kafka Streams behavior to call the 
> ProductionExceptionHandler for the single special case of a potentially 
> missing output topic, in order to break an infinite retry loop.
> However, this is not very flexible, as users might want to retry in some 
> cases.
> We might also consider not calling the handler when writing into internal 
> topics, as those _must_ exist.
> KIP-1065: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311627309] 
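For context, a minimal sketch of a custom handler using the existing CONTINUE/FAIL 
responses is shown below; the proposed "retry" option from KIP-1065 would add a 
third response so a handler could ask Streams to keep retrying instead of failing 
or dropping the record. The class name is made up for illustration, and RETRY only 
appears in comments because it is a proposal, not shipped API.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler: drop oversized records, fail on everything else.
// KIP-1065 proposes a third RETRY response that a handler could return here.
public class SkipOversizedRecordsHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Skip the record and keep processing.
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        // Stop the stream thread; with KIP-1065 a handler could return RETRY instead.
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }
}
```

Such a handler would be registered via the `default.production.exception.handler` 
Streams config.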



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17057) Add "retry" option to ProductionExceptionHandler

2024-06-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-17057:
---

Assignee: Matthias J. Sax

> Add "retry" option to ProductionExceptionHandler
> 
>
> Key: KAFKA-17057
> URL: https://issues.apache.org/jira/browse/KAFKA-17057
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> With KAFKA-16508 we changed the Kafka Streams behavior to call the 
> ProductionExceptionHandler for the single special case of a potentially 
> missing output topic, in order to break an infinite retry loop.
> However, this is not very flexible, as users might want to retry in some 
> cases.
> We might also consider not calling the handler when writing into internal 
> topics, as those _must_ exist.
> KIP-1065: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311627309] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17057) Add "retry" option to ProductionExceptionHandler

2024-06-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-17057:

Description: 
With KAFKA-16508 we changed the Kafka Streams behavior to call the 
ProductionExceptionHandler for the single special case of a potentially missing 
output topic, in order to break an infinite retry loop.

However, this is not very flexible, as users might want to retry in some cases.

We might also consider not calling the handler when writing into internal 
topics, as those _must_ exist.

KIP-1065: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311627309] 

  was:
With KAFKA-16508 we changed the Kafka Streams behavior to call the 
ProductionExceptionHandler for the single special case of a potentially missing 
output topic, in order to break an infinite retry loop.

However, this is not very flexible, as users might want to retry in some cases.

We might also consider not calling the handler when writing into internal 
topics, as those _must_ exist.


> Add "retry" option to ProductionExceptionHandler
> 
>
> Key: KAFKA-17057
> URL: https://issues.apache.org/jira/browse/KAFKA-17057
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> With KAFKA-16508 we changed the Kafka Streams behavior to call the 
> ProductionExceptionHandler for the single special case of a potentially 
> missing output topic, in order to break an infinite retry loop.
> However, this is not very flexible, as users might want to retry in some 
> cases.
> We might also consider not calling the handler when writing into internal 
> topics, as those _must_ exist.
> KIP-1065: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311627309] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17057) Add "retry" option to ProductionExceptionHandler

2024-06-30 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-17057:
---

 Summary: Add "retry" option to ProductionExceptionHandler
 Key: KAFKA-17057
 URL: https://issues.apache.org/jira/browse/KAFKA-17057
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


With KAFKA-16508 we changed the Kafka Streams behavior to call the 
ProductionExceptionHandler for the single special case of a potentially missing 
output topic, in order to break an infinite retry loop.

However, this is not very flexible, as users might want to retry in some cases.

We might also consider not calling the handler when writing into internal 
topics, as those _must_ exist.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16508) Infinite loop if output topic does not exist

2024-06-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16508.
-
Fix Version/s: 3.9.0
   Resolution: Fixed

> Infinite loop if output topic does not exist
> -
>
> Key: KAFKA-16508
> URL: https://issues.apache.org/jira/browse/KAFKA-16508
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Alieh Saeedi
>Priority: Major
> Fix For: 3.9.0
>
>
> Kafka Streams supports `ProductionExceptionHandler` to drop records on error 
> when writing into an output topic.
> However, if the output topic does not exist, the corresponding error cannot 
> be skipped over because the handler is not called.
> The issue is that the producer internally retries fetching the output topic 
> metadata until it times out, and a `TimeoutException` (which is a 
> `RetriableException`) is returned via the registered `Callback`. However, 
> `RetriableException` takes a different code path and the 
> `ProductionExceptionHandler` is not called.
> In general, Kafka Streams correctly tries to handle as many errors as 
> possible internally, and a `RetriableException` falls into this category (and 
> thus there is no need to call the handler). However, for this particular 
> case, just retrying does not solve the issue. It's unclear whether throwing a 
> retriable `TimeoutException` is actually the right thing for the producer to 
> do. It is also not clear what the right way to address this ticket would be 
> (currently, we cannot really detect this case, except by doing some nasty 
> error-message String comparison, which sounds hacky...)
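Until the behavior is settled, one possible workaround (not part of this ticket's 
fix; the class and method names below are invented for illustration) is a 
pre-flight check with the Admin client, so the application fails fast when the 
sink topic is missing instead of the producer spinning on metadata fetches:

```java
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical pre-flight check run before starting the Streams application.
public final class OutputTopicCheck {

    public static void ensureTopicExists(final String bootstrapServers, final String topic)
            throws ExecutionException, InterruptedException {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            final Set<String> existing = admin.listTopics().names().get();
            if (!existing.contains(topic)) {
                // Fail fast instead of letting the producer retry metadata fetches forever.
                throw new IllegalStateException("Output topic '" + topic + "' does not exist");
            }
        }
    }
}
```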



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17056) Convert producer state metadata schemas to use generated protocol

2024-06-30 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17861003#comment-17861003
 ] 

Kuan Po Tseng commented on KAFKA-17056:
---

This looks interesting! Gentle ping [~chia7712]: if you are not working on this 
one, may I take it over?

> Convert producer state metadata schemas to use generated protocol
> -
>
> Key: KAFKA-17056
> URL: https://issues.apache.org/jira/browse/KAFKA-17056
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>
> This is similar to KAFKA-10497 and KAFKA-10736.
> Related code: 
> https://github.com/apache/kafka/blob/33f5995ec379f0d18c6981106838c605ee94be7f/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L94
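For anyone picking this up: the linked code builds its snapshot schema by hand 
with the `Schema`/`Field`/`Type` classes, roughly in the style sketched below 
(the field names here are illustrative, not copied from ProducerStateManager), 
and the ticket is about replacing such hand-written schemas with JSON message 
definitions compiled by Kafka's message generator:

```java
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;

// Illustration of the hand-written schema style that the generated protocol replaces.
public final class HandWrittenSchemaExample {

    static final Schema EXAMPLE_SNAPSHOT_ENTRY = new Schema(
        new Field("producer_id", Type.INT64, "The producer id"),
        new Field("producer_epoch", Type.INT16, "The epoch of the producer id"),
        new Field("last_sequence", Type.INT32, "The last written sequence number")
    );
}
```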



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17056) Convert producer state metadata schemas to use generated protocol

2024-06-30 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng reassigned KAFKA-17056:
-

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> Convert producer state metadata schemas to use generated protocol
> -
>
> Key: KAFKA-17056
> URL: https://issues.apache.org/jira/browse/KAFKA-17056
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>
> This is similar to KAFKA-10497 and KAFKA-10736.
> Related code: 
> https://github.com/apache/kafka/blob/33f5995ec379f0d18c6981106838c605ee94be7f/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L94



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17056) Convert producer state metadata schemas to use generated protocol

2024-06-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17056:
--

 Summary: Convert producer state metadata schemas to use generated 
protocol
 Key: KAFKA-17056
 URL: https://issues.apache.org/jira/browse/KAFKA-17056
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This is similar to KAFKA-10497 and KAFKA-10736.

Related code: 
https://github.com/apache/kafka/blob/33f5995ec379f0d18c6981106838c605ee94be7f/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L94



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2024-06-30 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14956:
--

Assignee: (was: Yash Mayya)

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
> 

[jira] [Assigned] (KAFKA-14844) Kafka Connect's OffsetBackingStore interface should handle (de)serialization and connector namespacing

2024-06-30 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14844:
--

Assignee: (was: Yash Mayya)

> Kafka Connect's OffsetBackingStore interface should handle (de)serialization 
> and connector namespacing
> --
>
> Key: KAFKA-14844
> URL: https://issues.apache.org/jira/browse/KAFKA-14844
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Yash Mayya
>Priority: Major
>
> Relevant discussion here - 
> [https://github.com/apache/kafka/pull/13434/files#r114972]
>  
> TL;DR: we should move serialization/deserialization and key construction 
> (connector namespacing) for source connector offsets from the 
> OffsetStorageWriter / OffsetStorageReader interfaces into the 
> OffsetBackingStore interface.
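To make the idea more concrete, here is a purely hypothetical sketch (this is not 
the current OffsetBackingStore API and not an agreed design; the interface name 
and method signatures are invented): the store takes deserialized 
partitions/offsets plus the connector name, so (de)serialization and connector 
namespacing happen behind it rather than in OffsetStorageWriter/OffsetStorageReader.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Future;

// Hypothetical interface shape only; illustrates where (de)serialization and
// connector namespacing would live if moved out of the reader/writer classes.
public interface NamespacedOffsetStore {

    // Read the offsets for the given source partitions of one connector.
    Future<Map<Map<String, Object>, Map<String, Object>>> offsets(
        String connectorName,
        Collection<Map<String, Object>> partitions);

    // Write a batch of partition -> offset mappings for one connector.
    Future<Void> storeOffsets(
        String connectorName,
        Map<Map<String, Object>, Map<String, Object>> offsets);
}
```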



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14858) Standalone herder does not handle exceptions thrown from connector taskConfigs method

2024-06-30 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14858:
--

Assignee: (was: Yash Mayya)

> Standalone herder does not handle exceptions thrown from connector 
> taskConfigs method
> -
>
> Key: KAFKA-14858
> URL: https://issues.apache.org/jira/browse/KAFKA-14858
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Priority: Major
>
> In distributed mode, if a connector throws an exception from its 
> {{taskConfigs}} method (invoked by the herder, through the {{Worker}} class, 
> [here|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1960]),
>  we wait for an exponential backoff period (see KAFKA-14732) and then [retry 
> the 
> operation|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1907-L1911].
> However, in standalone mode, not only do we not retry the operation, we do 
> not even log the exception. In addition, when REST calls are made that 
> require generating new task configs for a connector (which include creating 
> and reconfiguring a connector), if the connector's {{taskConfigs}} method 
> throws an exception, those requests will time out since the 
> [callback|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L183]
>  we use to respond to those requests never gets invoked.
> At a bare minimum, we should:
>  * Log any exceptions thrown from the {{taskConfigs}} method at {{ERROR}} 
> level
>  * Invoke any callbacks passed in to the relevant {{StandaloneHerder}} 
> methods with any exceptions thrown by the {{taskConfigs}} method
> We might also consider introducing the same kind of exponential backoff retry 
> logic used by distributed mode, but this can be addressed separately since it 
> would be a much larger change in behavior and may break existing users' 
> deployments.
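A rough sketch of that bare-minimum behavior (the class, method, and types below 
are placeholders, not the actual StandaloneHerder code): catch the exception, log 
it at ERROR, and complete the pending callback exceptionally so the REST request 
fails instead of timing out.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Placeholder sketch, not StandaloneHerder code: surface taskConfigs() failures
// instead of silently dropping them and leaving the REST callback hanging.
public class TaskConfigGeneration {

    private static final Logger log = LoggerFactory.getLogger(TaskConfigGeneration.class);

    // Minimal stand-in for Connect's callback used to answer REST requests.
    public interface RequestCallback<T> {
        void onCompletion(Throwable error, T result);
    }

    public void generateTaskConfigs(final String connName,
                                    final Supplier<List<Map<String, String>>> taskConfigs,
                                    final RequestCallback<List<Map<String, String>>> callback) {
        try {
            callback.onCompletion(null, taskConfigs.get());
        } catch (final Throwable t) {
            log.error("Failed to generate task configs for connector {}", connName, t);
            // Complete the pending request exceptionally so it does not time out.
            callback.onCompletion(t, null);
        }
    }
}
```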



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14569) Migrate EmbeddedKafkaCluster used by Connect integration tests from EmbeddedZookeeper to KRaft

2024-06-30 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14569:
--

Assignee: (was: Yash Mayya)

> Migrate EmbeddedKafkaCluster used by Connect integration tests from 
> EmbeddedZookeeper to KRaft
> --
>
> Key: KAFKA-14569
> URL: https://issues.apache.org/jira/browse/KAFKA-14569
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Yash Mayya
>Priority: Minor
>
> ZooKeeper mode is going to be deprecated in Apache Kafka 4.0. Connect 
> currently uses an 
> [EmbeddedKafkaCluster|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L95]
>  (that depends on an 
> [EmbeddedZookeeper)|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L147]
>  for integration tests with the 
> [EmbeddedConnectCluster|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java#L70].
>  This should be migrated to remove the ZooKeeper dependency, instead working 
> in the KRaft mode (probably with co-located brokers and controllers). We 
> could potentially leverage the [existing test kit for KRaft 
> clusters|https://github.com/apache/kafka/tree/b8ab09820cd96290176afd24cf7b03e7cda7f783/core/src/test/java/kafka/testkit]
>  which handles a bunch of stuff including the listeners configuration setup, 
> formatting the metadata log directory, allowing usage of non-static random 
> ports for `controller.quorum.voters`, initialization of the shared server(s), 
> broker(s), and controller(s) etc.
>  
> One more thing to note is that some Connect integration tests currently use 
> the `kafka.security.authorizer.AclAuthorizer` which requires ZooKeeper. These 
> tests should be migrated to use the new authorizer from 
> [KIP-801|https://cwiki.apache.org/confluence/display/KAFKA/KIP-801%3A+Implement+an+Authorizer+that+stores+metadata+in+__cluster_metadata]
>  if we want to completely eliminate the dependency on ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements

2024-06-30 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14353:
--

Assignee: (was: Yash Mayya)

> Kafka Connect REST API configuration validation timeout improvements
> 
>
> Key: KAFKA-14353
> URL: https://issues.apache.org/jira/browse/KAFKA-14353
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Yash Mayya
>Priority: Minor
>  Labels: kip-required
>
> Kafka Connect currently defines a default REST API request timeout of [90 
> seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30].
>  If a REST API request takes longer than this timeout value, a {{500 Internal 
> Server Error}}  response is returned with the message "Request timed out".
> The {{POST /connectors}}  and the {{PUT /connectors/\{connector}/config}}  
> endpoints that are used to create or update connectors internally do a 
> connector configuration validation (the details of which vary depending on 
> the connector plugin) before proceeding to write a message to the Connect 
> cluster's config topic. If the configuration validation takes longer than 90 
> seconds, the connector is still eventually created after the config 
> validation completes (even though a {{500 Internal Server Error}}  response 
> is returned to the user) which leads to a fairly confusing user experience.
> Furthermore, this situation is exacerbated by the potential for config 
> validations occurring twice for a single request. If Kafka Connect is running 
> in distributed mode, requests to create or update a connector are forwarded 
> to the Connect worker which is currently the leader of the group, if the 
> initial request is made to a worker which is not the leader. In this case, 
> the config validation occurs both on the initial worker, as well as the 
> leader (assuming that the first config validation is successful) - this means 
> that if a config validation takes longer than 45 seconds to complete each 
> time, it will result in the original create / update connector request timing 
> out.
> Slow config validations can occur in certain exceptional scenarios - consider 
> a database connector which has elaborate validation logic involving querying 
> information schema to get a list of tables and views to validate the user's 
> connector configuration. If the database has a very high number of tables and 
> views and the database is under a heavy load in terms of query volume, such 
> information schema queries can end up being considerably slow to complete.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-06-30 Thread zhengke zhou (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860996#comment-17860996
 ] 

zhengke zhou commented on KAFKA-16765:
--

[Greg 
Harris|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=gharris1727] 
In this ticket, we want to avoid the situation where the acceptor thread creates 
a SocketChannel and adds it to `newChannels`, where it will never be consumed by 
the main thread. To resolve this, we can close the leftover socket channels 
after the acceptor thread has exited.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Assignee: zhengke zhou
>Priority: Minor
>  Labels: newbie
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new 
> SocketChannel instances and insert them into the `newChannels` List, and a 
> main thread that drains the `newChannels` List and moves them to the 
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads 
> to exit their while loops. It is possible for the NioEchoServer main thread 
> to sense the serverSocketChannel close and terminate before the Acceptor 
> thread does, and for the Acceptor thread to put a SocketChannel in 
> `newChannels` before terminating. This instance is never closed by either 
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created
> 3. Acceptor thread blocks waiting for the `newChannels` lock
> 4. NioEchoServer thread releases the `newChannels` lock and does some 
> processing
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then 
> terminates
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have 
> terminated.
> The end result is that the leaked socket is left open in the `newChannels` 
> list at the end of close(), which is incorrect.
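The fix suggested in the comment above could look roughly like the sketch below 
(the class and parameter names are illustrative, not the actual NioEchoServer 
fields): once the acceptor thread has been joined, `newChannels` can no longer 
grow, so any channel still sitting in it can be closed before close() returns.

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.List;

// Illustrative cleanup step for NioEchoServer#close(); names are assumed.
final class AcceptorCleanup {

    static void closeLeftoverChannels(final Thread acceptorThread,
                                      final List<SocketChannel> newChannels)
            throws InterruptedException, IOException {
        acceptorThread.join();                 // after this, no new channels can be added
        synchronized (newChannels) {
            for (final SocketChannel channel : newChannels) {
                channel.close();               // close channels the main loop never drained
            }
            newChannels.clear();
        }
    }
}
```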



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-06-30 Thread zhengke zhou (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16765 ]


zhengke zhou deleted comment on KAFKA-16765:
--

was (Author: JIRAUSER305940):
[~gharris1727] We want to avoid the situation where the acceptor thread creates 
a SocketChannel and adds it to `newChannels`, where it will never be consumed by 
the main thread.
To resolve this, we can close the leftover socket channels after the acceptor 
thread has exited.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Assignee: zhengke zhou
>Priority: Minor
>  Labels: newbie
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new 
> SocketChannel instances and insert them into the `newChannels` List, and a 
> main thread that drains the `newChannels` List and moves them to the 
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads 
> to exit their while loops. It is possible for the NioEchoServer main thread 
> to sense the serverSocketChannel close and terminate before the Acceptor 
> thread does, and for the Acceptor thread to put a SocketChannel in 
> `newChannels` before terminating. This instance is never closed by either 
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created
> 3. Acceptor thread blocks waiting for the `newChannels` lock
> 4. NioEchoServer thread releases the `newChannels` lock and does some 
> processing
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then 
> terminates
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have 
> terminated.
> The end result is that the leaked socket is left open in the `newChannels` 
> list at the end of close(), which is incorrect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-06-30 Thread zhengke zhou (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860952#comment-17860952
 ] 

zhengke zhou edited comment on KAFKA-16765 at 6/30/24 1:20 PM:
---

[~gharris1727] We want to avoid the situation where the acceptor thread creates 
a SocketChannel and adds it to `newChannels`, where it will never be consumed by 
the main thread.
To resolve this, we can close the leftover socket channels after the acceptor 
thread has exited.


was (Author: JIRAUSER305940):
[~gharris1727]
We want to avoid the situation where the acceptor thread creates a SocketChannel 
and adds it to `newChannels`, where it will never be consumed by the main 
thread.
To resolve this, we can close the leftover socket channels after the acceptor 
thread has exited.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Assignee: zhengke zhou
>Priority: Minor
>  Labels: newbie
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new 
> SocketChannel instances and insert them into the `newChannels` List, and a 
> main thread that drains the `newChannels` List and moves them to the 
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads 
> to exit their while loops. It is possible for the NioEchoServer main thread 
> to sense the serverSocketChannel close and terminate before the Acceptor 
> thread does, and for the Acceptor thread to put a SocketChannel in 
> `newChannels` before terminating. This instance is never closed by either 
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created
> 3. Acceptor thread blocks waiting for the `newChannels` lock
> 4. NioEchoServer thread releases the `newChannels` lock and does some 
> processing
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then 
> terminates
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have 
> terminated.
> The end result is that the leaked socket is left open in the `newChannels` 
> list at the end of close(), which is incorrect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-06-30 Thread zhengke zhou (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860952#comment-17860952
 ] 

zhengke zhou edited comment on KAFKA-16765 at 6/30/24 1:20 PM:
---

[~gharris1727]
We want to avoid the situation where the acceptor thread creates a SocketChannel 
and adds it to `newChannels`, where it will never be consumed by the main 
thread.
To resolve this, we can close the leftover socket channels after the acceptor 
thread has exited.


was (Author: JIRAUSER305940):
If we can make the *main* thread always close after the *AcceptorThread*, it 
seems like this can be fixed.

I will try to fix this.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Assignee: zhengke zhou
>Priority: Minor
>  Labels: newbie
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new 
> SocketChannel instances and insert them into the `newChannels` List, and a 
> main thread that drains the `newChannels` List and moves them to the 
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads 
> to exit their while loops. It is possible for the NioEchoServer main thread 
> to sense the serverSocketChannel close and terminate before the Acceptor 
> thread does, and for the Acceptor thread to put a SocketChannel in 
> `newChannels` before terminating. This instance is never closed by either 
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created
> 3. Acceptor thread blocks waiting for the `newChannels` lock
> 4. NioEchoServer thread releases the `newChannels` lock and does some 
> processing
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then 
> terminates
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have 
> terminated.
> The end result is that the leaked socket is left open in the `newChannels` 
> list at the end of close(), which is incorrect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)