[jira] [Assigned] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14956: -- Assignee: (was: Yash Mayya) > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Sagar Rao >Priority: Major > Labels: flaky-test > Fix For: 3.5.0 > >
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false>
> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
> at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
> at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
> at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
> at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
> at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
> at java.base@17.
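For context, the assertion above comes from TestUtils.waitForCondition, which polls a boolean condition until a deadline. The sketch below shows how that call is typically structured and one common flakiness mitigation (a larger bound); the two offset-lookup helpers are hypothetical stand-ins for the test's real admin-client lookups, and only TestUtils.waitForCondition itself is taken from the stack trace.

```java
import org.apache.kafka.test.TestUtils;

public class OffsetCatchUpExample {
    // Hypothetical helpers standing in for the test's real offset lookups.
    static long consumerGroupOffset() { return 100L; }
    static long topicEndOffset() { return 100L; }

    public static void main(String[] args) throws InterruptedException {
        // The failing run waited 15 000 ms; raising the bound is one way to reduce flakiness.
        TestUtils.waitForCondition(
            () -> consumerGroupOffset() >= topicEndOffset(),
            30_000L, // was 15 000 ms in the failing run
            "Sink connector consumer group offsets should catch up to the topic end offsets");
    }
}
```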
[jira] [Assigned] (KAFKA-14844) Kafka Connect's OffsetBackingStore interface should handle (de)serialization and connector namespacing
[ https://issues.apache.org/jira/browse/KAFKA-14844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14844: -- Assignee: (was: Yash Mayya) > Kafka Connect's OffsetBackingStore interface should handle (de)serialization > and connector namespacing > -- > > Key: KAFKA-14844 > URL: https://issues.apache.org/jira/browse/KAFKA-14844 > Project: Kafka > Issue Type: Task > Components: connect >Reporter: Yash Mayya >Priority: Major > > Relevant discussion here - > [https://github.com/apache/kafka/pull/13434/files#r114972] > > TLDR - we should move serialization / deserialization and key construction > (connector namespacing) for source connector offsets from the > OffsetStorageWriter / OffsetStorageReader interfaces into the > OffsetBackingStore interface. -- This message was sent by Atlassian Jira (v8.20.10#820010)
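To make the TLDR concrete, here is a purely hypothetical sketch (not the actual proposal from the linked discussion) of what a backing store that owns (de)serialization and connector namespacing might look like, in contrast to today's ByteBuffer-keyed OffsetBackingStore methods. The interface name and method shapes are assumptions for illustration only.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Future;

// Hypothetical: the store receives connector-level offset maps and is itself
// responsible for serializing them and prefixing keys with the connector name,
// instead of OffsetStorageWriter/OffsetStorageReader doing that work.
public interface NamespacedOffsetBackingStore {
    Future<Map<Map<String, Object>, Map<String, Object>>> get(
            String connectorName, Collection<Map<String, Object>> partitions);

    Future<Void> set(String connectorName,
            Map<Map<String, Object>, Map<String, Object>> offsets);
}
```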
[jira] [Assigned] (KAFKA-14858) Standalone herder does not handle exceptions thrown from connector taskConfigs method
[ https://issues.apache.org/jira/browse/KAFKA-14858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14858: -- Assignee: (was: Yash Mayya) > Standalone herder does not handle exceptions thrown from connector > taskConfigs method > - > > Key: KAFKA-14858 > URL: https://issues.apache.org/jira/browse/KAFKA-14858 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chris Egerton >Priority: Major > > In distributed mode, if a connector throws an exception from its > {{taskConfigs}} method (invoked by the herder, through the {{Worker}} class, > [here|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1960]), > we wait for an exponential backoff period (see KAFKA-14732) and then [retry > the > operation|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1907-L1911]. > However, in standalone mode, not only do we not retry the operation, we do > not even log the exception. In addition, when REST calls are made that > require generating new task configs for a connector (which include creating > and reconfiguring a connector), if the connector's {{taskConfigs}} method > throws an exception, those requests will time out since the > [callback|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L183] > we use to respond to those requests never gets invoked. > At a bare minimum, we should: > * Log any exceptions thrown from the {{taskConfigs}} method at {{ERROR}} > level > * Invoke any callbacks passed in to the relevant {{StandaloneHerder}} > methods with any exceptions thrown by the {{taskConfigs}} method > We might also consider introducing the same kind of exponential backoff retry > logic used by distributed mode, but this can be addressed separately since it > would be a much larger change in behavior and may break existing users' > deployments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
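A minimal sketch of the bare-minimum fix described above: log exceptions from the connector's taskConfigs method at ERROR level and propagate them to the waiting REST callback instead of dropping them. All types here are simplified stand-ins, not the real StandaloneHerder/Worker/Callback classes.

```java
import java.util.List;
import java.util.Map;

public class StandaloneTaskConfigSketch {
    interface Callback<T> { void onCompletion(Throwable error, T result); }
    interface Worker { List<Map<String, String>> connectorTaskConfigs(String connName) throws Exception; }

    static void updateConnectorTasks(Worker worker, String connName,
                                     Callback<List<Map<String, String>>> cb) {
        try {
            List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName);
            cb.onCompletion(null, taskProps); // success path unblocks the REST request
        } catch (Throwable t) {
            // Bare minimum from the ticket: ERROR-level log plus callback invocation,
            // so create/reconfigure requests fail fast instead of timing out.
            System.err.printf("ERROR Failed to generate task configs for connector %s: %s%n", connName, t);
            cb.onCompletion(t, null);
        }
    }

    public static void main(String[] args) {
        Worker failing = connName -> { throw new RuntimeException("taskConfigs blew up"); };
        updateConnectorTasks(failing, "my-connector",
                (error, result) -> System.out.println("callback got: " + error));
    }
}
```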
[jira] [Assigned] (KAFKA-14569) Migrate EmbeddedKafkaCluster used by Connect integration tests from EmbeddedZookeeper to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14569: -- Assignee: (was: Yash Mayya) > Migrate EmbeddedKafkaCluster used by Connect integration tests from > EmbeddedZookeeper to KRaft > -- > > Key: KAFKA-14569 > URL: https://issues.apache.org/jira/browse/KAFKA-14569 > Project: Kafka > Issue Type: Task > Components: connect >Reporter: Yash Mayya >Priority: Minor > > ZooKeeper mode is going to be deprecated in Apache Kafka 4.0. Connect > currently uses an > [EmbeddedKafkaCluster|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L95] > (that depends on an > [EmbeddedZookeeper)|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L147] > for integration tests with the > [EmbeddedConnectCluster|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java#L70]. > This should be migrated to remove the ZooKeeper dependency, instead working > in the KRaft mode (probably with co-located brokers and controllers). We > could potentially leverage the [existing test kit for KRaft > clusters|https://github.com/apache/kafka/tree/b8ab09820cd96290176afd24cf7b03e7cda7f783/core/src/test/java/kafka/testkit] > which handles a bunch of stuff including the listeners configuration setup, > formatting the metadata log directory, allowing usage of non-static random > ports for `controller.quorum.voters`, initialization of the shared server(s), > broker(s), and controller(s) etc. > > One more thing to note is that some Connect integration tests currently use > the `kafka.security.authorizer.AclAuthorizer` which requires ZooKeeper. These > tests should be migrated to use the new authorizer from > [KIP-801|https://cwiki.apache.org/confluence/display/KAFKA/KIP-801%3A+Implement+an+Authorizer+that+stores+metadata+in+__cluster_metadata] > if we want to completely eliminate the dependency on ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
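For reference, the kind of combined-mode (co-located broker and controller) configuration such an embedded KRaft cluster would need instead of zookeeper.connect looks roughly like the sketch below. The config keys are standard KRaft broker settings; the ports and node ID are placeholders, and a real test cluster would also need to format the metadata log directory.

```java
import java.util.Properties;

public class KraftCombinedModeProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("process.roles", "broker,controller");
        props.put("node.id", "1");
        props.put("controller.quorum.voters", "1@localhost:9093");
        props.put("listeners", "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093");
        props.put("controller.listener.names", "CONTROLLER");
        props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
        props.put("inter.broker.listener.name", "PLAINTEXT");
        return props;
    }
}
```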
[jira] [Assigned] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements
[ https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14353: -- Assignee: (was: Yash Mayya) > Kafka Connect REST API configuration validation timeout improvements > > > Key: KAFKA-14353 > URL: https://issues.apache.org/jira/browse/KAFKA-14353 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Yash Mayya >Priority: Minor > Labels: kip-required > > Kafka Connect currently defines a default REST API request timeout of [90 > seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]. > If a REST API request takes longer than this timeout value, a {{500 Internal > Server Error}} response is returned with the message "Request timed out". > The {{POST /connectors}} and the {{PUT /connectors/\{connector}/config}} > endpoints that are used to create or update connectors internally do a > connector configuration validation (the details of which vary depending on > the connector plugin) before proceeding to write a message to the Connect > cluster's config topic. If the configuration validation takes longer than 90 > seconds, the connector is still eventually created after the config > validation completes (even though a {{500 Internal Server Error}} response > is returned to the user) which leads to a fairly confusing user experience. > Furthermore, this situation is exacerbated by the potential for config > validations occurring twice for a single request. If Kafka Connect is running > in distributed mode, requests to create or update a connector are forwarded > to the Connect worker which is currently the leader of the group, if the > initial request is made to a worker which is not the leader. In this case, > the config validation occurs both on the initial worker, as well as the > leader (assuming that the first config validation is successful) - this means > that if a config validation takes longer than 45 seconds to complete each > time, it will result in the original create / update connector request timing > out. > Slow config validations can occur in certain exceptional scenarios - consider > a database connector which has elaborate validation logic involving querying > information schema to get a list of tables and views to validate the user's > connector configuration. If the database has a very high number of tables and > views and the database is under a heavy load in terms of query volume, such > information schema queries can end up being considerably slow to complete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully
[ https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-16196: --- Fix Version/s: 3.8.0 > Cast transform doesn't handle invalid whole value casts gracefully > -- > > Key: KAFKA-16196 > URL: https://issues.apache.org/jira/browse/KAFKA-16196 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.8.0 > > > The Cast transform currently doesn't handle invalid whole value casts > gracefully. A whole value cast is configured like \{"spec": "int8"} as > opposed to a field level cast like \{"spec": "field1:int8"}. > > If an invalid field level cast is specified (for instance - \{"spec": > "field1:invalid"}), this results in a {{ConfigException}} being thrown here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] > which is handled gracefully as a validation error here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] > > However, invalid whole value casts (for instance - \{"spec": "invalid"}) > aren't handled appropriately and result in an > {{IllegalArgumentException}} being thrown, which surfaces as an uncaught > exception and a {{500 Internal Server Error}} response from the connector > create / update / config validation REST API endpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully
[ https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-16196: --- Description: The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like \{"spec": "int8"} as opposed to a field level cast like \{"spec": "field1:int8"}. If an invalid field level cast is specified (for instance - \{"spec": "field1:invalid"}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts (for instance - \{"spec": "invalid"}) aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. was: The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like \{"spec": "int8"} as opposed to a field level cast like {{{"spec": "field1:int8"}}}. If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts (for instance - {{{"spec": "invalid"}}}) aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. > Cast transform doesn't handle invalid whole value casts gracefully > -- > > Key: KAFKA-16196 > URL: https://issues.apache.org/jira/browse/KAFKA-16196 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The Cast transform currently doesn't handle invalid whole value casts > gracefully. A whole value cast is configured like \{"spec": "int8"} as > opposed to a field level cast like \{"spec": "field1:int8"}. 
> > If an invalid field level cast is specified (for instance - \{"spec": > "field1:invalid"}), this results in a {{ConfigException}} being thrown here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] > which is handled gracefully as a validation error here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] > > However, invalid whole value casts (for instance - \{"spec": "invalid"}) > aren't handled appropriately and result in an > {{IllegalArgumentException}} being thrown, which surfaces as an uncaught > exception and a {{500 Internal Server Error}} response from the connector > create / update / config validation REST API endpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully
[ https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-16196: --- Description: The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like \{"spec": "int8"} as opposed to a field level cast like {{{"spec": "field1:int8"}}}. If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts (for instance - {{{"spec": "invalid"}}}) aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. was: The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like {{{"spec": "int8"}}} as opposed to a field level cast like {{{"spec": "field1:int8"}}}. If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts (for instance - {{{"spec": "invalid"}}}) aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. > Cast transform doesn't handle invalid whole value casts gracefully > -- > > Key: KAFKA-16196 > URL: https://issues.apache.org/jira/browse/KAFKA-16196 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The Cast transform currently doesn't handle invalid whole value casts > gracefully. A whole value cast is configured like \{"spec": "int8"} as > opposed to a field level cast like {{{"spec": "field1:int8"}}}. 
> > If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown > here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] > which is handled gracefully as a validation error here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] > > However, invalid whole value casts (for instance - {{{"spec": "invalid"}}}) aren't handled appropriately and result in an > {{IllegalArgumentException}} being thrown, which surfaces as an uncaught > exception and a {{500 Internal Server Error}} response from the connector > create / update / config validation REST API endpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully
[ https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-16196: --- Description: The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like {{{"spec": "int8"}}} as opposed to a field level cast like {{{"spec": "field1:int8"}}}. If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts (for instance - {{{"spec": "invalid"}}}) aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. was: The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like {{{"spec": "int8"}}} as opposed to a field level cast like {{{"spec": "field1:int8"}}}. If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. > Cast transform doesn't handle invalid whole value casts gracefully > -- > > Key: KAFKA-16196 > URL: https://issues.apache.org/jira/browse/KAFKA-16196 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The Cast transform currently doesn't handle invalid whole value casts > gracefully. A whole value cast is configured like {{{"spec": "int8"}}} as > opposed to a field level cast like {{{"spec": "field1:int8"}}}. 
> > If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown > here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] > which is handled gracefully as a validation error here - > [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] > > However, invalid whole value casts (for instance - {{{"spec": "invalid"}}}) aren't handled appropriately and result in an > {{IllegalArgumentException}} being thrown, which surfaces as an uncaught > exception and a {{500 Internal Server Error}} response from the connector > create / update / config validation REST API endpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully
Yash Mayya created KAFKA-16196: -- Summary: Cast transform doesn't handle invalid whole value casts gracefully Key: KAFKA-16196 URL: https://issues.apache.org/jira/browse/KAFKA-16196 Project: Kafka Issue Type: Bug Components: connect Reporter: Yash Mayya Assignee: Yash Mayya The Cast transform currently doesn't handle invalid whole value casts gracefully. A whole value cast is configured like {{{"spec": "int8"}}} as opposed to a field level cast like {{{"spec": "field1:int8"}}}. If an invalid field level cast is specified (for instance - {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416] which is handled gracefully as a validation error here - [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609] However, invalid whole value casts aren't handled appropriately and result in an {{IllegalArgumentException}} being thrown, which surfaces as an uncaught exception and a {{500 Internal Server Error}} response from the connector create / update / config validation REST API endpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
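A minimal sketch of the graceful-handling idea in KAFKA-16196: translate the IllegalArgumentException thrown for an unknown whole-value cast type into a ConfigException, so the REST API reports a validation error instead of a 500. This is simplified from the real Cast transform; only Schema.Type and ConfigException are actual Kafka classes.

```java
import java.util.Locale;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;

public class WholeValueCastSpec {
    static Schema.Type parseWholeValueSpec(String spec) {
        try {
            return Schema.Type.valueOf(spec.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Surfaces as a config validation error rather than an uncaught 500.
            throw new ConfigException("spec", spec, "Invalid type found in casting spec: " + spec);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseWholeValueSpec("int8")); // INT8
        parseWholeValueSpec("invalid");                  // throws ConfigException
    }
}
```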
[jira] [Created] (KAFKA-15888) DistributedHerder log context should not use the same client ID for each Connect worker by default
Yash Mayya created KAFKA-15888: -- Summary: DistributedHerder log context should not use the same client ID for each Connect worker by default Key: KAFKA-15888 URL: https://issues.apache.org/jira/browse/KAFKA-15888 Project: Kafka Issue Type: Bug Components: connect, KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya By default, if there is no {{"client.id"}} configured on a Connect worker running in distributed mode, the same client ID ("connect-1") will be used in the log context for the DistributedHerder class in every single worker in the Connect cluster. This default is quite confusing and obviously not very useful. Further, based on how this default is configured ([ref|https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299]), it seems like this might have been an unintentional bug. We could simply use the workerId (the advertised host name and port of the worker) by default instead, which should be unique for each worker in a cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
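A sketch of the suggested default: fall back to the worker ID (advertised host:port) for the herder's log-context client ID when no client.id is configured, instead of the shared "connect-1" default. This is a simplified illustration, not the actual DistributedHerder code; LogContext is the real Kafka utility class.

```java
import org.apache.kafka.common.utils.LogContext;

public class HerderLogContextSketch {
    static LogContext logContext(String configuredClientId, String workerId) {
        String clientId = (configuredClientId == null || configuredClientId.isEmpty())
                ? workerId   // e.g. "worker-host:8083" -- unique per worker in a cluster
                : configuredClientId;
        return new LogContext("[Worker clientId=" + clientId + "] ");
    }
}
```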
[jira] [Created] (KAFKA-15570) Add unit tests for MemoryConfigBackingStore
Yash Mayya created KAFKA-15570: -- Summary: Add unit tests for MemoryConfigBackingStore Key: KAFKA-15570 URL: https://issues.apache.org/jira/browse/KAFKA-15570 Project: Kafka Issue Type: Test Components: connect, KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya Currently, the [MemoryConfigBackingStore|https://github.com/apache/kafka/blob/6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java#L37] class doesn't have any unit tests for its functionality. While most of its functionality is fairly lightweight today, changes will be introduced with [KIP-980|https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state] (potentially [KIP-976|https://cwiki.apache.org/confluence/display/KAFKA/KIP-976%3A+Cluster-wide+dynamic+log+adjustment+for+Kafka+Connect] as well) and it would be good to have a test setup in place before those changes are made. -- This message was sent by Atlassian Jira (v8.20.10#820010)
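A hedged sketch of the kind of unit test the ticket asks for: a simple put-then-read round trip. The store below is a tiny stand-in, since MemoryConfigBackingStore's actual API involves ClusterConfigState snapshots and update listeners; only the test shape is the point.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

public class InMemoryConfigStoreTest {
    // Simplified stand-in for MemoryConfigBackingStore.
    static class InMemoryConfigStore {
        private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
        void putConnectorConfig(String name, Map<String, String> config) {
            connectorConfigs.put(name, config);
        }
        Map<String, String> connectorConfig(String name) {
            return connectorConfigs.get(name);
        }
    }

    @Test
    public void testPutThenReadConnectorConfig() {
        InMemoryConfigStore store = new InMemoryConfigStore();
        Map<String, String> config = Map.of("connector.class", "FileStreamSource");
        store.putConnectorConfig("test-connector", config);
        assertEquals(config, store.connectorConfig("test-connector"));
    }
}
```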
[jira] [Updated] (KAFKA-15547) Thread leak in MirrorMakerConfigTest#testClientConfigProperties
[ https://issues.apache.org/jira/browse/KAFKA-15547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15547: --- Issue Type: Test (was: Bug) > Thread leak in MirrorMakerConfigTest#testClientConfigProperties > --- > > Key: KAFKA-15547 > URL: https://issues.apache.org/jira/browse/KAFKA-15547 > Project: Kafka > Issue Type: Test >Reporter: Kalpesh Patel >Assignee: Kalpesh Patel >Priority: Minor > Fix For: 3.7.0 > > > The test MirrorMakerConfigTest#testClientConfigProperties opens a > ForwardingAdmin but fails to close it. > We should enclose this in a try-with-resources statement to ensure the Admin > client is closed and there is no thread leak. -- This message was sent by Atlassian Jira (v8.20.10#820010)
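The fix described above in miniature: construct the Admin client in a try-with-resources block so its threads are always cleaned up. Admin.create is the real client factory; the config map is a placeholder, and the test's actual ForwardingAdmin subclass is closed the same way since Admin is AutoCloseable.

```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;

public class AdminCloseSketch {
    public static void main(String[] args) {
        Map<String, Object> config = Map.of("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(config)) {
            // ... exercise the client under test ...
        } // close() runs even if an assertion above throws, so no thread leak
    }
}
```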
[jira] [Resolved] (KAFKA-15547) Thread leak in MirrorMakerConfigTest#testClientConfigProperties
[ https://issues.apache.org/jira/browse/KAFKA-15547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya resolved KAFKA-15547. Fix Version/s: 3.7.0 Resolution: Fixed > Thread leak in MirrorMakerConfigTest#testClientConfigProperties > --- > > Key: KAFKA-15547 > URL: https://issues.apache.org/jira/browse/KAFKA-15547 > Project: Kafka > Issue Type: Bug >Reporter: Kalpesh Patel >Assignee: Kalpesh Patel >Priority: Minor > Fix For: 3.7.0 > > > The test MirrorMakerConfigTest#testClientConfigProperties opens a > ForwardingAdmin but fails to close it. > We should enclose this in a try-with-resources statement to ensure the Admin > client is closed and there is no thread leak. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API
[ https://issues.apache.org/jira/browse/KAFKA-15177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya resolved KAFKA-15177. Fix Version/s: 3.6.0 Resolution: Fixed > MirrorMaker 2 should implement the alterOffsets KIP-875 API > --- > > Key: KAFKA-15177 > URL: https://issues.apache.org/jira/browse/KAFKA-15177 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect, mirrormaker >Reporter: Yash Mayya >Assignee: Chris Egerton >Priority: Minor > Fix For: 3.6.0 > > > The {{MirrorSourceConnector}} class should implement the new alterOffsets API > added in > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]. > We could also implement the API in > {{MirrorCheckpointConnector}} and > {{MirrorHeartbeatConnector}} to prevent external modification of offsets > since the operation wouldn't really make sense in their case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
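A sketch of the two behaviors the ticket suggests: MirrorSourceConnector could accept externally-altered offsets (after validating them), while the checkpoint/heartbeat connectors could reject alteration outright since it is meaningless for them. This is a stand-alone simplification of the KIP-875 SourceConnector.alterOffsets hook, not the actual MirrorMaker 2 implementation.

```java
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;

public class AlterOffsetsSketch {
    // Shape of the KIP-875 hook: returning true means the alteration is supported.
    public boolean alterOffsetsForSource(Map<String, String> config,
                                         Map<Map<String, Object>, Map<String, Object>> offsets) {
        // A real implementation would validate partition/offset shapes here.
        return true;
    }

    public boolean alterOffsetsForHeartbeat(Map<String, String> config,
                                            Map<Map<String, Object>, Map<String, Object>> offsets) {
        // Externally modifying heartbeat offsets makes no sense, so fail loudly.
        throw new ConnectException("Offsets for this connector cannot be modified externally");
    }
}
```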
[jira] [Updated] (KAFKA-15248) Add BooleanConverter to Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15248: --- Fix Version/s: 3.7.0 > Add BooleanConverter to Kafka Connect > - > > Key: KAFKA-15248 > URL: https://issues.apache.org/jira/browse/KAFKA-15248 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.7.0 > > > KIP-959: Add BooleanConverter to Kafka Connect -> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-959%3A+Add+BooleanConverter+to+Kafka+Connect -- This message was sent by Atlassian Jira (v8.20.10#820010)
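A hedged sketch of what a BooleanConverter can look like: a single byte on the wire, a boolean schema on the Connect side. The Converter interface and the Schema/SchemaAndValue/DataException classes are real Connect API types; the actual KIP-959 converter may differ in details such as null handling and error messages.

```java
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

public class BooleanConverterSketch implements Converter {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null) return null;
        if (!(value instanceof Boolean))
            throw new DataException("BooleanConverter can only serialize Boolean values");
        return new byte[] { (Boolean) value ? (byte) 1 : (byte) 0 };
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        if (value == null) return SchemaAndValue.NULL;
        return new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, value[0] != 0);
    }
}
```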
[jira] [Commented] (KAFKA-15470) Allow creating connectors in a stopped state
[ https://issues.apache.org/jira/browse/KAFKA-15470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766107#comment-17766107 ] Yash Mayya commented on KAFKA-15470: https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state > Allow creating connectors in a stopped state > > > Key: KAFKA-15470 > URL: https://issues.apache.org/jira/browse/KAFKA-15470 > Project: Kafka > Issue Type: New Feature > Components: connect, KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Labels: connect, kafka-connect, kip-required > Fix For: 3.7.0 > > > [KIP-875: First-class offsets support in Kafka > Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > introduced a new {{STOPPED}} state for connectors along with some REST API > endpoints to retrieve and modify offsets for connectors. Currently, only > connectors that already exist can be stopped and any newly created connector > will always be in the {{RUNNING}} state initially. Allowing the creation of > connectors in a {{STOPPED}} state will facilitate multiple new use cases. One > interesting use case would be to migrate connectors from one Kafka Connect > cluster to another. Individual connector migration would be useful in a > number of scenarios such as breaking a large cluster into multiple smaller > clusters (or vice versa), moving a connector from a cluster running in one > data center to another etc. A connector migration could be achieved by using > the following sequence of steps :- > # Stop the running connector on the original Kafka Connect cluster > # Retrieve the offsets for the connector via the {{GET > /connectors/\{connector}/offsets}} endpoint > # Create the connector in a stopped state using the same configuration on > the new Kafka Connect cluster > # Alter the offsets for the connector on the new cluster via the {{PATCH > /connectors/\{connector}/offsets}} endpoint (using the offsets obtained from > the original cluster) > # Resume the connector on the new cluster and delete it on the original > cluster > Another use case for creating connectors in a stopped state could be > deploying connectors as a part of a larger data pipeline before the source / > sink data system has been created or is ready for data transfer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15470) Allow creating connectors in a stopped state
Yash Mayya created KAFKA-15470: -- Summary: Allow creating connectors in a stopped state Key: KAFKA-15470 URL: https://issues.apache.org/jira/browse/KAFKA-15470 Project: Kafka Issue Type: New Feature Components: connect, KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya Fix For: 3.7.0 [KIP-875: First-class offsets support in Kafka Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] introduced a new {{STOPPED}} state for connectors along with some REST API endpoints to retrieve and modify offsets for connectors. Currently, only connectors that already exist can be stopped and any newly created connector will always be in the {{RUNNING}} state initially. Allowing the creation of connectors in a {{STOPPED}} state will facilitate multiple new use cases. One interesting use case would be to migrate connectors from one Kafka Connect cluster to another. Individual connector migration would be useful in a number of scenarios such as breaking a large cluster into multiple smaller clusters (or vice versa), moving a connector from a cluster running in one data center to another etc. A connector migration could be achieved by using the following sequence of steps :- # Stop the running connector on the original Kafka Connect cluster # Retrieve the offsets for the connector via the {{GET /connectors/\{connector}/offsets}} endpoint # Create the connector in a stopped state using the same configuration on the new Kafka Connect cluster # Alter the offsets for the connector on the new cluster via the {{PATCH /connectors/\{connector}/offsets}} endpoint (using the offsets obtained from the original cluster) # Resume the connector on the new cluster and delete it on the original cluster Another use case for creating connectors in a stopped state could be deploying connectors as a part of a larger data pipeline before the source / sink data system has been created or is ready for data transfer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
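A hedged sketch of step 3 of the migration recipe above: creating the connector on the new cluster in a stopped state. The "initial_state" request field is the addition proposed in KIP-980; the cluster URL and the connector config are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateStoppedConnector {
    public static void main(String[] args) throws Exception {
        String body = """
            {
              "name": "migrated-connector",
              "config": { "connector.class": "org.example.SomeSinkConnector", "topics": "t1" },
              "initial_state": "STOPPED"
            }""";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://new-connect-cluster:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```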
[jira] [Resolved] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id
[ https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya resolved KAFKA-14067. Resolution: Fixed > Sink connector override.consumer.group.id can conflict with worker group.id > --- > > Key: KAFKA-14067 > URL: https://issues.apache.org/jira/browse/KAFKA-14067 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Greg Harris >Priority: Minor > Fix For: 3.7.0 > > > Currently there is a validation step for connector names which prevents sink > connector consumer groups from colliding with the worker group.id. > There is currently no such validation for consumer.override.group.id that > would prevent a conflicting connector from being configured, and so it is > possible to misconfigure a connector in a way that may be damaging to the > workers themselves. > Reproduction steps: > 1. Configure a connect distributed cluster with a certain group.id in the > worker config. > 2. Configure a sink connector with consumer.override.group.id having the same > value as in the worker config > Expected behavior: > 1. An error is returned indicating that the consumer.override.group.id is > invalid > 2. The connector is not created or started > Actual behavior: > 1. No error is returned, and the configuration is otherwise valid. > 2. The connector is created and starts running. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id
[ https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14067: --- Fix Version/s: 3.7.0 > Sink connector override.consumer.group.id can conflict with worker group.id > --- > > Key: KAFKA-14067 > URL: https://issues.apache.org/jira/browse/KAFKA-14067 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Greg Harris >Priority: Minor > Fix For: 3.7.0 > > > Currently there is a validation step for connector names which prevents sink > connector consumer groups from colliding with the worker group.id. > There is currently no such validation for consumer.override.group.id that > would prevent a conflicting connector from being configured, and so it is > possible to misconfigure a connector in a way that may be damaging to the > workers themselves. > Reproduction steps: > 1. Configure a connect distributed cluster with a certain group.id in the > worker config. > 2. Configure a sink connector with consumer.override.group.id having the same > value as in the worker config > Expected behavior: > 1. An error is returned indicating that the consumer.override.group.id is > invalid > 2. The connector is not created or started > Actual behavior: > 1. No error is returned, and the configuration is otherwise valid. > 2. The connector is created and starts running. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id
[ https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764114#comment-17764114 ] Yash Mayya commented on KAFKA-14067: This has been fixed in [https://github.com/apache/kafka/pull/14303] > Sink connector override.consumer.group.id can conflict with worker group.id > --- > > Key: KAFKA-14067 > URL: https://issues.apache.org/jira/browse/KAFKA-14067 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Greg Harris >Priority: Minor > > Currently there is a validation step for connector names which prevents sink > connector consumer groups from colliding with the worker group.id. > There is currently no such validation for consumer.override.group.id that > would prevent a conflicting connector from being configured, and so it is > possible to misconfigure a connector in a way that may be damaging to the > workers themselves. > Reproduction steps: > 1. Configure a connect distributed cluster with a certain group.id in the > worker config. > 2. Configure a sink connector with consumer.override.group.id having the same > value as in the worker config > Expected behavior: > 1. An error is returned indicating that the consumer.override.group.id is > invalid > 2. The connector is not created or started > Actual behavior: > 1. No error is returned, and the configuration is otherwise valid. > 2. The connector is created and starts running. -- This message was sent by Atlassian Jira (v8.20.10#820010)
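A minimal sketch of the validation KAFKA-14067 calls for (the linked PR implements it, possibly in a different form): reject a sink connector whose consumer.override.group.id collides with the worker's own group.id. Only ConfigException is a real Kafka class here; the method is illustrative.

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

public class GroupIdConflictCheck {
    static void validateSinkOverrides(Map<String, String> connectorConfig, String workerGroupId) {
        String override = connectorConfig.get("consumer.override.group.id");
        if (override != null && override.equals(workerGroupId)) {
            throw new ConfigException("consumer.override.group.id", override,
                    "Sink connector consumer group must not match the Connect worker group.id");
        }
    }

    public static void main(String[] args) {
        // Throws: the override matches the worker's group.id.
        validateSinkOverrides(Map.of("consumer.override.group.id", "connect-cluster"), "connect-cluster");
    }
}
```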
[jira] [Assigned] (KAFKA-14855) Harden integration testing logic for asserting that a connector is deleted
[ https://issues.apache.org/jira/browse/KAFKA-14855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14855: -- Assignee: Yash Mayya > Harden integration testing logic for asserting that a connector is deleted > -- > > Key: KAFKA-14855 > URL: https://issues.apache.org/jira/browse/KAFKA-14855 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Minor > > In the Connect embedded integration testing framework, the > [EmbeddedConnectClusterAssertions::assertConnectorAndTasksAreStopped > method|https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L411-L428] > is used in several places to verify that a connector has been deleted. (This > method may be renamed in an upcoming PR to something like > {{{}assertConnectorAndTasksAreNotRunning{}}}, but apart from that, its usage > and semantics will remain unchanged.) However, the [underlying logic for that > assertion|https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L430-L451] > doesn't strictly check for deletion (which can be done by verifying that the > connector and its tasks no longer appear in the REST API at all), since it > also allows for the Connector or tasks to appear in the REST API, but with a > state that is not {{{}RUNNING{}}}. > This constraint is a bit too lax and may be silently masking issues with our > shutdown logic for to-be-deleted connectors. We should try to narrow the > criteria for that method so that it fails if the Connector or any of its > tasks still appear in the REST API, even with a non-{{{}RUNNING{}}} state. > However, we should also be careful to ensure that current uses of that method > are not relying on its semantics. If, for some reason, a test case requires > the existing semantics, we should evaluate whether it's necessary to continue > to rely on those semantics, and if so, probably preserve the existing method > so that it can be used wherever applicable (but rewrite all other tests to > use the new, stricter method). -- This message was sent by Atlassian Jira (v8.20.10#820010)
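A sketch of the stricter check the ticket proposes: a connector counts as deleted only when the REST API no longer knows it at all (a 404), not merely when its state is non-RUNNING. The HTTP plumbing below is generic stand-in code; the real assertion lives in EmbeddedConnectClusterAssertions.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorDeletedCheck {
    static boolean connectorIsFullyDeleted(String restBase, String connector) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/connectors/" + connector + "/status"))
                .GET()
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode() == 404; // any 2xx (even non-RUNNING state) means not deleted
    }
}
```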
[jira] [Commented] (KAFKA-14725) Improve cancellation semantics for connector tasks
[ https://issues.apache.org/jira/browse/KAFKA-14725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760929#comment-17760929 ] Yash Mayya commented on KAFKA-14725: [~ChrisEgerton] please feel free to take over this one, thanks. > Improve cancellation semantics for connector tasks > -- > > Key: KAFKA-14725 > URL: https://issues.apache.org/jira/browse/KAFKA-14725 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > > This came about during discussion on > [https://github.com/apache/kafka/pull/13208/,] which addressed KAFKA-5756. > > Right now, we make some effort to disable and shut down tasks that have been > scheduled for shutdown but taken longer than the [graceful shutdown timeout > period|https://kafka.apache.org/documentation.html#connectconfigs_task.shutdown.graceful.timeout.ms]. > The logic for performing this disablement is contained in the {{cancel}} > method for the > [WorkerTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L129-L136] > and its subclasses (at the time of writing, that would be the > [AbstractWorkerSourceTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java], > > [WorkerSourceTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java], > > [ExactlyOnceWorkerSourceTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java], > and > [WorkerSinkTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java] > classes). Right now we don't do much to interrupt in-progress operations, > which may lead to zombie tasks lying around on a worker that have not yet > relinquished resources like Kafka clients, file descriptors, or database > connections despite being scheduled for shutdown. > We can and should make the cancellation logic for tasks more stringent, > including but not limited to: > * Interrupting the work thread for the task > * Interrupting any in-progress offset commits > * Preemptively shutting down any Kafka clients created for use by the task -- This message was sent by Atlassian Jira (v8.20.10#820010)
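A sketch of the "more stringent cancellation" direction the ticket describes: on cancel, interrupt the task's work thread and preemptively break out of blocking client calls so a zombie task cannot hold connections open indefinitely. This is a simplified stand-in for WorkerTask and its subclasses, not the actual runtime code.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CancellableTaskSketch {
    private volatile Thread workThread;                 // set when the task starts
    private KafkaConsumer<byte[], byte[]> consumer;     // client created for use by the task

    public void cancel() {
        Thread t = workThread;
        if (t != null)
            t.interrupt();          // break out of blocking calls in the work loop
        if (consumer != null)
            consumer.wakeup();      // KafkaConsumer is not thread-safe; wakeup() is the one
                                    // safe cross-thread call, forcing poll() to throw
    }
}
```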
[jira] [Updated] (KAFKA-15387) Deprecate and remove Connect's redundant task configurations retrieval endpoint
[ https://issues.apache.org/jira/browse/KAFKA-15387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15387: --- Summary: Deprecate and remove Connect's redundant task configurations retrieval endpoint (was: Deprecate and remove Connect's duplicate task configurations retrieval endpoint) > Deprecate and remove Connect's redundant task configurations retrieval > endpoint > --- > > Key: KAFKA-15387 > URL: https://issues.apache.org/jira/browse/KAFKA-15387 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > Labels: Connect, kafka-connect, kip-required > Fix For: 4.0.0 > > > A new endpoint ({{{}GET /connectors/\{connector}/tasks-config){}}} was added > to Kafka Connect's REST API to expose task configurations in > [KIP-661|https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]. > However, the original patch for Kafka Connect's REST API had already added > an endpoint ({{{}GET /connectors/\{connector}/tasks){}}} to retrieve the list > of a connector's tasks and their configurations (ref - > [https://github.com/apache/kafka/pull/378] , > https://issues.apache.org/jira/browse/KAFKA-2369) and this was missed in > KIP-661. We can deprecate the endpoint added by KIP-661 in 3.7 (the next > minor AK release) and remove it in 4.0 (the next major AK release) since it's > redundant to have two separate endpoints to expose task configurations. > Related discussions in > [https://github.com/apache/kafka/pull/13424#discussion_r1144727886] and > https://issues.apache.org/jira/browse/KAFKA-15377 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15387) Deprecate and remove Connect's duplicate task configurations retrieval endpoint
Yash Mayya created KAFKA-15387: -- Summary: Deprecate and remove Connect's duplicate task configurations retrieval endpoint Key: KAFKA-15387 URL: https://issues.apache.org/jira/browse/KAFKA-15387 Project: Kafka Issue Type: Task Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya Fix For: 4.0.0 A new endpoint ({{{}GET /connectors/\{connector}/tasks-config){}}} was added to Kafka Connect's REST API to expose task configurations in [KIP-661|https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]. However, the original patch for Kafka Connect's REST API had already added an endpoint ({{{}GET /connectors/\{connector}/tasks){}}} to retrieve the list of a connector's tasks and their configurations (ref - [https://github.com/apache/kafka/pull/378] , https://issues.apache.org/jira/browse/KAFKA-2369) and this was missed in KIP-661. We can deprecate the endpoint added by KIP-661 in 3.7 (the next minor AK release) and remove it in 4.0 (the next major AK release) since it's redundant to have two separate endpoints to expose task configurations. Related discussions in [https://github.com/apache/kafka/pull/13424#discussion_r1144727886] and https://issues.apache.org/jira/browse/KAFKA-15377 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756798#comment-17756798 ] Yash Mayya edited comment on KAFKA-15377 at 8/21/23 11:00 AM: -- Although I'd like to note that since the endpoint won't be going away at least until 4.0 (assuming the KIP is accepted and implemented), we should probably still fix the security issue described in this ticket - [https://github.com/apache/kafka/pull/14244|https://github.com/apache/kafka/pull/14244.] was (Author: yash.mayya): Although I'd like to note that since the endpoint won't be going away at least until 4.0 (assuming the KIP is accepted and implemented), we should probably still fix the security issue described in this ticket - [https://github.com/apache/kafka/pull/14244.] > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756798#comment-17756798 ] Yash Mayya edited comment on KAFKA-15377 at 8/21/23 11:00 AM: -- Although I'd like to note that since the endpoint won't be going away at least until 4.0 (assuming the KIP is accepted and implemented), we should probably still fix the security issue described in this ticket - [https://github.com/apache/kafka/pull/14244|https://github.com/apache/kafka/pull/14244]. was (Author: yash.mayya): Although I'd like to note that since the endpoint won't be going away at least until 4.0 (assuming the KIP is accepted and implemented), we should probably still fix the security issue described in this ticket - [https://github.com/apache/kafka/pull/14244|https://github.com/apache/kafka/pull/14244.] > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756798#comment-17756798 ] Yash Mayya commented on KAFKA-15377: Although I'd like to note that since the endpoint won't be going away at least until 4.0 (assuming the KIP is accepted and implemented), we should probably still fix the security issue described in this ticket - [https://github.com/apache/kafka/pull/14244.] > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756797#comment-17756797 ] Yash Mayya commented on KAFKA-15377: Makes sense, thanks Mickael - I'll publish a small KIP soon. > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756787#comment-17756787 ] Yash Mayya commented on KAFKA-15377: Yeah, we'd discussed this previously here - [https://github.com/apache/kafka/pull/13424#discussion_r1144727886.] I'd be in favor of removing it completely; would we be able to do so with a single KIP (i.e. skipping deprecation followed by removal) targeting the next major release (4.0)? > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755826#comment-17755826 ] Yash Mayya edited comment on KAFKA-15377 at 8/18/23 7:13 AM: - [~mimaison] [~ChrisEgerton] even though this will technically change the response for a public REST API, I'm not sure it requires a KIP since it should be classified as a bug. What do you folks think? was (Author: yash.mayya): [~mimaison] [~ChrisEgerton] even though this does technically change the response for a public REST API, I'm not sure it requires a KIP since it should be classified as a bug. What do you folks think? > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755826#comment-17755826 ] Yash Mayya commented on KAFKA-15377: [~mimaison] [~ChrisEgerton] even though this does technically change the response for a public REST API, I'm not sure it requires a KIP since it should be classified as a bug. What do you folks think? > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
[ https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15377: --- Description: The {{GET /connectors/\{connector}/tasks-config}} endpoint added in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] exposes externalized secret values in task configurations (see [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / [https://github.com/apache/kafka/pull/6129] for the {{GET /connectors/\{connector}/tasks}} endpoint. The config provider placeholder should be used instead of the resolved config value. (was: The \{{GET /connectors/{connector}/tasks-config}} endpoint added in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] exposes externalized secret values in task configurations (see [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / [https://github.com/apache/kafka/pull/6129] for the \{{GET /connectors/{connector}/tasks}} endpoint. The config provider placeholder should be used instead of the resolved config value.) > GET /connectors/{connector}/tasks-config endpoint exposes externalized secret > values > > > Key: KAFKA-15377 > URL: https://issues.apache.org/jira/browse/KAFKA-15377 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > The {{GET /connectors/\{connector}/tasks-config}} endpoint added in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] > exposes externalized secret values in task configurations (see > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. > A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 > / [https://github.com/apache/kafka/pull/6129] for the {{GET > /connectors/\{connector}/tasks}} endpoint. The config provider placeholder > should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values
Yash Mayya created KAFKA-15377: -- Summary: GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values Key: KAFKA-15377 URL: https://issues.apache.org/jira/browse/KAFKA-15377 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya The \{{GET /connectors/{connector}/tasks-config}} endpoint added in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API] exposes externalized secret values in task configurations (see [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations)]. A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / [https://github.com/apache/kafka/pull/6129] for the \{{GET /connectors/{connector}/tasks}} endpoint. The config provider placeholder should be used instead of the resolved config value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
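For illustration, the following is a minimal, self-contained Java sketch of the behavior this ticket asks for (not the actual Connect fix; the config keys, values, and class name are hypothetical): the endpoint should serve the raw task config, which still carries the KIP-297 config-provider placeholder, rather than the resolved config containing the secret.

{code:java}
import java.util.Map;

// Hypothetical sketch of the desired tasks-config behavior; all names are illustrative.
public class TasksConfigPlaceholderSketch {
    public static void main(String[] args) {
        // Raw task config as stored in the config topic: the KIP-297 placeholder is intact.
        Map<String, String> rawTaskConfig = Map.of(
                "connection.url", "jdbc:postgresql://db:5432/prod",
                "connection.password", "${file:/etc/secrets/db.properties:password}");

        // Resolved config used at runtime: the secret has been substituted in.
        Map<String, String> resolvedTaskConfig = Map.of(
                "connection.url", "jdbc:postgresql://db:5432/prod",
                "connection.password", "s3cr3t");

        // The bug: the endpoint currently serves the resolved config, leaking the secret.
        // The fix described in the ticket: serve the raw config with the placeholder.
        System.out.println("GET .../tasks-config should return: " + rawTaskConfig);
        System.out.println("and must not return: " + resolvedTaskConfig);
    }
}
{code}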
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14133: --- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo) # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#00875a}KStreamPrintTest{color} (owner: Christo) # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo) # {color:#00875a}MaterializedInternalTest{color} (owner: Christo) # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#00875a}ClientUtilsTest{color} (owner: Christo) # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}TopologyTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo) # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo) # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo) # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo) # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo) # {color:#00875a}RocksDBStoreTest{color} (owner: Christo) # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo) # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo) # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo) # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo) # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}AssignmentTestUtils{color} (owner: Christo) # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew) # 
{color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947) # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak]) # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777] # AbstractStreamTest # {color:#ff8b00}KStreamTransformValuesTest{color} (owner: Christo) # {color:#ff8b00}KTableImplTest{color} (owner: Christo) # {color:#ff8b00}KTableTransformValuesTest{color} (owner: Christo) # {color:#ff8b00}SessionCacheFlushListenerTest{color} (owner: Christo) # {color:#ff8b00}TimestampedCacheFlushListenerTest{color} (owner: Christo) # {color:#ff8b00}TimestampedTupleForwarderTest{color} (owner: Christo) # {color:#ff8b00}ActiveTaskCreatorTest{color} (owner: Christo) # {color:#ff8b00}ChangelogTopicsTest{color} (owner: Christo) # {color:#ff8b00}GlobalProcessorContextImplTest{color} (owner: Christo) # RecordCollectorTest (owner: Christo) # StateRestoreCallbackAdapterTest (owner: Christo) # StoreToProcessorContextAdapterTest (owner: Christo) # StreamsProducerTest (owner: Nelson) #
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14132: --- Description: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#00875a}KafkaBasedLogTest{color} (owner: @bachmanity ]) # {color:#00875a}RetryUtilTest{color} (owner: [~yash.mayya]) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. 
A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: @bachmanity ]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) > # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) >
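As a generic illustration of the migration pattern these tickets track (not taken from any specific test in the lists above), EasyMock's expect/replay/verify flow maps onto Mockito's when/verify roughly as follows:

{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import org.junit.jupiter.api.Test;

// Illustrative only: the shape of an EasyMock-to-Mockito migration.
public class MockitoMigrationSketch {
    @Test
    @SuppressWarnings("unchecked")
    public void stubAndVerifyWithMockito() {
        List<String> topics = mock(List.class);

        // EasyMock: expect(topics.get(0)).andReturn("dlq-topic"); replay(topics);
        when(topics.get(0)).thenReturn("dlq-topic");

        assertEquals("dlq-topic", topics.get(0)); // no replay() needed in Mockito

        // EasyMock: verify(topics); Mockito verifies individual interactions instead.
        verify(topics).get(0);
    }
}
{code}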
[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case
[ https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746865#comment-17746865 ] Yash Mayya commented on KAFKA-10457: If we want to add a new Connect schema type to accommodate big numbers, it'll probably require a small KIP since the types are a part of the public API. This ticket probably hasn't received more attention because a workaround exists with the [Decimal|https://github.com/apache/kafka/blob/c7de30f38bfd6e2d62a0b5c09b5dc9707e58096b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L40] class when schemas are enabled. > JsonConverter.toConnectData trims BigInteger to Long for schema-less case > - > > Key: KAFKA-10457 > URL: https://issues.apache.org/jira/browse/KAFKA-10457 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > > > When _JsonConverter_ is configured with _schemas.enable=false_ and a value > exceeding _Long_ is passed, the result is incorrect since the converter > trims it to _Long:_ > {code:java} > Map<String, Object> props = Collections.singletonMap("schemas.enable", > false); > converter.configure(props, true); > BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new > BigInteger("1")); > String msg = value.toString(); > SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, > msg.getBytes()); > assertNull(schemaAndValue.schema()); > assertEquals(value, schemaAndValue.value()); > {code} > > Fails with: > > {code:java} > expected:<9223372036854775808> but was:<-9223372036854775808> > Expected :9223372036854775808 > Actual :-9223372036854775808 > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
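A minimal sketch of the Decimal workaround mentioned in the comment above, assuming schemas are enabled (the topic name and scale are illustrative):

{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class DecimalWorkaroundSketch {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // schemas.enable=true (the default) is what makes the workaround possible.
        converter.configure(Map.of("schemas.enable", "true"), false);

        BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
        Schema schema = Decimal.schema(0); // scale 0: an integral decimal

        // With a Decimal schema the value survives the round trip as a BigDecimal
        // instead of being truncated to a long.
        byte[] serialized = converter.fromConnectData("topic", schema, new BigDecimal(value));
        SchemaAndValue roundTripped = converter.toConnectData("topic", serialized);
        System.out.println(roundTripped.value()); // 9223372036854775808
    }
}
{code}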
[jira] (KAFKA-15238) Connect workers can be disabled by DLQ-related blocking admin client calls
[ https://issues.apache.org/jira/browse/KAFKA-15238 ] Yash Mayya deleted comment on KAFKA-15238: was (Author: yash.mayya): https://github.com/apache/kafka/pull/14079 > Connect workers can be disabled by DLQ-related blocking admin client calls > -- > > Key: KAFKA-15238 > URL: https://issues.apache.org/jira/browse/KAFKA-15238 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > When Kafka Connect is run in distributed mode - if a sink connector's task is > restarted (via a worker's REST API), the following sequence of steps will > occur (on the DistributedHerder's thread): > > # The existing sink task will be stopped > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367]) > # A new sink task will be started > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40]) > # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663]) > # The DLQ reporter (see > [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]) > for the sink task is also instantiated and configured as a part of this > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800]) > # The DLQ reporter setup involves two synchronous admin client calls to list > topics and create the DLQ topic if it isn't already created > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87]) > > All of these are occurring synchronously on the herder's tick thread - in > this portion > [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469] > where external requests are run. If the admin client call in the DLQ > reporter setup step blocks for some time (due to auth failures and retries or > network issues or whatever other reason), this can cause the Connect worker > to become non-functional (REST API requests will timeout) and even fall out > of the Connect cluster and become a zombie (since the tick thread also drives > group membership functions - see > [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L403], > > [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L535]). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15238) Connect workers can be disabled by DLQ-related blocking admin client calls
[ https://issues.apache.org/jira/browse/KAFKA-15238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15238: --- Description: When Kafka Connect is run in distributed mode - if a sink connector's task is restarted (via a worker's REST API), the following sequence of steps will occur (on the DistributedHerder's thread): # The existing sink task will be stopped ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367]) # A new sink task will be started ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40]) # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663]) # The DLQ reporter (see [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]) for the sink task is also instantiated and configured as a part of this ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800]) # The DLQ reporter setup involves two synchronous admin client calls to list topics and create the DLQ topic if it isn't already created ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87]) All of these are occurring synchronously on the herder's tick thread - in this portion [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469] where external requests are run. If the admin client call in the DLQ reporter setup step blocks for some time (due to auth failures and retries or network issues or whatever other reason), this can cause the Connect worker to become non-functional (REST API requests will timeout) and even fall out of the Connect cluster and become a zombie (since the tick thread also drives group membership functions - see [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L403], [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L535]). 
was: When Kafka Connect is run in distributed mode - if a sink connector's task is restarted (via a worker's REST API), the following sequence of steps will occur (on the DistributedHerder's thread): # The existing sink task will be stopped ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367]) # A new sink task will be started ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40]) # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663]) # The DLQ reporter (see [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]) for the sink task is also instantiated and configured as a part of this ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800]) # The DLQ reporter setup involves two synchronous admin client calls to list topics and create the DLQ topic if it isn't already created ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87]) All of these are occurring synchronously on the herder's tick thread - in this portion [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469] where external requests are run. If the admin client call in the DLQ reporter setup step blocks for some time (due to auth failures and retries or network issues or whatever other reason), this can cause the Connect worker to become non-functional (REST API requests will timeout) and even fall out of the Connect cluster and become a zombie (since the tick thread also drives group membership functions).
[jira] [Updated] (KAFKA-15238) Connect workers can be disabled by DLQ-related blocking admin client calls
[ https://issues.apache.org/jira/browse/KAFKA-15238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15238: --- Summary: Connect workers can be disabled by DLQ-related blocking admin client calls (was: Connect workers can be disabled by DLQ related stuck admin client calls) > Connect workers can be disabled by DLQ-related blocking admin client calls > -- > > Key: KAFKA-15238 > URL: https://issues.apache.org/jira/browse/KAFKA-15238 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > When Kafka Connect is run in distributed mode - if a sink connector's task is > restarted (via a worker's REST API), the following sequence of steps will > occur (on the DistributedHerder's thread): > > # The existing sink task will be stopped > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367]) > # A new sink task will be started > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40]) > # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663]) > # The DLQ reporter (see > [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]) > for the sink task is also instantiated and configured as a part of this > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800]) > # The DLQ reporter setup involves two synchronous admin client calls to list > topics and create the DLQ topic if it isn't already created > ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87]) > > All of these are occurring synchronously on the herder's tick thread - in > this portion > [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469] > where external requests are run. If the admin client call in the DLQ > reporter setup step blocks for some time (due to auth failures and retries or > network issues or whatever other reason), this can cause the Connect worker > to become non-functional (REST API requests will timeout) and even fall out > of the Connect cluster and become a zombie (since the tick thread also drives > group membership functions). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15238) Connect workers can be disabled by DLQ related stuck admin client calls
Yash Mayya created KAFKA-15238: -- Summary: Connect workers can be disabled by DLQ related stuck admin client calls Key: KAFKA-15238 URL: https://issues.apache.org/jira/browse/KAFKA-15238 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya When Kafka Connect is run in distributed mode - if a sink connector's task is restarted (via a worker's REST API), the following sequence of steps will occur (on the DistributedHerder's thread): # The existing sink task will be stopped ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367]) # A new sink task will be started ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40]) # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663]) # The DLQ reporter (see [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]) for the sink task is also instantiated and configured as a part of this ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800]) # The DLQ reporter setup involves two synchronous admin client calls to list topics and create the DLQ topic if it isn't already created ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87]) All of these are occurring synchronously on the herder's tick thread - in this portion [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469] where external requests are run. If the admin client call in the DLQ reporter setup step blocks for some time (due to auth failures and retries or network issues or whatever other reason), this can cause the Connect worker to become non-functional (REST API requests will timeout) and even fall out of the Connect cluster and become a zombie (since the tick thread also drives group membership functions). -- This message was sent by Atlassian Jira (v8.20.10#820010)
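A minimal sketch of why the setup step can stall the herder thread, paraphrased rather than taken from the DeadLetterQueueReporter source (the class name, method name, and 30-second timeout are illustrative; bounding the blocking gets with a timeout is shown as one possible mitigation, not the agreed fix):

{code:java}
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

// Paraphrased sketch of the DLQ topic setup; both get() calls below run
// synchronously on the herder's tick thread in the scenario this ticket describes.
public class DlqSetupSketch {
    static void ensureDlqTopic(Admin admin, String dlqTopic) throws Exception {
        // An unbounded get() here is what can hang the worker; a timeout at least
        // bounds how long the tick thread is blocked.
        Set<String> topics = admin.listTopics().names().get(30, TimeUnit.SECONDS);
        if (!topics.contains(dlqTopic)) {
            admin.createTopics(List.of(new NewTopic(dlqTopic, 1, (short) 1)))
                    .all().get(30, TimeUnit.SECONDS);
        }
    }
}
{code}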
[jira] [Created] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument
Yash Mayya created KAFKA-15216: -- Summary: InternalSinkRecord::newRecord method ignores the headers argument Key: KAFKA-15216 URL: https://issues.apache.org/jira/browse/KAFKA-15216 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56] - the headers argument passed to the {{InternalSinkRecord}} constructor is the instance field via the accessor {{headers()}} method instead of the {{newRecord}} method's {{headers}} argument value. Originally discovered [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499] -- This message was sent by Atlassian Jira (v8.20.10#820010)
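A self-contained illustration of the bug shape (simplified; not the actual InternalSinkRecord source, and the class names here are invented): the copy method reads the instance's headers() accessor where it should use its headers parameter, so caller-supplied headers are silently dropped.

{code:java}
import java.util.List;

// Simplified stand-in for a record type with a newRecord-style copy method.
class RecordSketch {
    private final List<String> headers;
    RecordSketch(List<String> headers) { this.headers = headers; }
    List<String> headers() { return headers; }

    RecordSketch newRecord(List<String> headers) {
        // BUG: uses the accessor headers() instead of the `headers` parameter.
        return new RecordSketch(headers());
    }
}

public class NewRecordHeadersBugSketch {
    public static void main(String[] args) {
        RecordSketch original = new RecordSketch(List.of("original-header"));
        RecordSketch copy = original.newRecord(List.of("new-header"));
        System.out.println(copy.headers()); // prints [original-header], not [new-header]
    }
}
{code}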
[jira] [Updated] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets
[ https://issues.apache.org/jira/browse/KAFKA-15182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15182: --- Description: See discussion [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148] TLDR: When users attempt to externally modify source connector offsets via the {{PATCH /offsets}} endpoint (introduced in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]), type mismatches can occur between offsets passed to {{SourceConnector::alterOffsets}} and the offsets that are retrieved by connectors / tasks via an instance of {{OffsetStorageReader}} after the offsets have been modified. In order to prevent this type mismatch that could lead to subtle bugs in connectors, we could serialize + deserialize the offsets using the worker's internal JSON converter before invoking {{{}SourceConnector::alterOffsets{}}}. was: See discussion [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148] TLDR: When users attempt to externally modify source connector offsets via the {{PATCH /offsets}} endpoint (introduced in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]), type mismatches can occur between offsets passed to {{SourceConnector::alterOffsets}} and the offsets that are retrieved by connectors / tasks via an instance of {{OffsetStorageReader }}after the offsets have been modified. In order to prevent this type mismatch that could lead to subtle bugs in connectors, we could serialize + deserialize the offsets using the worker's internal JSON converter before invoking {{{}SourceConnector::alterOffsets{}}}. > Normalize offsets before invoking SourceConnector::alterOffsets > --- > > Key: KAFKA-15182 > URL: https://issues.apache.org/jira/browse/KAFKA-15182 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0 > > > See discussion > [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148] > > TLDR: When users attempt to externally modify source connector offsets via > the {{PATCH /offsets}} endpoint (introduced in > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]), > type mismatches can occur between offsets passed to > {{SourceConnector::alterOffsets}} and the offsets that are retrieved by > connectors / tasks via an instance of {{OffsetStorageReader}} after the > offsets have been modified. In order to prevent this type mismatch that could > lead to subtle bugs in connectors, we could serialize + deserialize the > offsets using the worker's internal JSON converter before invoking > {{{}SourceConnector::alterOffsets{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets
Yash Mayya created KAFKA-15182: -- Summary: Normalize offsets before invoking SourceConnector::alterOffsets Key: KAFKA-15182 URL: https://issues.apache.org/jira/browse/KAFKA-15182 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya See discussion [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148] TLDR: When users attempt to externally modify source connector offsets via the {{PATCH /offsets}} endpoint (introduced in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]), type mismatches can occur between offsets passed to {{SourceConnector::alterOffsets}} and the offsets that are retrieved by connectors / tasks via an instance of {{OffsetStorageReader }}after the offsets have been modified. In order to prevent this type mismatch that could lead to subtle bugs in connectors, we could serialize + deserialize the offsets using the worker's internal JSON converter before invoking {{{}SourceConnector::alterOffsets{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
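A minimal sketch of the proposed normalization, assuming a JSON converter configured like the worker's internal one (the empty topic name and the "position" offset key are illustrative):

{code:java}
import java.util.Map;

import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

public class OffsetNormalizationSketch {
    public static void main(String[] args) {
        Converter converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "false"), false);

        // A user-supplied offset from PATCH /offsets: "position" arrives as an Integer.
        Map<String, Object> offset = Map.of("position", 42);

        // Round-tripping through the JSON converter widens it to a Long, matching
        // what an OffsetStorageReader would return after the offsets are committed.
        byte[] serialized = converter.fromConnectData("", null, offset);
        Object normalized = converter.toConnectData("", serialized).value();
        System.out.println(normalized); // {position=42}, now with a Long value
    }
}
{code}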
[jira] [Created] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors
Yash Mayya created KAFKA-15179: -- Summary: Add integration tests for the FileStream Sink and Source connectors Key: KAFKA-15179 URL: https://issues.apache.org/jira/browse/KAFKA-15179 Project: Kafka Issue Type: Improvement Reporter: Yash Mayya Assignee: Yash Mayya Add integration tests for the FileStream Sink and Source connectors covering various different common scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
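A rough sketch of one such scenario, assuming Connect's embedded integration-test harness (the cluster name, connector name, topic, and file path are illustrative, and this is a skeleton rather than a finished test):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

public class FileStreamSinkIntegrationSketch {
    public static void main(String[] args) throws Exception {
        EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                .name("filestream-it")
                .numWorkers(1)
                .build();
        connect.start();
        connect.kafka().createTopic("test-topic", 1);

        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector");
        config.put("tasks.max", "1");
        config.put("topics", "test-topic");
        config.put("file", "/tmp/filestream-sink.txt");
        connect.configureConnector("filestream-sink", config);

        connect.kafka().produce("test-topic", "hello");
        // A real test would wait for the connector and task to start, then
        // assert on the sink file's contents before stopping the cluster.
        connect.stop();
    }
}
{code}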
[jira] [Created] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API
Yash Mayya created KAFKA-15177: -- Summary: MirrorMaker 2 should implement the alterOffsets KIP-875 API Key: KAFKA-15177 URL: https://issues.apache.org/jira/browse/KAFKA-15177 Project: Kafka Issue Type: Improvement Components: KafkaConnect, mirrormaker Reporter: Yash Mayya The {{MirrorSourceConnector}} class should implement the new alterOffsets API added in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]. We could also implement the API in {{MirrorCheckpointConnector}} and {{MirrorHeartbeatConnector}} to prevent external modification of offsets since the operation wouldn't really make sense in their case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
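For the "prevent external modification" case, a sketch under stated assumptions (the KIP-875 alterOffsets hook as described in the KIP; the connector class here is a hypothetical stand-in, not MirrorCheckpointConnector or MirrorHeartbeatConnector themselves): throwing from alterOffsets lets a connector veto offset changes.

{code:java}
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector that rejects external offset modification.
public class NoOffsetAlterationConnector extends SourceConnector {
    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public Class<? extends Task> taskClass() { return null; }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return List.of(); }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "0.0.1"; }

    // KIP-875 hook: throwing here causes the offset-modification request to fail.
    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets) {
        throw new ConnectException("Offsets for this connector cannot be modified externally");
    }
}
{code}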
[jira] [Updated] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements
[ https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14353: --- Priority: Minor (was: Major) > Kafka Connect REST API configuration validation timeout improvements > > > Key: KAFKA-14353 > URL: https://issues.apache.org/jira/browse/KAFKA-14353 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > Labels: kip-required > > Kafka Connect currently defines a default REST API request timeout of [90 > seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]. > If a REST API request takes longer than this timeout value, a {{500 Internal > Server Error}} response is returned with the message "Request timed out". > The {{POST /connectors}} and the {{PUT /connectors/\{connector}/config}} > endpoints that are used to create or update connectors internally do a > connector configuration validation (the details of which vary depending on > the connector plugin) before proceeding to write a message to the Connect > cluster's config topic. If the configuration validation takes longer than 90 > seconds, the connector is still eventually created after the config > validation completes (even though a {{500 Internal Server Error}} response > is returned to the user) which leads to a fairly confusing user experience. > Furthermore, this situation is exacerbated by the potential for config > validations occurring twice for a single request. If Kafka Connect is running > in distributed mode, requests to create or update a connector are forwarded > to the Connect worker which is currently the leader of the group, if the > initial request is made to a worker which is not the leader. In this case, > the config validation occurs both on the initial worker, as well as the > leader (assuming that the first config validation is successful) - this means > that if a config validation takes longer than 45 seconds to complete each > time, it will result in the original create / update connector request timing > out. > Slow config validations can occur in certain exceptional scenarios - consider > a database connector which has elaborate validation logic involving querying > information schema to get a list of tables and views to validate the user's > connector configuration. If the database has a very high number of tables and > views and the database is under a heavy load in terms of query volume, such > information schema queries can end up being considerably slow to complete. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements
[ https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14353: --- Description: Kafka Connect currently defines a default REST API request timeout of [90 seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]. If a REST API request takes longer than this timeout value, a {{500 Internal Server Error}} response is returned with the message "Request timed out". The {{POST /connectors}} and the {{PUT /connectors/\{connector}/config}} endpoints that are used to create or update connectors internally do a connector configuration validation (the details of which vary depending on the connector plugin) before proceeding to write a message to the Connect cluster's config topic. If the configuration validation takes longer than 90 seconds, the connector is still eventually created after the config validation completes (even though a {{500 Internal Server Error}} response is returned to the user) which leads to a fairly confusing user experience. Furthermore, this situation is exacerbated by the potential for config validations occurring twice for a single request. If Kafka Connect is running in distributed mode, requests to create or update a connector are forwarded to the Connect worker which is currently the leader of the group, if the initial request is made to a worker which is not the leader. In this case, the config validation occurs both on the initial worker, as well as the leader (assuming that the first config validation is successful) - this means that if a config validation takes longer than 45 seconds to complete each time, it will result in the original create / update connector request timing out. Slow config validations can occur in certain exceptional scenarios - consider a database connector which has elaborate validation logic involving querying information schema to get a list of tables and views to validate the user's connector configuration. If the database has a very high number of tables and views and the database is under a heavy load in terms of query volume, such information schema queries can end up being considerably slow to complete. was:Kafka Connect currently defines a default REST API request timeout of [90 seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30] which isn't configurable. If a REST API request takes longer than this, a {{500 Internal Server Error}} response is returned with the message "Request timed out". In exceptional scenarios, a longer timeout may be required for operations such as connector config validation / connector creation (which internally does a config validation first). We should allow the request timeout to be configurable via a Kafka Connect worker property. 
> Kafka Connect REST API configuration validation timeout improvements > > > Key: KAFKA-14353 > URL: https://issues.apache.org/jira/browse/KAFKA-14353 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Labels: kip-required > > Kafka Connect currently defines a default REST API request timeout of [90 > seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]. > If a REST API request takes longer than this timeout value, a {{500 Internal > Server Error}} response is returned with the message "Request timed out". > The {{POST /connectors}} and the {{PUT /connectors/\{connector}/config}} > endpoints that are used to create or update connectors internally do a > connector configuration validation (the details of which vary depending on > the connector plugin) before proceeding to write a message to the Connect > cluster's config topic. If the configuration validation takes longer than 90 > seconds, the connector is still eventually created after the config > validation completes (even though a {{500 Internal Server Error}} response > is returned to the user) which leads to a fairly confusing user experience. > Furthermore, this situation is exacerbated by the potential for config > validations occurring twice for a single request. If Kafka Connect is running > in distributed mode, requests to create or update a connector are forwarded > to the Connect worker which is currently the leader of the group, if the > initial request is made to a worker which is not the
[jira] [Updated] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements
[ https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14353: --- Summary: Kafka Connect REST API configuration validation timeout improvements (was: Make Kafka Connect REST API request timeouts configurable) > Kafka Connect REST API configuration validation timeout improvements > > > Key: KAFKA-14353 > URL: https://issues.apache.org/jira/browse/KAFKA-14353 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Labels: kip-required > > Kafka Connect currently defines a default REST API request timeout of [90 > seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30] > which isn't configurable. If a REST API request takes longer than this, a > {{500 Internal Server Error}} response is returned with the message "Request > timed out". In exceptional scenarios, a longer timeout may be required for > operations such as connector config validation / connector creation (which > internally does a config validation first). We should allow the request > timeout to be configurable via a Kafka Connect worker property. -- This message was sent by Atlassian Jira (v8.20.10#820010)
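To make the arithmetic above concrete: with a fixed 90-second request budget, and validation potentially running on both the receiving worker and the leader it forwards to, each validation must complete in under roughly 45 seconds for the request to succeed. A client-side sketch of such a request against the documented endpoint (host, port, connector name, and config body are placeholders):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class ConnectorConfigPut {
    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/config"))
                .header("Content-Type", "application/json")
                // Mirror the worker's default 90 second budget on the client side.
                .timeout(Duration.ofSeconds(90))
                .PUT(HttpRequest.BodyPublishers.ofString("{\"connector.class\": \"...\"}"))
                .build();
        // Note: a 500 "Request timed out" response can still be followed by the
        // connector eventually being created once the slow validation finishes,
        // which is the confusing behavior described in the ticket.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}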
[jira] [Commented] (KAFKA-15151) Missing connector-stopped-task-count metric
[ https://issues.apache.org/jira/browse/KAFKA-15151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740554#comment-17740554 ] Yash Mayya commented on KAFKA-15151: When a connector is stopped, all the tasks are shut down - i.e. the task count would become 0. So I don't think that such a metric would make sense here? > Missing connector-stopped-task-count metric > --- > > Key: KAFKA-15151 > URL: https://issues.apache.org/jira/browse/KAFKA-15151 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Yash Mayya >Priority: Major > > We have task-count metrics for all other states but when adding the STOPPED > state we did not add the respective metric. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15151) Missing connector-stopped-task-count metric
[ https://issues.apache.org/jira/browse/KAFKA-15151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-15151: -- Assignee: Yash Mayya > Missing connector-stopped-task-count metric > --- > > Key: KAFKA-15151 > URL: https://issues.apache.org/jira/browse/KAFKA-15151 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Yash Mayya >Priority: Major > > We have task-count metrics for all other states but when adding the STOPPED > state we did not add the respective metric. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
[ https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15145: --- Description: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2 (was: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2) > AbstractWorkerSourceTask re-processes records filtered out by SMTs on > retriable exceptions > -- > > Key: KAFKA-15145 > URL: https://issues.apache.org/jira/browse/KAFKA-15145 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, > 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, > 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2, 3.5.0, 3.4.1 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > If a RetriableException is thrown from an admin client or producer client > operation in > [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], > the send operation is retried for the remaining records in the batch. There > is a bug in the logic for computing the remaining records in a batch which > causes records that are filtered out by the task's transformation chain to be > re-processed. This will also result in the SourceTask::commitRecord method > being called twice for the same record, which can cause certain types of > source connectors to fail. This bug seems to exist since when SMTs were first > introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
[ https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15145: --- Description: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2 (was: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist ever since when SMTs were first introduced in 0.10.2) > AbstractWorkerSourceTask re-processes records filtered out by SMTs on > retriable exceptions > -- > > Key: KAFKA-15145 > URL: https://issues.apache.org/jira/browse/KAFKA-15145 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, > 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, > 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2, 3.5.0, 3.4.1 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > If a RetriableException is thrown from an admin client or producer client > operation in > [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], > the send operation is retried for the remaining records in the batch. There > is a minor bug in the logic for computing the remaining records in a batch > which causes records that are filtered out by the task's transformation chain > to be re-processed. This will also result in the SourceTask::commitRecord > method being called twice for the same record, which can cause certain types > of source connectors to fail. This bug seems to exist since when SMTs were > first introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
[ https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15145: --- Description: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2 (was: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records for a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2) > AbstractWorkerSourceTask re-processes records filtered out by SMTs on > retriable exceptions > -- > > Key: KAFKA-15145 > URL: https://issues.apache.org/jira/browse/KAFKA-15145 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, > 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, > 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2, 3.5.0, 3.4.1 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > If a RetriableException is thrown from an admin client or producer client > operation in > [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], > the send operation is retried for the remaining records in the batch. There > is a minor bug in the logic for computing the remaining records in a batch > which causes records that are filtered out by the task's transformation chain > to be re-processed. This will also result in the SourceTask::commitRecord > method being called twice for the same record, which can cause certain types > of source connectors to fail. This bug seems to exist since when SMTs were > first introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
[ https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15145: --- Description: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist ever since when SMTs were first introduced in 0.10.2 (was: If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records in a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2) > AbstractWorkerSourceTask re-processes records filtered out by SMTs on > retriable exceptions > -- > > Key: KAFKA-15145 > URL: https://issues.apache.org/jira/browse/KAFKA-15145 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, > 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, > 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2, 3.5.0, 3.4.1 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > If a RetriableException is thrown from an admin client or producer client > operation in > [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], > the send operation is retried for the remaining records in the batch. There > is a minor bug in the logic for computing the remaining records in a batch > which causes records that are filtered out by the task's transformation chain > to be re-processed. This will also result in the SourceTask::commitRecord > method being called twice for the same record, which can cause certain types > of source connectors to fail. This bug seems to exist ever since when SMTs > were first introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
[ https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15145: --- Affects Version/s: 3.4.1 3.5.0 3.3.2 3.3.1 3.2.3 3.2.2 3.4.0 3.2.1 3.1.2 3.0.2 3.3.0 3.1.1 3.2.0 2.8.2 3.0.1 3.0.0 2.8.1 2.7.2 2.6.3 3.1.0 2.6.2 2.7.1 2.8.0 2.6.1 2.7.0 2.5.1 2.6.0 2.4.1 2.5.0 2.3.1 2.4.0 2.2.2 2.2.1 2.3.0 2.1.1 2.2.0 2.1.0 2.0.1 2.0.0 1.1.1 1.1.0 1.0.2 1.0.1 1.0.0 0.11.0.3 0.11.0.2 0.11.0.1 0.11.0.0 0.10.2.2 0.10.2.1 0.10.2.0 > AbstractWorkerSourceTask re-processes records filtered out by SMTs on > retriable exceptions > -- > > Key: KAFKA-15145 > URL: https://issues.apache.org/jira/browse/KAFKA-15145 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, > 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, > 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2, 3.5.0, 3.4.1 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > If a RetriableException is thrown from an admin client or producer client > operation in > [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], > the send operation is retried for the remaining records in the batch. There > is a minor bug in the logic for computing the remaining records for a batch > which causes records that are filtered out by the task's transformation chain > to be re-processed. This will also result in the SourceTask::commitRecord > method being called twice for the same record, which can cause certain types > of source connectors to fail. This bug seems to exist since when SMTs were > first introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
Yash Mayya created KAFKA-15145: -- Summary: AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions Key: KAFKA-15145 URL: https://issues.apache.org/jira/browse/KAFKA-15145 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya If a RetriableException is thrown from an admin client or producer client operation in [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], the send operation is retried for the remaining records in the batch. There is a minor bug in the logic for computing the remaining records for a batch which causes records that are filtered out by the task's transformation chain to be re-processed. This will also result in the SourceTask::commitRecord method being called twice for the same record, which can cause certain types of source connectors to fail. This bug seems to exist since when SMTs were first introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
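A simplified, self-contained sketch of the retry pattern at issue (plain strings stand in for source records; all names here are illustrative rather than the actual AbstractWorkerSourceTask code): the cursor used to compute the remaining sub-list must also advance past records that the transformation chain filtered out, otherwise a retry re-transforms them and invokes the commit hook a second time for the same record.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class SendRecordsSketch {
    static List<String> toSend = new ArrayList<>(List.of("r1", "r2-filtered", "r3"));

    static boolean sendRecords() {
        int processed = 0;
        for (String preTransform : toSend) {
            String record = applyTransformations(preTransform);
            if (record == null) {     // filtered out by an SMT: committed, never sent
                commitRecord(preTransform);
                processed++;          // omitting this increment is the described bug:
                continue;             // a retry would then re-process "r2-filtered"
            }
            try {
                send(record);
                commitRecord(preTransform);
                processed++;
            } catch (RuntimeException retriable) {
                // Keep only the records that were not yet processed for the retry.
                toSend = toSend.subList(processed, toSend.size());
                return false;
            }
        }
        return true;
    }

    static String applyTransformations(String r) { return r.endsWith("-filtered") ? null : r; }
    static void commitRecord(String r) { System.out.println("commit " + r); }
    static void send(String r) { /* stand-in for producer.send; may throw */ }

    public static void main(String[] args) { sendRecords(); }
}
{code}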
[jira] [Assigned] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-15091: -- Assignee: Yash Mayya > Javadocs for SourceTask::commit are incorrect > - > > Key: KAFKA-15091 > URL: https://issues.apache.org/jira/browse/KAFKA-15091 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > > The Javadocs for {{SourceTask::commit}} state that the method should: > {quote}Commit the offsets, up to the offsets that have been returned by > [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()]. > {quote} > However, this is obviously incorrect given how the Connect runtime (when not > configured with exactly-once support for source connectors) performs polling > and offset commits on separate threads. There's also some extensive > discussion on the semantics of that method in KAFKA-5716 where it's made > clear that altering the behavior of the runtime to align with the documented > semantics of that method is not a viable option. > We should update the Javadocs for this method to state that it does not have > anything to do with the offsets returned from {{SourceTask::poll}} and is > instead just a general, periodically-invoked hook to let the task know that > an offset commit has taken place (but with no guarantees as to which offsets > have been committed and which ones correspond to still-in-flight records). -- This message was sent by Atlassian Jira (v8.20.10#820010)
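For illustration, the kind of Javadoc wording the ticket calls for might look like the following fragment of {{SourceTask}} (a sketch, not the final committed text; the method signature matches the existing API):
{code:java}
/**
 * Hook invoked periodically after the Connect framework has committed offsets.
 * There is no guarantee about which offsets have been committed; some of them
 * may correspond to records that are still in flight, and they need not match
 * the offsets most recently returned from {@link #poll()}.
 */
public void commit() throws InterruptedException {
    // No-op by default.
}
{code}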
[jira] [Updated] (KAFKA-15121) FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs (alterOffsets)
[ https://issues.apache.org/jira/browse/KAFKA-15121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-15121: --- Summary: FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs (alterOffsets) (was: FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs) > FileStreamSourceConnector and FileStreamSinkConnector should implement > KIP-875 APIs (alterOffsets) > -- > > Key: KAFKA-15121 > URL: https://issues.apache.org/jira/browse/KAFKA-15121 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > introduced the new SourceConnector::alterOffsets and > SinkConnector::alterOffsets APIs. The FileStreamSourceConnector and > FileStreamSinkConnector should implement these new methods to improve the > user experience when modifying offsets for these connectors and also to serve > as an example for other connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15121) FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs
Yash Mayya created KAFKA-15121: -- Summary: FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs Key: KAFKA-15121 URL: https://issues.apache.org/jira/browse/KAFKA-15121 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] introduced the new SourceConnector::alterOffsets and SinkConnector::alterOffsets APIs. The FileStreamSourceConnector and FileStreamSinkConnector should implement these new methods to improve the user experience when modifying offsets for these connectors and also to serve as an example for other connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
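As a hedged sketch of what the source connector's implementation might validate (the {{position}} offset key follows FileStreamSourceTask's convention; the exact checks in the eventual implementation may differ), shown as a fragment of a {{SourceConnector}} subclass:
{code:java}
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;

@Override
public boolean alterOffsets(Map<String, String> connectorConfig,
                            Map<Map<String, ?>, Map<String, ?>> offsets) {
    for (Map<String, ?> offset : offsets.values()) {
        if (offset == null) {
            continue; // a null offset is a valid request to reset the partition
        }
        Object position = offset.get("position");
        if (!(position instanceof Long) || (Long) position < 0) {
            throw new ConnectException("Offset 'position' must be a non-negative Long");
        }
    }
    return true; // the offsets were inspected and can safely be committed
}
{code}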
[jira] [Commented] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters
[ https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736483#comment-17736483 ] Yash Mayya commented on KAFKA-15113: Thanks for reviewing the ticket Chris! Yeah, I'm not sure whether it's a realistic use case either, but I guess it's possible today to set up a sink connector which consumes from a topic on one Kafka cluster but has a DLQ topic on another cluster for example? So any change we'd make in this area would require a KIP I presume. I do like the idea of making it easier to configure common Kafka client override configurations, although I'm not so sure about changing the request structure for the create / update connector REST APIs just for this use case? It'd also be a little tricky to do the same with the `PUT /connectors/\{connector}/config` endpoint while maintaining compatibility. > Gracefully handle cases where a sink connector's admin and consumer client > config overrides target different Kafka clusters > --- > > Key: KAFKA-15113 > URL: https://issues.apache.org/jira/browse/KAFKA-15113 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Yash Mayya >Priority: Minor > > Background reading - > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy] > > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > > > From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] - > {quote}Currently, admin clients are only instantiated for sink connectors to > create the DLQ topic if required. So it seems like it could be technically > possible for a sink connector's consumer client overrides to target a > different Kafka cluster from its producer and admin client overrides. Such a > setup won't work with this implementation of the get offsets API as it is > using an admin client to get a sink connector's consumer group offsets. > However, I'm not sure we want to use a consumer client to retrieve the > offsets either as we shouldn't be disrupting the existing sink tasks' > consumer group just to fetch offsets. Leveraging a sink task's consumer also > isn't an option because fetching offsets for a stopped sink connector (where > all the tasks will be stopped) should be allowed. I'm wondering if we should > document that a connector's various client config override policies shouldn't > target different Kafka clusters (side note - looks like we don't [currently > document|https://kafka.apache.org/documentation/#connect] client config > overrides for Connect beyond just the worker property > {{{}connector.client.config.override.policy{}}}). > {quote} > > {quote}I don't think we need to worry too much about this. I cannot imagine a > sane use case that involves overriding a connector's Kafka clients with > different Kafka clusters (not just bootstrap servers, but actually different > clusters) for producer/consumer/admin. I'd be fine with adding a note to our > docs that that kind of setup isn't supported but I really, really hope that > it's not necessary and nobody's trying to do that in the first place. I also > suspect that there are other places where this might cause issues, like with > exactly-once source support or automatic topic creation for source connectors. > That said, there is a different case we may want to consider: someone may > have configured consumer overrides for a sink connector, but not admin > overrides. 
This may happen if they don't use a DLQ topic. I don't know if we > absolutely need to handle this now and we may consider filing a follow-up > ticket to look into this, but one quick-and-dirty thought I've had is to > configure the admin client used here with a combination of the configurations > for the connector's admin client and its consumer, giving precedent to the > latter. > {quote} > > Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] > - > {quote}We will have undesirable behavior if the connector is targeting a > Kafka cluster different from the Connect cluster's backing Kafka cluster and > the user has configured the consumer overrides appropriately for their > connector, but not the admin overrides (something we also discussed > previously > [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]). > In the above case, if a user attempts to reset their sink connector's offsets > via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following > will occur: > # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} > which returns an empty partition offsets map
[jira] [Created] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters
Yash Mayya created KAFKA-15113: -- Summary: Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters Key: KAFKA-15113 URL: https://issues.apache.org/jira/browse/KAFKA-15113 Project: Kafka Issue Type: Task Components: KafkaConnect Reporter: Yash Mayya Background reading - * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] - {quote}Currently, admin clients are only instantiated for sink connectors to create the DLQ topic if required. So it seems like it could be technically possible for a sink connector's consumer client overrides to target a different Kafka cluster from its producer and admin client overrides. Such a setup won't work with this implementation of the get offsets API as it is using an admin client to get a sink connector's consumer group offsets. However, I'm not sure we want to use a consumer client to retrieve the offsets either as we shouldn't be disrupting the existing sink tasks' consumer group just to fetch offsets. Leveraging a sink task's consumer also isn't an option because fetching offsets for a stopped sink connector (where all the tasks will be stopped) should be allowed. I'm wondering if we should document that a connector's various client config override policies shouldn't target different Kafka clusters (side note - looks like we don't [currently document|https://kafka.apache.org/documentation/#connect] client config overrides for Connect beyond just the worker property {{{}connector.client.config.override.policy{}}}). {quote} {quote}I don't think we need to worry too much about this. I cannot imagine a sane use case that involves overriding a connector's Kafka clients with different Kafka clusters (not just bootstrap servers, but actually different clusters) for producer/consumer/admin. I'd be fine with adding a note to our docs that that kind of setup isn't supported but I really, really hope that it's not necessary and nobody's trying to do that in the first place. I also suspect that there are other places where this might cause issues, like with exactly-once source support or automatic topic creation for source connectors. That said, there is a different case we may want to consider: someone may have configured consumer overrides for a sink connector, but not admin overrides. This may happen if they don't use a DLQ topic. I don't know if we absolutely need to handle this now and we may consider filing a follow-up ticket to look into this, but one quick-and-dirty thought I've had is to configure the admin client used here with a combination of the configurations for the connector's admin client and its consumer, giving precedent to the latter. {quote} Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] - {quote}We will have undesirable behavior if the connector is targeting a Kafka cluster different from the Connect cluster's backing Kafka cluster and the user has configured the consumer overrides appropriately for their connector, but not the admin overrides (something we also discussed previously [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]). 
In the above case, if a user attempts to reset their sink connector's offsets via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following will occur: # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} which returns an empty partition offsets map for the sink connector's consumer group ID (it exists on a different Kafka cluster to the one that the admin client is connecting to). # We call {{SinkConnector::alterOffsets}} with an empty offsets map which could cause the sink connector to propagate the offsets reset related changes to the sink system. # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}} which returns {{GroupIdNotFoundException}} which we essentially swallow in order to keep offsets reset operations idempotent and return a success message to the user (even though the real consumer group for the sink connector on the other Kafka cluster hasn't been deleted). This will occur if the connector's admin overrides are missing OR the admin overrides are deliberately configured to target a Kafka cluster different from the consumer overrides (although like you pointed out in the other linked thread, this doesn't seem like a valid use case that we'd even want to support). I guess we'd want to pursue the approach you suggested where we'd configure the admin client with a combination of the connector's admin overrides and
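A minimal sketch of the "quick-and-dirty" idea quoted above, assuming the connector's admin and consumer client override configs have already been extracted into maps (the class, method, and parameter names are placeholders): build the admin client config from both sets, letting the consumer overrides win on conflicts.
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;

public class SinkOffsetsAdminSketch {
    static Admin adminForSinkOffsets(Map<String, Object> adminOverrides,
                                     Map<String, Object> consumerOverrides) {
        Map<String, Object> merged = new HashMap<>(adminOverrides);
        // "Giving precedent to the latter": consumer overrides take priority, so
        // a connector with consumer overrides but no admin overrides still ends
        // up talking to the Kafka cluster its consumer group actually lives on.
        merged.putAll(consumerOverrides);
        // Consumer-only keys that the admin client does not recognize are simply
        // logged as unused by the client's config machinery.
        return Admin.create(merged);
    }
}
{code}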
[jira] [Commented] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732666#comment-17732666 ] Yash Mayya commented on KAFKA-15091: {quote}it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records).{quote} The SourceTask::commit method doesn't seem like a particularly useful hook in its current shape; I wonder whether we should consider deprecating it...? > Javadocs for SourceTask::commit are incorrect > - > > Key: KAFKA-15091 > URL: https://issues.apache.org/jira/browse/KAFKA-15091 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > > The Javadocs for {{SourceTask::commit}} state that the method should: > {quote}Commit the offsets, up to the offsets that have been returned by > [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()]. > {quote} > However, this is obviously incorrect given how the Connect runtime (when not > configured with exactly-once support for source connectors) performs polling > and offset commits on separate threads. There's also some extensive > discussion on the semantics of that method in KAFKA-5716 where it's made > clear that altering the behavior of the runtime to align with the documented > semantics of that method is not a viable option. > We should update the Javadocs for this method to state that it does not have > anything to do with the offsets returned from {{SourceTask::poll}} and is > instead just a general, periodically-invoked hook to let the task know that > an offset commit has taken place (but with no guarantees as to which offsets > have been committed and which ones correspond to still-in-flight records). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field
[ https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-15012: -- Assignee: Yash Mayya > JsonConverter fails when there are leading Zeros in a field > --- > > Key: KAFKA-15012 > URL: https://issues.apache.org/jira/browse/KAFKA-15012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.4.0, 3.3.2 >Reporter: Ranjan Rao >Assignee: Yash Mayya >Priority: Major > Attachments: > enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch > > > When there are leading zeros in a field in the Kakfa Record, a sink connector > using JsonConverter fails with the below exception > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] > to Kafka Connect data failed due to serialization error: > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) > ... 
13 more > Caused by: org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading > zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric > value: Leading zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > at > com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) > at > com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754) > at > com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247) > at > com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734) > at > org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64) > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > o
[jira] [Comment Edited] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field
[ https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725223#comment-17725223 ] Yash Mayya edited comment on KAFKA-15012 at 5/30/23 6:34 PM: - Thanks for filing this Jira [~ranjanrao]. Simply enabling the _ALLOW_LEADING_ZEROS_FOR_NUMBERS_ feature would likely be a backward incompatible change since there could potentially be users relying on the existing behavior to send bad data (i.e. some numeric field has leading zeroes when it isn't expected to) to a DLQ topic. This might need a small KIP adding a new config (maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which defaults to false in order to be backward compatible. Alternatively, we could design a way to allow users to configure the various [JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s and [JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonWriteFeature.html]s for the JsonSerializer / JsonDeserializer used in the JsonConverter. was (Author: yash.mayya): Thanks for filing this Jira [~ranjanrao]. Simply enabling the _ALLOW_LEADING_ZEROS_FOR_NUMBERS_ feature would likely be a backward incompatible change since there could potentially be users relying on the existing behavior to send bad data (i.e. some numeric field has leading zeroes when it isn't expected to) to a DLQ topic. This might need a small KIP adding a new config (maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which defaults to false in order to be backward compatible. Alternatively, we could design a way to allow users to configure the various [JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s and [JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/JsonParser.Feature.html]s for the JsonSerializer / JsonDeserializer used in the JsonConverter. 
> JsonConverter fails when there are leading Zeros in a field > --- > > Key: KAFKA-15012 > URL: https://issues.apache.org/jira/browse/KAFKA-15012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.4.0, 3.3.2 >Reporter: Ranjan Rao >Priority: Major > Attachments: > enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch > > > When there are leading zeros in a field in the Kakfa Record, a sink connector > using JsonConverter fails with the below exception > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] > to Kafka Connect data failed due to serialization error: > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) > at > org.apache.kafka
[jira] [Commented] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field
[ https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727653#comment-17727653 ] Yash Mayya commented on KAFKA-15012: Thanks [~ChrisEgerton], that's a fair point. I was comparing this to https://issues.apache.org/jira/browse/KAFKA-8713 but your point regarding there being nothing different about the data given to the sink connectors post conversion here makes sense and causes this case to be different from [KIP-581|https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value]. I agree that this qualifies as a bug in the converter so I think we should go ahead with the proposed fix here. > JsonConverter fails when there are leading Zeros in a field > --- > > Key: KAFKA-15012 > URL: https://issues.apache.org/jira/browse/KAFKA-15012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.4.0, 3.3.2 >Reporter: Ranjan Rao >Priority: Major > Attachments: > enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch > > > When there are leading zeros in a field in the Kakfa Record, a sink connector > using JsonConverter fails with the below exception > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] > to Kafka Connect data failed due to serialization error: > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) > ... 
13 more > Caused by: org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading > zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric > value: Leading zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > at > com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) > at > com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754) > at > com.fasterxml.jackson.databind.ObjectMapper._rea
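For context, the approach named in the attached patch corresponds to enabling the Jackson read feature shown below (a standalone sketch of the Jackson API; the actual change would live in Connect's JsonDeserializer rather than in application code):
{code:java}
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class LeadingZerosSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = JsonMapper.builder()
                .enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS)
                .build();
        // Parses as the number 80153032837 instead of throwing the
        // JsonParseException shown in the stack trace above.
        JsonNode node = mapper.readTree("00080153032837".getBytes(StandardCharsets.UTF_8));
        System.out.println(node.longValue());
    }
}
{code}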
[jira] [Commented] (KAFKA-15034) Improvement of ReplaceField performance for long list
[ https://issues.apache.org/jira/browse/KAFKA-15034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727119#comment-17727119 ] Yash Mayya commented on KAFKA-15034: Thanks for filing this ticket [~baz33]. I think what you're suggesting makes sense and I've raised this PR which uses a HashSet for the include / exclude fields in the ReplaceField SMT and adds a JMH benchmark to demonstrate the performance improvements - [https://github.com/apache/kafka/pull/13776] > Improvement of ReplaceField performance for long list > - > > Key: KAFKA-15034 > URL: https://issues.apache.org/jira/browse/KAFKA-15034 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.4.0 >Reporter: BDeus >Assignee: Yash Mayya >Priority: Minor > > The ReplaceField SMT uses a List for its include and exclude filters, which is > an ArrayList internally. > For a long list of filters, the _O(n)_ lookup complexity of ArrayList results in > poor performance. > Could we use a HashSet implementation in the ReplaceField class instead of the > traditional ArrayList? -- This message was sent by Atlassian Jira (v8.20.10#820010)
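To make the trade-off in the comment above concrete, here is a minimal, self-contained sketch of the List-versus-HashSet membership check that motivates the PR; the variable names and surrounding class are illustrative, not the actual ReplaceField code:

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FieldFilterSketch {
    public static void main(String[] args) {
        List<String> excludeList = Arrays.asList("ssn", "password", "creditCard");

        // O(n) per lookup: List.contains scans the backing array
        boolean dropSlow = excludeList.contains("password");

        // O(1) expected per lookup: hash-based membership test
        Set<String> excludeSet = new HashSet<>(excludeList);
        boolean dropFast = excludeSet.contains("password");

        System.out.println(dropSlow + " " + dropFast); // true true

        // With R records of F fields each and a filter list of size N, per-record
        // filtering cost drops from O(F * N) to O(F), which is what the JMH
        // benchmark in the linked PR is meant to demonstrate.
    }
}
{code}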
[jira] [Assigned] (KAFKA-15034) Improvement of ReplaceField performance for long list
[ https://issues.apache.org/jira/browse/KAFKA-15034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-15034: -- Assignee: Yash Mayya > Improvement of ReplaceField performance for long list > - > > Key: KAFKA-15034 > URL: https://issues.apache.org/jira/browse/KAFKA-15034 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.4.0 >Reporter: BDeus >Assignee: Yash Mayya >Priority: Minor > > The ReplaceField SMT uses a List for its include and exclude filters, which is > an ArrayList internally. > For a long list of filters, the _O(n)_ lookup complexity of ArrayList results in > poor performance. > Could we use a HashSet implementation in the ReplaceField class instead of the > traditional ArrayList? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727025#comment-17727025 ] Yash Mayya commented on KAFKA-14956: This test seems to be consistently passing after [https://github.com/apache/kafka/pull/13465] was merged (timeout value for the read offsets operation in *OffsetsApiIntegrationTest* was bumped up) so I'm gonna mark this ticket as resolved. In case a failure re-occurs, this ticket can be re-opened for investigation. > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > Labels: flaky-test > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnit
[jira] [Resolved] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya resolved KAFKA-14956. Resolution: Fixed > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > Labels: flaky-test > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at 
app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at > java.base@17.0.7/jdk.int
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725722#comment-17725722 ] Yash Mayya commented on KAFKA-14995: [~dajac] that GitHub team isn't publicly accessible though. {quote}To generate the YAML lists, we need to map from Git log "Author" to Github username. There's presumably some way to do this in the Github REST API (the mapping is based on the email, IIUC), or we could also just update the Committers page to also document each committer's Github username.{quote} This won't work because the list of collaborators is supposed to be dynamic right? > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year: > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)] > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
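Step 4b of the process above (commit SHA to GitHub username) can be scripted against GitHub's public commits endpoint. The sketch below is only a rough starting point: the naive string extraction stands in for a proper JSON parser, unauthenticated calls are rate-limited, and the response shape is assumed from the standard GitHub REST API:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CommitAuthorLookup {
    public static void main(String[] args) throws Exception {
        String sha = "440bed2391338dc10fe4d36ab17dc104b61b85e8"; // from the git log example above
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://api.github.com/repos/apache/kafka/commits/" + sha))
                .header("Accept", "application/vnd.github+json")
                .build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

        // The response carries a top-level "author" object whose "login" field is
        // the GitHub username; a real script should use a JSON library here.
        int authorIdx = body.indexOf("\"author\"");
        int loginIdx = body.indexOf("\"login\":\"", authorIdx);
        String login = body.substring(loginIdx + 9, body.indexOf('"', loginIdx + 9));
        System.out.println("GitHub username: " + login);
    }
}
{code}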
[jira] [Updated] (KAFKA-14952) Publish metrics when source connector fails to poll data
[ https://issues.apache.org/jira/browse/KAFKA-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14952: --- Labels: connect connect-api kip-required (was: connect connect-api) > Publish metrics when source connector fails to poll data > > > Key: KAFKA-14952 > URL: https://issues.apache.org/jira/browse/KAFKA-14952 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.3.2 >Reporter: Ravindranath Kakarla >Priority: Minor > Labels: connect, connect-api, kip-required > Original Estimate: 168h > Remaining Estimate: 168h > > Currently, there is no metric in Kafka Connect to track when a source > connector fails to poll data from the source. This information would be > useful to operators and developers to visualize, monitor and alert when the > connector fails to poll records from the source. > Existing metrics like *kafka_producer_producer_metrics_record_error_total* > and *kafka_connect_task_error_metrics_total_record_failures* only cover > failures when producing data to the Kafka cluster but not when the source > task fails with a retryable exception or ConnectException. > Polling from the source can fail due to unavailability of the source system or > errors with the connect configuration. Currently, this cannot be monitored > directly using metrics and instead operators have to rely on log diving, which > is not consistent with how other metrics are monitored. > I propose adding new metrics to Kafka Connect, > "{_}source-record-poll-error-total{_}" and > "{_}source-record-poll-error-rate{_}", that can be used to monitor failures > during polling. > *source-record-poll-error-total* - The total number of times a source > connector failed to poll data from the source. This will include both > retryable and non-retryable exceptions. > *source-record-poll-error-rate* - The rate of the above failures per unit of time. > These metrics would be tracked at the connector level and could be exposed > through JMX along with the other metrics. > I am willing to submit a PR if this looks good; sample implementation code is > below: > {code:java} > //AbstractWorkerSourceTask.java > protected List<SourceRecord> poll() throws InterruptedException { > try { > return task.poll(); > } catch (RetriableException | > org.apache.kafka.common.errors.RetriableException e) { > log.warn("{} failed to poll records from SourceTask. Will retry > operation.", this, e); > > sourceTaskMetricsGroup.recordPollError(); > // Do nothing. Let the framework poll whenever it's ready. > return null; > } catch (Throwable e) { > sourceTaskMetricsGroup.recordPollError(); > > throw e; > } > } {code} > [Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
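For a sense of what the proposed total/rate pair could look like with Kafka's public metrics library, here is a standalone sketch using org.apache.kafka.common.metrics; it is not the actual Connect-internal SourceTaskMetricsGroup plumbing, and the group name "connector-metrics" is made up:

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;

public class PollErrorMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor pollErrors = metrics.sensor("source-record-poll-errors");

        // One sensor backing both proposed metrics: a monotonically increasing
        // total and a per-second rate over the metrics sample window.
        MetricName total = metrics.metricName("source-record-poll-error-total", "connector-metrics");
        MetricName rate = metrics.metricName("source-record-poll-error-rate", "connector-metrics");
        pollErrors.add(total, new CumulativeSum());
        pollErrors.add(rate, new Rate());

        // Equivalent of sourceTaskMetricsGroup.recordPollError() in the snippet above
        pollErrors.record();

        metrics.close();
    }
}
{code}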
[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors
[ https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725713#comment-17725713 ] Yash Mayya commented on KAFKA-15018: Thanks for filing this bug ticket [~ChrisEgerton], it's an interesting one. This bug also affects regular source connectors that are configured to use a separate offsets topic right (and not only exactly-once support enabled source connectors as indicated in the ticket title / description)? I like the idea of synchronously writing tombstones to the global offsets store before writing them to the connector specific offsets store - it's clean and simple. You have a valid point about two synchronous writes to topics on potentially separate Kafka clusters being sub-optimal, however I also agree with your later point on tombstone offsets being pretty uncommon based on the connectors and use-cases I've encountered so far. I think it's a very reasonable trade-off to make (i.e. slightly worse performance for an edge case over potential correctness issues for the same edge case). An alternate idea I had was to treat tombstones values differently from the absence of an offset - i.e. use a special value to represent tombstones in the offset store. This would allow us to distinguish between no offset being present in the connector specific offset store for a particular partition versus it being explicitly wiped by a connector task via a null / tombstone offset. The "special" value wouldn't be persisted in the topic, it would only be present in the in-memory store which represents the materialized view of the offset topic. However, we'd need to do some additional work to ensure that this value isn't leaked to connectors / tasks - basically, it should only be surfaced to the ConnectorOffsetBackingStore in order to make a decision on whether or not to use the offset from the global offsets store. I personally think that the other approach is cleaner overall though, WDYT? > Potential tombstone offsets corruption for exactly-once source connectors > - > > Key: KAFKA-15018 > URL: https://issues.apache.org/jira/browse/KAFKA-15018 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1 >Reporter: Chris Egerton >Priority: Major > > When exactly-once support is enabled for source connectors, source offsets > can potentially be written to two different offsets topics: a topic specific > to the connector, and the global offsets topic (which was used for all > connectors prior to KIP-618 / version 3.3.0). > Precedence is given to offsets in the per-connector offsets topic, but if > none are found for a given partition, then the global offsets topic is used > as a fallback. > When committing offsets, a transaction is used to ensure that source records > and source offsets are written to the Kafka cluster targeted by the source > connector. This transaction only includes the connector-specific offsets > topic. Writes to the global offsets topic take place after writes to the > connector-specific offsets topic have completed successfully, and if they > fail, a warning message is logged, but no other action is taken. > Normally, this ensures that, for offsets committed by exactly-once-supported > source connectors, the per-connector offsets topic is at least as up-to-date > as the global offsets topic, and sometimes even ahead. > However, for tombstone offsets, we lose that guarantee. 
If a tombstone offset > is successfully written to the per-connector offsets topic, but cannot be > written to the global offsets topic, then the global offsets topic will still > contain that source offset, but the per-connector topic will not. Due to the > fallback-on-global logic used by the worker, if a task requests offsets for > one of the tombstoned partitions, the worker will provide it with the offsets > present in the global offsets topic, instead of indicating to the task that > no offsets can be found. -- This message was sent by Atlassian Jira (v8.20.10#820010)
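The "special value" alternative described in the comment above boils down to a sentinel in the in-memory materialized view. Everything below (the names, the two-map shape) is hypothetical and only illustrates how "explicitly tombstoned" could be distinguished from "never written":

{code:java}
import java.util.HashMap;
import java.util.Map;

public class TombstoneAwareOffsetView {
    // Sentinel meaning "a tombstone was observed"; never surfaced to tasks
    private static final Object TOMBSTONE = new Object();

    private final Map<String, Object> connectorStore = new HashMap<>();
    private final Map<String, Object> globalStore = new HashMap<>();

    void onConnectorOffset(String partition, Object offset) {
        connectorStore.put(partition, offset == null ? TOMBSTONE : offset);
    }

    void onGlobalOffset(String partition, Object offset) {
        globalStore.put(partition, offset);
    }

    Object offsetFor(String partition) {
        Object local = connectorStore.get(partition);
        if (local == TOMBSTONE) {
            return null; // explicitly wiped: must NOT fall back to the global store
        }
        if (local != null) {
            return local;
        }
        return globalStore.get(partition); // never written locally: fall back
    }
}
{code}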
[jira] [Commented] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field
[ https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725223#comment-17725223 ] Yash Mayya commented on KAFKA-15012: Thanks for filing this Jira [~ranjanrao]. Simply enabling the _ALLOW_LEADING_ZEROS_FOR_NUMBERS_ feature would likely be a backward incompatible change since there could potentially be users relying on the existing behavior to send bad data (i.e. some numeric field has leading zeroes when it isn't expected to) to a DLQ topic. This might need a small KIP adding a new config (maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which defaults to false in order to be backward compatible. Alternatively, we could design a way to allow users to configure the various [JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s and [JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonWriteFeature.html]s for the JsonSerializer / JsonDeserializer used in the JsonConverter. > JsonConverter fails when there are leading Zeros in a field > --- > > Key: KAFKA-15012 > URL: https://issues.apache.org/jira/browse/KAFKA-15012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.4.0, 3.3.2 >Reporter: Ranjan Rao >Priority: Major > Attachments: > enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch > > > When there are leading zeros in a field in the Kafka Record, a sink connector > using JsonConverter fails with the below exception > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] > to Kafka Connect data failed due to serialization error: > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494) > at > 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) > ... 13 more > Caused by: org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading > zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric > value: Leading zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > at > com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) > at > com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520) > at > com.fasterxml.j
[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720566#comment-17720566 ] Yash Mayya commented on KAFKA-14974: Thanks [~rhauch], your understanding here is correct. We should backport [this fix|https://github.com/apache/kafka/pull/13688] to {{{}3.3{}}}, {{3.4}} and {{3.5}} as well (before the {{3.5.0}} release ideally, if possible). > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
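A toy model of the binary-compatibility argument above: re-instating the void overload preserves the old method descriptor, so binaries compiled against it keep linking, while the Future-returning behavior moves to a new name. The class shape and the name sendWithReceipt here are illustrative, not a quote of the actual KafkaBasedLog code:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CompatLog<K, V> {

    // Pre-KAFKA-14455 shape, restored: the void return type keeps the old
    // method descriptor intact for already-compiled callers.
    public void send(K key, V value) {
        sendWithReceipt(key, value);
    }

    // New method carrying the KAFKA-14455 behavior under a fresh name.
    public Future<Void> sendWithReceipt(K key, V value) {
        CompletableFuture<Void> receipt = new CompletableFuture<>();
        // ... hand the record to the producer and complete the future from its
        // callback; completed immediately here only to keep the sketch runnable ...
        receipt.complete(null);
        return receipt;
    }
}
{code}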
[jira] [Updated] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic
[ https://issues.apache.org/jira/browse/KAFKA-14455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14455: --- Fix Version/s: 3.5.0 3.4.1 3.3.3 > Kafka Connect create and update REST APIs should surface failures while > writing to the config topic > --- > > Key: KAFKA-14455 > URL: https://issues.apache.org/jira/browse/KAFKA-14455 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > Kafka Connect's `POST /connectors` and `PUT /connectors/\{connector}/config` > REST APIs internally simply write a message to the Connect cluster's internal > config topic (which is then processed asynchronously by the herder). However, > no callback is passed to the producer's send method and there is no error > handling in place for producer send failures (see > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716] > / > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]). > Consider one such case where the Connect worker's principal doesn't have a > WRITE ACL on the cluster's config topic. Now suppose the user submits a > connector's configs via one of the above two APIs. The producer send > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716] > / > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726] > won't succeed (due to a TopicAuthorizationException) but the API responses > will be `201 Created` success responses anyway. This is a very poor UX > because the connector will actually never be created but the API response > indicated success. Furthermore, this failure would only be detectable if > TRACE logs are enabled (via [this > log)|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java] > making it near impossible for users to debug. Producer callbacks should be > used to surface write failures back to the user via the API response. -- This message was sent by Atlassian Jira (v8.20.10#820010)
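The fix direction named in the last sentence, attaching a callback (or blocking on the returned Future) so that send failures reach the API caller, looks roughly like this against the plain producer API; the topic name, record contents, and error handling below are placeholders for the herder plumbing the ticket actually changed:

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConfigTopicWriteSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("connect-configs", "connector-my-connector", "{...}");
            try {
                // Blocking on the returned Future surfaces e.g. a
                // TopicAuthorizationException here instead of losing it silently.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Config topic write failed: " + exception);
                    }
                }).get();
            } catch (ExecutionException e) {
                // Map this back to an error REST response rather than "201 Created"
                System.err.println("Surfaced to API caller: " + e.getCause());
            }
        }
    }
}
{code}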
[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14974: --- Affects Version/s: 3.5.0 3.4.1 3.3.3 > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
Yash Mayya created KAFKA-14974: -- Summary: Restore backward compatibility in KafkaBasedLog Key: KAFKA-14974 URL: https://issues.apache.org/jira/browse/KAFKA-14974 Project: Kafka Issue Type: Task Reporter: Yash Mayya Assignee: Yash Mayya {{KafkaBasedLog}} is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by re-instating the older send methods, and renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14930: --- Description: Add public documentation for the new Kafka Connect offset management REST APIs from [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] being introduced in 3.6: * *PATCH* /connectors/\{connector}/offsets * *DELETE* /connectors/\{connector}/offsets was: Add public documentation for the 3 new Kafka Connect offset management REST APIs being introduced in [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] * *PATCH* /connectors/\{connector}/offsets * *DELETE* /connectors/\{connector}/offsets > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14930 > URL: https://issues.apache.org/jira/browse/KAFKA-14930 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0 > > > Add public documentation for the new Kafka Connect offset management REST > APIs from > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > being introduced in 3.6: > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
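A hedged client-side example of the two APIs listed above, using Java's built-in HTTP client; the worker URL, connector name, and source-connector-style JSON body follow the KIP-875 design but should be checked against the final released documentation. Note that KIP-875 requires the connector to be in the STOPPED state before its offsets can be altered or reset:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AlterOffsetsExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8083/connectors/my-connector/offsets";

        // PATCH with the KIP-875 body shape: a list of {partition, offset} pairs.
        // The partition/offset keys are connector-defined for source connectors.
        String body = "{\"offsets\": [{\"partition\": {\"filename\": \"test.txt\"}, "
                + "\"offset\": {\"position\": 30}}]}";
        HttpRequest alter = HttpRequest.newBuilder()
                .uri(URI.create(base))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();
        System.out.println(client.send(alter, HttpResponse.BodyHandlers.ofString()).body());

        // DELETE resets all offsets for the connector.
        HttpRequest reset = HttpRequest.newBuilder().uri(URI.create(base)).DELETE().build();
        System.out.println(client.send(reset, HttpResponse.BodyHandlers.ofString()).body());
    }
}
{code}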
[jira] [Assigned] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14956: -- Assignee: Yash Mayya > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at 
app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Metho
[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.
[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718513#comment-17718513 ] Yash Mayya commented on KAFKA-14947: [~krishnendudas] the offset commits for source connectors were asynchronous and periodic even in AK 2.6.2, see [this class|https://github.com/apache/kafka/blob/da65af02e5856e3429259e26eb49986122e34747/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L47]. {quote}With our existing connect API code and with the Kafka server (2.6.2), our ingestion mechanism was working fine in the live environment. We checked the Kafka server (2.6.2) WorkerSourceTask::execute() method, and it was following the below-mentioned execution path # Poll the task for the new data. # If get any data, save the new data into the Kafka topic. # Commit the offset.{quote} This isn't exactly accurate as the offset commit part isn't done synchronously with the poll and the produce (to the Kafka topic). Perhaps the *offset.flush.interval.ms* worker configuration (which determines the offset commit interval) is different between your two environments? {quote}But that willn't be persistent. At every start, the object will be reset. Any suggestion, on how we can make it persistent in the new Kafka server (3.1.1) {quote} That will always be the case for restarts which is why I mentioned in my previous comment that source offsets are typically queried from the offset storage only during connector / task startup. I'm not sure I follow why the local in-memory offset can't be used during regular running of the connector task? > Duplicate records are getting created in the topic. > > > Key: KAFKA-14947 > URL: https://issues.apache.org/jira/browse/KAFKA-14947 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.1.1 >Reporter: krishnendu Das >Priority: Blocker > Attachments: Kafka_server_3.1.1_data_duplication_issue_log > > > We are using Kafka connect API (version 2.3.0) and Kafka (version 3.1.1) for > data ingestion purposes. Previously we were using Kafka (version 2.6.2) and > the same Kafka connect API (version 2.3.0). The data ingestion was happening > properly. > > Recently we updated the Kafka version from 2.6.2 to 3.1.1. > Post update we are facing duplicate data issues from the source connector > into the Kafka topic. After debugging the 3.1.1 code, we saw one new function > {*}updateCommittableOffsets{*}() got added and called inside the > {*}WorkerSourceTask::execute{*}() as part of bug fix --"KAFKA-12226: Commit > source task offsets without blocking on batch delivery (#11323)" > > Now because of this function, we are observing this scenario > # Inside the execute() at the start of the flow, the call goes to > updateCommittableOffsets() to check if anything was there to perform the > committed offset or not. As the first poll is still not yet happened, this > function didn't find anything for commit. > # Then Kafka connects API poll() method is called from the > WorkerSourceTask::execute(). *-> 1st poll* > # Kafka Connect API (using sleepy policy) reads one source file from the > Cloud source directory. > # Read the whole content of the file and send the result set Kafka server to > write to the Kafka topic. 
> # During the 2nd poll, updateCommittableOffsets() found some offset to commit > and it updates a reference variable committableOffsets, which will be used > further by the WorkerSourceTask::commitOffsets() function to perform the actual > offset commit. > # Then the Kafka Connect API poll() method is called from > *WorkerSourceTask::execute().* *-> 2nd poll* > # The Kafka Connect API (using the sleepy policy) reads the same source file again > from the start, as offsetStorageReader::offset() didn’t give the latest > offset. > # Read the whole content of the file and send the result set to the Kafka server to > write to the Kafka topic. ---> This creates duplicate data in the topic. > > > # WorkerSourceTask::commitOffsets() commits the offset. > > > # Then the Kafka Connect API poll() method is called from > {*}WorkerSourceTask::execute(){*}. -> 3rd poll > # This time offsetStorageReader::offset() will be able to give the latest > offset. > # The Kafka Connect API (using the sleepy policy) reads the same source file from > the last read position. -- This message was sent by Atlassian Jira (v8.20.10#820010)
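To illustrate the "maintain internal state" suggestion in the comment above, here is a minimal hypothetical SourceTask that consults OffsetStorageReader only in start() and then advances its own in-memory cursor between polls; the partition/offset keys and topic name are placeholders:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class InMemoryPositionTask extends SourceTask {
    private static final Map<String, Object> PARTITION =
            Collections.singletonMap("file", "test.txt");

    private long position; // the task's own cursor between polls

    @Override
    public void start(Map<String, String> props) {
        // Consult the offset store once, at startup, to resume after a restart...
        Map<String, Object> offset = context.offsetStorageReader().offset(PARTITION);
        position = offset == null ? 0L : (Long) offset.get("position");
    }

    @Override
    public List<SourceRecord> poll() {
        // ...then advance the in-memory cursor on every poll instead of asking
        // the (asynchronously committed) offset store where we are.
        position++;
        Map<String, Object> offset = Collections.singletonMap("position", position);
        return Collections.singletonList(new SourceRecord(
                PARTITION, offset, "my-topic", Schema.STRING_SCHEMA,
                "record at position " + position));
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "sketch";
    }
}
{code}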
[jira] [Updated] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5
[ https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14876: --- Description: Add public documentation for the new Kafka Connect REST APIs from [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] being introduced in 3.5: * *GET* /connectors/\{connector}/offsets * *PUT* /connectors/\{connector}/stop was: Add public documentation for the new Kafka Connect offset management REST API being introduced in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] in 3.5: * *GET* /connectors/\{connector}/offsets > Public documentation for new Kafka Connect offset management REST APIs in 3.5 > - > > Key: KAFKA-14876 > URL: https://issues.apache.org/jira/browse/KAFKA-14876 > Project: Kafka > Issue Type: Sub-task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0 > > > Add public documentation for the new Kafka Connect REST APIs from > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > being introduced in 3.5: > * *GET* /connectors/\{connector}/offsets > * *PUT* /connectors/\{connector}/stop -- This message was sent by Atlassian Jira (v8.20.10#820010)
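A brief sketch of the two 3.5 additions named above, again with a placeholder worker URL and connector name; PUT /connectors/{connector}/stop shuts the tasks down while keeping the connector registered, which is also the state the 3.6 offset-modification APIs require:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopAndGetOffsetsExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // PUT /connectors/{connector}/stop: tasks shut down, connector stays registered.
        HttpRequest stop = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/stop"))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        client.send(stop, HttpResponse.BodyHandlers.discarding());

        // GET /connectors/{connector}/offsets: read the committed offsets.
        HttpRequest get = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/offsets"))
                .build();
        System.out.println(client.send(get, HttpResponse.BodyHandlers.ofString()).body());
    }
}
{code}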
[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5
[ https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718464#comment-17718464 ] Yash Mayya commented on KAFKA-14876: [~gharris1727] [~ChrisEgerton] sounds good, I've raised this PR to document the stop API - [https://github.com/apache/kafka/pull/13657] > Public documentation for new Kafka Connect offset management REST APIs in 3.5 > - > > Key: KAFKA-14876 > URL: https://issues.apache.org/jira/browse/KAFKA-14876 > Project: Kafka > Issue Type: Sub-task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0 > > > Add public documentation for the new Kafka Connect offset management REST API > being introduced in > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > in 3.5: > * *GET* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
[ https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-13187: -- Assignee: Yash Mayya (was: Matthew de Detrich) > Replace EasyMock and PowerMock with Mockito for DistributedHerderTest > - > > Key: KAFKA-13187 > URL: https://issues.apache.org/jira/browse/KAFKA-13187 > Project: Kafka > Issue Type: Sub-task >Reporter: YI-CHEN WANG >Assignee: Yash Mayya >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14947) Duplicate records are getting created in the topic.
[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717095#comment-17717095 ] Yash Mayya edited comment on KAFKA-14947 at 4/27/23 9:58 AM: - [~krishnendudas] this seems like a bug in the connector's implementation rather than a bug in the Connect framework in Apache Kafka. Since source connector offsets are committed by Connect workers periodically and asynchronously, there is no guarantee provided that offsets will be committed between successive poll calls. `OffsetStorageReader::offset` is typically used only during startup of connectors / tasks to resume progress after restarts, pause / resume etc. In your provided scenario, why can't the connector simply read from its previous position in the second poll since it should be maintaining some internal state? Also note that Kafka Connect doesn't support exactly-once semantics for source connectors in 3.1.1, this functionality was added in 3.3.0. Depending on your specific connector, it might also need additional changes to support the changes made in [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors] (this is publicly documented [here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]). Another more lightweight option might be for the connector to implement the [SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--] method and not advance its internal position / offset state until previous offsets are committed - however, it's important to note that this won't guarantee exactly-once semantics (if, for example, a Connect worker goes down after records have been produced to the Kafka topic but before the commit method is called for the task) and is also not ideal for high throughput scenarios since poll could be called multiple times between each offset commit (period is configurable via the worker property {*}offset.flush.interval.ms{*}). was (Author: yash.mayya): [~krishnendudas] this seems like a bug in the connector's implementation rather than a bug in the Connect framework in Apache Kafka. Since source connector offsets are committed by Connect workers periodically and asynchronously, there is no guarantee provided that offsets will be committed between successive poll calls. `OffsetStorageReader::offset` is typically used only during startup of connectors / tasks to resume progress after restarts, pause / resume etc. In your provided scenario, why can't the connector simply read from its previous position in the second poll since it should be maintaining some internal state? Also note that Kafka Connect doesn't support exactly-once semantics for source connectors in 3.1.1, this functionality was added in 3.3.0. Depending on your specific connector, it might also need additional changes to support the changes made in [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors] (this is publicly documented [here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]). 
Another more lightweight option might be for the connector to implement the [SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--] method and not advance its internal position / offset state until previous offsets are committed - however, it's important to note that this won't guarantee exactly-once semantics. > Duplicate records are getting created in the topic. > > > Key: KAFKA-14947 > URL: https://issues.apache.org/jira/browse/KAFKA-14947 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.1.1 >Reporter: krishnendu Das >Priority: Blocker > Attachments: Kafka_server_3.1.1_data_duplication_issue_log > > > We are using Kafka connect API (version 2.3.0) and Kafka (version 3.1.1) for > data ingestion purposes. Previously we were using Kafka (version 2.6.2) and > the same Kafka connect API (version 2.3.0). The data ingestion was happening > properly. > > Recently we updated the Kafka version from 2.6.2 to 3.1.1. > Post update we are facing duplicate data issues from the source connector > into the Kafka topic. After debugging the 3.1.1 code, we saw one new function > {*}updateCommittableOffsets{*}() got added and called inside the > {*}WorkerSourceTask::execute{*}() as part of bug fix --"KAFKA-12226: Commit > source task offsets without blocking on batch delivery (#11323)" > > Now because of this function, we are observing this scenario > # Inside the execute() at the sta
[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.
[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717095#comment-17717095 ] Yash Mayya commented on KAFKA-14947: [~krishnendudas] this seems like a bug in the connector's implementation rather than a bug in the Connect framework in Apache Kafka. Since source connector offsets are committed by Connect workers periodically and asynchronously, there is no guarantee provided that offsets will be committed between successive poll calls. `OffsetStorageReader::offset` is typically used only during startup of connectors / tasks to resume progress after restarts, pause / resume etc. In your provided scenario, why can't the connector simply read from its previous position in the second poll since it should be maintaining some internal state? Also note that Kafka Connect doesn't support exactly-once semantics for source connectors in 3.1.1, this functionality was added in 3.3.0. Depending on your specific connector, it might also need additional changes to support the changes made in [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors] (this is publicly documented [here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]). Another more lightweight option might be for the connector to implement the [SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--] method and not advance its internal position / offset state until previous offsets are committed - however, it's important to note that this won't guarantee exactly-once semantics. > Duplicate records are getting created in the topic. > > > Key: KAFKA-14947 > URL: https://issues.apache.org/jira/browse/KAFKA-14947 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.1.1 >Reporter: krishnendu Das >Priority: Blocker > Attachments: Kafka_server_3.1.1_data_duplication_issue_log > > > We are using Kafka connect API (version 2.3.0) and Kafka (version 3.1.1) for > data ingestion purposes. Previously we were using Kafka (version 2.6.2) and > the same Kafka connect API (version 2.3.0). The data ingestion was happening > properly. > > Recently we updated the Kafka version from 2.6.2 to 3.1.1. > Post update we are facing duplicate data issues from the source connector > into the Kafka topic. After debugging the 3.1.1 code, we saw one new function > {*}updateCommittableOffsets{*}() got added and called inside the > {*}WorkerSourceTask::execute{*}() as part of bug fix --"KAFKA-12226: Commit > source task offsets without blocking on batch delivery (#11323)" > > Now because of this function, we are observing this scenario > # Inside the execute() at the start of the flow, the call goes to > updateCommittableOffsets() to check if anything was there to perform the > committed offset or not. As the first poll is still not yet happened, this > function didn't find anything for commit. > # Then Kafka connects API poll() method is called from the > WorkerSourceTask::execute(). *-> 1st poll* > # Kafka Connect API (using sleepy policy) reads one source file from the > Cloud source directory. > # Read the whole content of the file and send the result set Kafka server to > write to the Kafka topic. 
> # During the 2nd poll, updateCommittableOffsets() found some offset to commit > and it updates a reference variable committableOffsets, which will be used > further by the WorkerSourceTask::commitOffsets() function to perform the actual > offset commit. > # Then the Kafka Connect API poll() method is called from > *WorkerSourceTask::execute().* *-> 2nd poll* > # The Kafka Connect API (using the sleepy policy) reads the same source file again > from the start, as offsetStorageReader::offset() didn’t give the latest > offset. > # Read the whole content of the file and send the result set to the Kafka server to > write to the Kafka topic. ---> This creates duplicate data in the topic. > > > # WorkerSourceTask::commitOffsets() commits the offset. > > > # Then the Kafka Connect API poll() method is called from > {*}WorkerSourceTask::execute(){*}. -> 3rd poll > # This time offsetStorageReader::offset() will be able to give the latest > offset. > # The Kafka Connect API (using the sleepy policy) reads the same source file from > the last read position. -- This message was sent by Atlassian Jira (v8.20.10#820010)
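To illustrate the SourceTask::commit option from the comment above (with the stated caveats about exactly-once and throughput), here is a minimal hypothetical task that re-reads from its last committed position until the framework confirms an offset flush:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class CommitGatedTask extends SourceTask {
    private static final Map<String, Object> PARTITION =
            Collections.singletonMap("file", "test.txt");

    private long committedPosition; // only advanced once offsets are flushed
    private long inFlightPosition;  // end of the batch most recently polled

    @Override
    public void start(Map<String, String> props) {
        Map<String, Object> offset = context.offsetStorageReader().offset(PARTITION);
        committedPosition = offset == null ? 0L : (Long) offset.get("position");
    }

    @Override
    public List<SourceRecord> poll() {
        // Always read relative to committedPosition: until commit() confirms a
        // flush, repeated polls re-read the same data (duplicates, but no loss).
        inFlightPosition = committedPosition + 1;
        Map<String, Object> offset =
                Collections.singletonMap("position", inFlightPosition);
        return Collections.singletonList(new SourceRecord(
                PARTITION, offset, "my-topic", Schema.STRING_SCHEMA,
                "record at position " + inFlightPosition));
    }

    @Override
    public void commit() {
        // Invoked after the framework commits offsets (offset.flush.interval.ms):
        // only now does the task's durable position move forward.
        committedPosition = inFlightPosition;
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "sketch";
    }
}
{code}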
[jira] [Created] (KAFKA-14933) Document Kafka Connect's log level REST APIs added in KIP-495
Yash Mayya created KAFKA-14933: -- Summary: Document Kafka Connect's log level REST APIs added in KIP-495 Key: KAFKA-14933 URL: https://issues.apache.org/jira/browse/KAFKA-14933 Project: Kafka Issue Type: Task Components: documentation, KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya [KIP-495|https://cwiki.apache.org/confluence/display/KAFKA/KIP-495%3A+Dynamically+Adjust+Log+Levels+in+Connect] added 3 REST APIs to allow dynamically adjusting log levels on Kafka Connect workers. This was added a long time ago (released in AK 2.4.0) but was never publicly documented. These REST APIs should be documented in [https://kafka.apache.org/documentation/#connect_rest]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
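For reference while the documentation is being written, the three KIP-495 endpoints are GET /admin/loggers, GET /admin/loggers/{logger}, and PUT /admin/loggers/{logger}. A small client-side sketch follows; the worker URL and logger name are placeholders, and note that level changes made this way apply to a single worker and do not survive a restart:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LoggerLevelExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8083/admin/loggers";

        // List every logger the worker currently tracks, with its level.
        HttpRequest listAll = HttpRequest.newBuilder().uri(URI.create(base)).build();
        System.out.println(client.send(listAll, HttpResponse.BodyHandlers.ofString()).body());

        // Bump one namespace to DEBUG on this worker only.
        HttpRequest setLevel = HttpRequest.newBuilder()
                .uri(URI.create(base + "/org.apache.kafka.connect.runtime.WorkerSinkTask"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"level\": \"DEBUG\"}"))
                .build();
        System.out.println(client.send(setLevel, HttpResponse.BodyHandlers.ofString()).body());
    }
}
{code}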
[jira] [Updated] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5
[ https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14876: --- Description: Add public documentation for the new Kafka Connect offset management REST API being introduced in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] in 3.5: * *GET* /connectors/\{connector}/offsets was: Add public documentation for the new Kafka Connect offset management REST APIs being introduced in [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] in 3.5 * *GET* /connectors/\{connector}/offsets > Public documentation for new Kafka Connect offset management REST APIs in 3.5 > - > > Key: KAFKA-14876 > URL: https://issues.apache.org/jira/browse/KAFKA-14876 > Project: Kafka > Issue Type: Sub-task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0 > > > Add public documentation for the new Kafka Connect offset management REST API > being introduced in > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > in 3.5: > * *GET* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14930: --- Description: Add public documentation for the 3 new Kafka Connect offset management REST APIs being introduced in [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] * *PATCH* /connectors/\{connector}/offsets * *DELETE* /connectors/\{connector}/offsets was: Add public documentation for the 3 new Kafka Connect offset management REST APIs being introduced in [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]{*}{*} * *PATCH* /connectors/\{connector}/offsets * *DELETE* /connectors/\{connector}/offsets) > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14930 > URL: https://issues.apache.org/jira/browse/KAFKA-14930 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0 > > > Add public documentation for the 3 new Kafka Connect offset management REST > APIs being introduced in > [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
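As a rough illustration of the request shapes these docs will need to cover, here is a sketch that walks through the three KIP-875 offset endpoints. The worker address and the connector name "file-source" are assumptions, and the "filename" / "position" keys in the PATCH body stand in for whatever offset format the connector itself uses.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Walks through the three KIP-875 offset endpoints for a hypothetical
// connector named "file-source" on a worker at localhost:8083.
public class ConnectOffsetsExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8083/connectors/file-source/offsets";

        // GET -- read the connector's current offsets.
        HttpRequest get = HttpRequest.newBuilder().uri(URI.create(base)).GET().build();
        System.out.println(client.send(get, HttpResponse.BodyHandlers.ofString()).body());

        // PATCH -- alter offsets; the partition / offset keys mirror the
        // connector's own offset format ("filename" / "position" here are
        // placeholders for a file-based source connector).
        String body = "{\"offsets\": [{\"partition\": {\"filename\": \"test.txt\"},"
                + " \"offset\": {\"position\": 30}}]}";
        HttpRequest patch = HttpRequest.newBuilder()
                .uri(URI.create(base))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();
        System.out.println(client.send(patch, HttpResponse.BodyHandlers.ofString()).body());

        // DELETE -- reset all offsets for the connector.
        HttpRequest delete = HttpRequest.newBuilder().uri(URI.create(base)).DELETE().build();
        System.out.println(client.send(delete, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```

Note that per KIP-875, the PATCH and DELETE requests are only accepted while the connector is in the STOPPED state.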
[jira] [Assigned] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14930: -- Assignee: Yash Mayya > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14930 > URL: https://issues.apache.org/jira/browse/KAFKA-14930 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0 > > > Add public documentation for the 3 new Kafka Connect offset management REST > APIs being introduced in > [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14930: --- Fix Version/s: 3.6.0 > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14930 > URL: https://issues.apache.org/jira/browse/KAFKA-14930 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Mickael Maison >Priority: Major > Fix For: 3.6.0 > > > Add public documentation for the 3 new Kafka Connect offset management REST > APIs being introduced in > [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14923) Upgrade io.netty_netty-codec for CVE-2022-41881
[ https://issues.apache.org/jira/browse/KAFKA-14923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714541#comment-17714541 ] Yash Mayya commented on KAFKA-14923: [~vikashmishra0808] looks like [https://github.com/apache/kafka/pull/13070] / https://issues.apache.org/jira/browse/KAFKA-14564 bumped up the netty version to 4.1.86.Final, and the commit is present on the 3.5 branch as well (see [here|https://github.com/apache/kafka/blob/23c013408f620425e9b6111e13be2e20b226a2e6/gradle/dependencies.gradle#L110]) since it was merged to trunk before the 3.5 branch was cut. The upcoming 3.5.0 release shouldn't be impacted by this CVE. > Upgrade io.netty_netty-codec for CVE-2022-41881 > --- > > Key: KAFKA-14923 > URL: https://issues.apache.org/jira/browse/KAFKA-14923 > Project: Kafka > Issue Type: Task >Affects Versions: 3.4.0, 3.3.2 >Reporter: Vikash Mishra >Priority: Critical > > The currently used io.netty_netty-codec version 4.1.78 has a high severity CVE: > [NVD - CVE-2022-41881 > (nist.gov)|https://nvd.nist.gov/vuln/detail/CVE-2022-41881] > The fix was released in version 4.1.86.Final. Since the higher stable version > 4.1.91.Final is available, we should upgrade to it to fix the mentioned CVE. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713158#comment-17713158 ] Yash Mayya edited comment on KAFKA-14876 at 4/17/23 4:11 PM: - Yep, makes sense, I can raise a PR for that soon. was (Author: yash.mayya): Yep, makes sense, I can raise a PR for that tomorrow. > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14876 > URL: https://issues.apache.org/jira/browse/KAFKA-14876 > Project: Kafka > Issue Type: Sub-task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0 > > > Add public documentation for the 3 new Kafka Connect offset management REST > APIs being introduced in > [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > * *GET* /connectors/\{connector}/offsets > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713183#comment-17713183 ] Yash Mayya commented on KAFKA-14876: [~ChrisEgerton] I think we should document the new stop API only with the alter / reset offsets APIs, as it might not make much sense without them; what do you think? > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14876 > URL: https://issues.apache.org/jira/browse/KAFKA-14876 > Project: Kafka > Issue Type: Sub-task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0 > > > Add public documentation for the 3 new Kafka Connect offset management REST > APIs being introduced in > [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > * *GET* /connectors/\{connector}/offsets > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713158#comment-17713158 ] Yash Mayya commented on KAFKA-14876: Yep, makes sense, I can raise a PR for that tomorrow. > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14876 > URL: https://issues.apache.org/jira/browse/KAFKA-14876 > Project: Kafka > Issue Type: Sub-task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > Add public documentation for the 3 new Kafka Connect offset management REST > APIs being introduced in > [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > * *GET* /connectors/\{connector}/offsets > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14910) Consider cancelling ongoing alter connector offsets requests when the connector is resumed
Yash Mayya created KAFKA-14910: -- Summary: Consider cancelling ongoing alter connector offsets requests when the connector is resumed Key: KAFKA-14910 URL: https://issues.apache.org/jira/browse/KAFKA-14910 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Yash Mayya See discussion here for more details - [https://github.com/apache/kafka/pull/13465#discussion_r1164465874] The implementation for the _*PATCH /connectors/\{connector}/offsets*_ and _*DELETE /connectors/\{connector}/offsets*_ APIs is completely asynchronous and the check for whether the connector is stopped will only be made at the beginning of the request. If the connector is resumed while the alter / reset offsets request is being processed, this can lead to certain issues (especially with non-EoS source connectors). For sink connectors, admin client requests to alter / reset offsets for a consumer group will be rejected if the consumer group is active (i.e. when the connector tasks come up). For source connectors when exactly once support is enabled on the worker, we do a round of zombie fencing before the tasks are brought up and this will basically disable the transactional producer used to alter offsets (the transactional producer uses the transactional ID for task 0 of the connector). However, for source connectors when exactly once support is not enabled on the worker (this is the default), there are no such safeguards. We could potentially add some interruption logic that cancels ongoing alter / reset offset requests when a connector is resumed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
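For context on the sink-connector safeguard described above, here is a small sketch of what the framework's admin-client path runs into when a group is still active. The broker address, group id ("connect-my-sink", following Connect's default "connect-" prefix plus the connector name), and topic are all assumptions for illustration.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Demonstrates the sink-connector safeguard: the broker rejects offset
// alterations for a consumer group that still has active members.
public class AlterActiveGroupOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
                    new TopicPartition("test-topic", 0), new OffsetAndMetadata(0L));
            // While the connector's tasks (i.e. the group's consumers) are
            // running, this fails -- typically an UnknownMemberIdException
            // surfaced through the ExecutionException.
            admin.alterConsumerGroupOffsets("connect-my-sink", offsets).all().get();
            System.out.println("Offsets altered: the group had no active members.");
        } catch (Exception e) {
            System.err.println("Rejected, group is likely still active: " + e);
        }
    }
}
```

No equivalent broker-side check protects non-EoS source connectors, which is exactly the gap this ticket proposes to close with interruption logic.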
[jira] [Updated] (KAFKA-14910) Consider cancelling ongoing alter / reset connector offsets requests when the connector is resumed
[ https://issues.apache.org/jira/browse/KAFKA-14910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14910: --- Summary: Consider cancelling ongoing alter / reset connector offsets requests when the connector is resumed (was: Consider cancelling ongoing alter connector offsets requests when the connector is resumed) > Consider cancelling ongoing alter / reset connector offsets requests when the > connector is resumed > -- > > Key: KAFKA-14910 > URL: https://issues.apache.org/jira/browse/KAFKA-14910 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yash Mayya >Priority: Major > > See discussion here for more details - > [https://github.com/apache/kafka/pull/13465#discussion_r1164465874] > The implementation for the _*PATCH /connectors/\{connector}/offsets*_ and > _*DELETE /connectors/\{connector}/offsets*_ APIs is completely asynchronous > and the check for whether the connector is stopped will only be made at the > beginning of the request. > If the connector is resumed while the alter / reset offsets request is being > processed, this can lead to certain issues (especially with non-EoS source > connectors). For sink connectors, admin client requests to alter / reset > offsets for a consumer group will be rejected if the consumer group is active > (i.e. when the connector tasks come up). For source connectors when exactly > once support is enabled on the worker, we do a round of zombie fencing before > the tasks are brought up and this will basically disable the transactional > producer used to alter offsets (the transactional producer uses the > transactional ID for task 0 of the connector). However, for source connectors > when exactly once support is not enabled on the worker (this is the default), > there are no such safeguards. We could potentially add some interruption > logic that cancels ongoing alter / reset offset requests when a connector is > resumed. -- This message was sent by Atlassian Jira (v8.20.10#820010)