[jira] [Resolved] (KAFKA-16068) Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors

2024-07-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16068.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin 
> scanning errors
> ---
>
> Key: KAFKA-16068
> URL: https://issues.apache.org/jira/browse/KAFKA-16068
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: newbie++
> Fix For: 3.9.0
>
>
> The ConnectorValidationIntegrationTest creates test plugins, some with 
> erroneous behavior. In particular:
>  
> {noformat}
> [2023-12-29 10:28:06,548] ERROR Failed to discover Converter in classpath: Unable to instantiate TestConverterWithPrivateConstructor: Plugin class default constructor must be public (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138)
> [2023-12-29 10:28:06,550] ERROR Failed to discover Converter in classpath: Unable to instantiate TestConverterWithConstructorThatThrowsException: Failed to invoke plugin constructor (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138)
> java.lang.reflect.InvocationTargetException
> {noformat}
> These plugins should be removed from the classpath so that the errors do 
> not appear in unrelated tests. Instead, plugins with erroneous behavior 
> should only be present in TestPlugins, so that tests can opt in to 
> loading them.
> There are already plugins with private constructors and with constructors 
> that throw exceptions, so they can be reused.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17105) Unnecessary connector restarts after being newly created

2024-07-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-17105.
---
Resolution: Fixed

> Unnecessary connector restarts after being newly created
> 
>
> Key: KAFKA-17105
> URL: https://issues.apache.org/jira/browse/KAFKA-17105
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 
> 3.7.0, 3.6.1, 3.6.2, 3.8.0, 3.7.1, 3.9.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.9.0
>
>
> When a connector is created, it may be restarted unnecessarily immediately 
> after it is first started by the worker to which it has been assigned:
>  # Connector config is written to the config topic
>  # A worker reads the new record from the config topic, and adds the 
> connector to its connectorConfigUpdates field (see 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2445])
>  # Another worker has already seen the new connector config and triggered a 
> rebalance; this worker participates in the ensuing rebalance (see 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L419])
>  and is assigned the connector
>  # After the rebalance is over (see 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L422]),
>  the worker starts all of the connectors it has been newly-assigned (see 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1843]
>  and 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1963-L2001])
>  # Once finished with that, the worker checks for new connector configs (see 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L535])
>  and restarts all connectors in the connectorConfigUpdates field (see 
> [here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L717-L726]).
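The sequence above can be reproduced with a toy model. This is only a sketch under simplifying assumptions: the class `RestartRaceModel` and its methods are hypothetical stand-ins, not the DistributedHerder code, and the model keeps only the bookkeeping needed to show why a start is followed by a restart.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of the sequence; not the DistributedHerder implementation.
final class RestartRaceModel {
    // Stand-in for the herder's connectorConfigUpdates field.
    final Set<String> connectorConfigUpdates = new LinkedHashSet<>();
    // Records every start and restart, in the order they happen.
    final List<String> events = new ArrayList<>();

    // Step 2: the worker reads a new connector config record.
    void onConfigRecord(String connector) {
        connectorConfigUpdates.add(connector);
    }

    // Step 4: after the rebalance, start the newly assigned connectors.
    void startAssigned(List<String> assigned) {
        for (String connector : assigned) {
            events.add("start " + connector);
        }
    }

    // Step 5: process pending config updates by restarting each connector.
    void processConfigUpdates() {
        for (String connector : connectorConfigUpdates) {
            events.add("restart " + connector);
        }
        connectorConfigUpdates.clear();
    }
}
```

Calling `onConfigRecord("c1")`, then `startAssigned` with a list containing `"c1"`, then `processConfigUpdates()` leaves `events` holding a start for `c1` followed by a restart, because nothing in the model removes `c1` from the pending updates when it is started.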





[jira] [Created] (KAFKA-17155) Redundant rebalances triggered after connector creation/deletion and task config updates

2024-07-17 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17155:
-

 Summary: Redundant rebalances triggered after connector 
creation/deletion and task config updates
 Key: KAFKA-17155
 URL: https://issues.apache.org/jira/browse/KAFKA-17155
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.8.0, 3.9.0
Reporter: Chris Egerton


With KAFKA-17105, a scenario is described where a connector may be 
unnecessarily restarted soon after it has been created.

Similarly, when any events occur that set the 
[DistributedHerder.needsReconfigRebalance 
flag|https://github.com/apache/kafka/blob/a66a59f427b30611175fd029d86832d00aa5aabd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L215]
 to true (at the time of writing these are the detection of a new connector, 
the removal of an existing connector, or the detection of new task 
configurations regardless of whether existing configurations existed for the 
connector), it is possible that a rebalance has already started because another 
worker has detected this change as well. In that case, 
{{needsReconfigRebalance}} will still be set to {{true}} even after that 
rebalance has taken place, and the worker will force an unnecessary second 
rebalance.

We might consider changing the "needs reconfig rebalance" field into a 
"reconfig rebalance threshold" field, which contains the latest offset of a 
record consumed from the config topic that warrants a rebalance. When possibly 
performing rebalances based on this field, the worker can check if the offset 
in the assignment given out by the leader during the most recent rebalance is 
greater than or equal to this threshold, and if so, choose not to force a 
rebalance.
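A minimal sketch of the threshold idea follows. Everything in it is hypothetical: the class name, the method names, and the choice to clear the threshold once a rebalance has covered it are assumptions made for illustration, not existing Kafka Connect code.

```java
// Hypothetical model of the proposal; none of these names exist in Kafka Connect.
final class ReconfigRebalanceThreshold {
    // Sentinel meaning that no config topic record currently warrants a rebalance.
    private static final long NONE = -1L;

    // Latest offset of a config topic record that warrants a rebalance.
    private long threshold = NONE;

    // Called when the worker consumes a record that would currently set
    // needsReconfigRebalance to true.
    void onRebalanceWorthyRecord(long configOffset) {
        threshold = Math.max(threshold, configOffset);
    }

    // Decides whether to force a rebalance, given the config offset carried by
    // the assignment from the most recent rebalance.
    boolean shouldForceRebalance(long assignmentOffset) {
        if (threshold == NONE) {
            return false;
        }
        if (assignmentOffset >= threshold) {
            // The most recent rebalance already accounted for the change.
            // Clearing the threshold here is an assumption of this sketch.
            threshold = NONE;
            return false;
        }
        return true;
    }
}
```

With a boolean flag, the worker cannot tell whether a rebalance that just finished already covered the change. Comparing offsets gives it that information, at the cost of the extra state discussed in the next paragraph.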

 

This has caused issues in some tests, but may be a benign race condition 
that does not have practical consequences in the real world. We may not want to 
address this (especially with an approach that increases the complexity of the 
code base and comes with a risk of regression) until/unless someone states that 
it has affected them outside of Kafka Connect unit tests.





[jira] [Resolved] (KAFKA-16383) fix flaky test IdentityReplicationIntegrationTest.testReplicateFromLatest()

2024-07-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16383.
---
Resolution: Fixed

> fix flaky test IdentityReplicationIntegrationTest.testReplicateFromLatest()
> ---
>
> Key: KAFKA-16383
> URL: https://issues.apache.org/jira/browse/KAFKA-16383
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Reporter: Johnny Hsu
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky-test
>
> Build link: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/4/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/]
>  
> This test has failed in the builds for several PRs, which suggests that it is flaky.





[jira] [Resolved] (KAFKA-17145) Reinstate Utils.join() method in org.apache.kafka.common.utils

2024-07-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-17145.
---
Resolution: Won't Fix

> Reinstate Utils.join() method in org.apache.kafka.common.utils
> --
>
> Key: KAFKA-17145
> URL: https://issues.apache.org/jira/browse/KAFKA-17145
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.8.0
>Reporter: Vikas Balani
>Assignee: Vikas Balani
>Priority: Major
>
> h3. Description:
> The Utils.join() function has been removed from the 
> org.apache.kafka.common.utils package. This utility function is widely used 
> by many connector plugins, which are now failing due to the absence of this 
> method.
> h3. Example of affected code:
> The S3 Sink Connector uses this method here: 
> [link|https://github.com/confluentinc/kafka-connect-storage-cloud/blob/c90780dd61c6b1e11cd89c20619ac81a99aa19d5/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java#L1087]
> h3. Impact:
> 1. Connector plugins that depend on this method are failing.
> 2. This change breaks backward compatibility for plugins and custom code 
> relying on Utils.join().
> h3. Proposed Solution:
> 1. Partially revert https://github.com/apache/kafka/pull/15823
> 2. Restore the Utils.join() method in org.apache.kafka.common.utils.
> 3. Maintain the usage of JDK API within Apache Kafka codebase instead of 
> Utils.join().
> 4. Add a deprecation warning to Utils.join() to indicate future removal.
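For plugin code that needs to compile regardless of how this issue is resolved, a small local helper built on JDK APIs is one option. This is only a sketch: `JoinCompat` is a hypothetical name, it is not part of Apache Kafka, and it assumes the caller wants each element's string form joined by a separator.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical helper for plugin code; not part of Apache Kafka.
final class JoinCompat {
    private JoinCompat() { }

    // Joins the string form of each element with the given separator,
    // using only JDK APIs.
    static <T> String join(Collection<T> items, String separator) {
        return items.stream()
                .map(item -> Objects.toString(item))
                .collect(Collectors.joining(separator));
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders", "payments", "refunds");
        System.out.println(join(topics, ","));
        // For collections of strings, String.join gives the same result.
        System.out.println(String.join(",", topics));
    }
}
```

Keeping the helper inside the plugin avoids a dependency on utility classes in org.apache.kafka.common.utils.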





[jira] [Resolved] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2024-07-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14401.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.9.0
>
>
> When a connector or task tries to read offsets from the offsets topic, it 
> calls the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
> from the underlying KafkaOffsetBackingStore, which invokes the 
> `KafkaBasedLog#readToEnd` method and passes it a Callback. `readToEnd` 
> adds the Callback to a queue of pending callbacks.
> Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
> callbacks in an infinite loop. However, a try/catch block encloses the while 
> loop. If an exception is thrown that is not caught by any of the inner catch 
> blocks, control passes to the outermost catch block and the WorkThread 
> terminates. The connectors/tasks are not aware of this, so they keep 
> submitting callbacks to KafkaBasedLog with nothing left to process them. 
> This can be seen in thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x00070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
> We need a mechanism to fail all such offset read requests, because even if 
> we restart the thread, chances are it will fail again with the same error, 
> so the offset fetch would be stuck indefinitely.
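One way such a mechanism could look is sketched below. This is a deliberately simplified, hypothetical model: `FailFastWorkLoop` is not KafkaBasedLog, it uses `CompletableFuture` in place of Connect's Callback type, and it assumes that once the loop has died every pending and future request should fail with the same cause.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical simplification; KafkaBasedLog's real WorkThread differs.
final class FailFastWorkLoop<T> {
    // Requests waiting for the next successful iteration of the loop.
    private final Queue<CompletableFuture<T>> pending = new ArrayDeque<>();
    // Set once the loop has died; null while the loop is healthy.
    private Throwable fatal;

    // Registers a request. If the loop is already dead, the request fails
    // immediately instead of waiting forever.
    synchronized CompletableFuture<T> submit() {
        CompletableFuture<T> future = new CompletableFuture<>();
        if (fatal != null) {
            future.completeExceptionally(fatal);
        } else {
            pending.add(future);
        }
        return future;
    }

    // One iteration of the work loop. On an unexpected failure, the loop is
    // marked dead and every pending request is failed with the cause.
    synchronized void runOnce(Supplier<T> work) {
        if (fatal != null) {
            return;
        }
        try {
            T result = work.get();
            CompletableFuture<T> future;
            while ((future = pending.poll()) != null) {
                future.complete(result);
            }
        } catch (RuntimeException e) {
            fatal = e;
            CompletableFuture<T> future;
            while ((future = pending.poll()) != null) {
                future.completeExceptionally(e);
            }
        }
    }
}
```

A caller blocked on such a future would receive an exception rather than parking indefinitely, as the task thread does in the dump above.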
> As explained above, this happens mainly when the exception thrown is not 
> caught by any of the inner catch blocks and control ends up in the 
> outermost catch block. In my experience, this has happened on a few 
> occasions, when the exception thrown is:
>  
>  
> {code:java}
> [2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:440)
> org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
>   at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)
>   at org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
>   at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
>   at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)
>   at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
>   at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> 

[jira] [Created] (KAFKA-17130) Connect workers do not properly ensure group membership before responding to health checks

2024-07-12 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17130:
-

 Summary: Connect workers do not properly ensure group membership 
before responding to health checks
 Key: KAFKA-17130
 URL: https://issues.apache.org/jira/browse/KAFKA-17130
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.8.0, 3.9.0
Reporter: Chris Egerton


Initially reported [here|https://github.com/apache/kafka/pull/16585].

When a distributed Connect worker's herder begins an iteration of its tick 
loop, it tries to ensure that the worker is still in contact with the Kafka 
cluster that's used for cluster coordination and internal topics; see 
[here|https://github.com/apache/kafka/blob/0ada8fac6869cad8ac33a79032cf5d57bfa2a3ea/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L419].

However, this method may return even if the Kafka cluster is down. It does not 
force a heartbeat request to be sent to the broker, and may return if the time 
since the last heartbeat is small enough.

We may want to force at least one request (possibly, specifically a heartbeat) 
to be sent to the group coordinator before returning from 
{{WorkerGroupMember::ensureActive}}, in order to guarantee that the health check 
endpoint only returns 200 if it has explicitly validated the health of the 
worker's connection to the group coordinator after the request to the endpoint 
was initiated.
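The "validated after the request was initiated" condition can be expressed as a timestamp comparison. The sketch below is hypothetical: `CoordinatorContactTracker` does not exist in Kafka Connect, the timestamps are assumed to come from a single non-decreasing clock supplied by the caller, and returning 503 for the unvalidated case is an assumption of this example rather than documented behavior.

```java
// Hypothetical helper; WorkerGroupMember has no such class or methods.
final class CoordinatorContactTracker {
    // Time of the most recent successful response from the group coordinator,
    // or Long.MIN_VALUE if there has been none yet.
    private long lastSuccessMs = Long.MIN_VALUE;

    // Called whenever a request to the group coordinator (for example a
    // heartbeat) completes successfully.
    synchronized void onCoordinatorResponse(long nowMs) {
        lastSuccessMs = Math.max(lastSuccessMs, nowMs);
    }

    // True only if the coordinator responded at or after the moment the
    // health check request began.
    synchronized boolean validatedSince(long requestStartMs) {
        return lastSuccessMs >= requestStartMs;
    }

    // Maps the check to an HTTP status code. The 503 value is an assumption.
    synchronized int healthStatus(long requestStartMs) {
        return validatedSince(requestStartMs) ? 200 : 503;
    }
}
```

A response that arrived before the health check began does not count, which is exactly the case that a "time since last heartbeat is small enough" check lets through.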





[jira] [Created] (KAFKA-17115) Closing newly-created consumers during rebalance can cause rebalances to hang

2024-07-10 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17115:
-

 Summary: Closing newly-created consumers during rebalance can 
cause rebalances to hang
 Key: KAFKA-17115
 URL: https://issues.apache.org/jira/browse/KAFKA-17115
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.9.0
Reporter: Chris Egerton
Assignee: Chris Egerton


When a dynamic consumer (i.e., one with no group instance ID configured) first 
tries to join a group, the group coordinator normally responds with the 
MEMBER_ID_REQUIRED error, under the assumption that the member will retry soon 
after. During this step, the group coordinator will also generate a new member 
ID for the consumer, include it in the error response for the initial join 
group request, and expect that a member with that ID will participate in future 
rebalances.

If a consumer is closed in between the time that it sends the JoinGroup request 
and the time that it receives the response from the group coordinator, it will 
not attempt to leave the group, since it doesn't have a member ID to include in 
that request.

This will cause future rebalances to hang, since the group coordinator will 
still expect a member with the ID for the now-closed consumer to join. 
Eventually, the group coordinator may remove the closed consumer from the 
group, but with default configuration settings, this could take as long as five 
minutes.

One possible fix is to send a LeaveGroup request with the member ID if the 
consumer receives a JoinGroup response with a member ID after it has been 
closed.

 

This applies to the legacy consumer; I have not verified yet with the new async 
consumer.
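The hang and the proposed fix can be illustrated with a toy model. Every name below is hypothetical, and the model collapses the coordinator and the consumer into one class purely to show the bookkeeping. It is not the consumer or group coordinator implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the sequence described above; all names are hypothetical.
final class JoinThenCloseModel {
    // Member IDs the coordinator expects to see in future rebalances.
    final Set<String> expectedMembers = new HashSet<>();
    private int nextId = 0;
    private boolean consumerClosed = false;

    // Coordinator side: the first JoinGroup from a dynamic member is answered
    // with MEMBER_ID_REQUIRED plus a freshly generated member ID.
    String handleInitialJoin() {
        String memberId = "member-" + nextId++;
        expectedMembers.add(memberId);
        return memberId;
    }

    // Coordinator side: a LeaveGroup request removes the member.
    void handleLeaveGroup(String memberId) {
        expectedMembers.remove(memberId);
    }

    // Consumer side: the consumer is closed before the response arrives.
    void closeConsumer() {
        consumerClosed = true;
    }

    // Consumer side: the JoinGroup response arrives. With the proposed fix,
    // an already-closed consumer sends LeaveGroup with the new member ID.
    void onJoinResponse(String memberId, boolean applyProposedFix) {
        if (consumerClosed && applyProposedFix) {
            handleLeaveGroup(memberId);
        }
    }

    // The next rebalance stalls while the coordinator still expects a member
    // that will never rejoin.
    boolean rebalanceWouldHang() {
        return consumerClosed && !expectedMembers.isEmpty();
    }
}
```

Running the same sequence (`handleInitialJoin`, `closeConsumer`, `onJoinResponse`) with `applyProposedFix` set to false and then true shows the difference: only in the first case does `rebalanceWouldHang()` return true.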





[jira] [Created] (KAFKA-17105) Unnecessary connector restarts after being newly created

2024-07-09 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17105:
-

 Summary: Unnecessary connector restarts after being newly created
 Key: KAFKA-17105
 URL: https://issues.apache.org/jira/browse/KAFKA-17105
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.7.1, 3.6.2, 3.6.1, 3.7.0, 3.5.2, 3.5.1, 3.6.0, 3.4.1, 
3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 
3.2.0, 3.0.1, 3.0.0, 3.1.0, 3.8.0, 3.9.0
Reporter: Chris Egerton


When a connector is created, it may be restarted unnecessarily immediately 
after it is first started by the worker to which it has been assigned:
 # Connector config is written to the config topic
 # A worker reads the new record from the config topic, and adds the connector 
to its connectorConfigUpdates field (see 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2445])
 # The worker participates in the ensuing rebalance (see 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L419])
 and is assigned the connector
 # After the rebalance is over (see 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L422]),
 the worker starts all of the connectors it has been newly-assigned (see 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1843]
 and 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1963-L2001])
 # Once finished with that, the worker checks for new connector configs (see 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L535])
 and restarts all connectors in the connectorConfigUpdates field (see 
[here|https://github.com/apache/kafka/blob/43676f7612b2155ecada54c61b129d996f58bae2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L717-L726]).





[jira] [Created] (KAFKA-17075) Use health check endpoint to verify Connect worker readiness in system tests

2024-07-03 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-17075:
-

 Summary: Use health check endpoint to verify Connect worker 
readiness in system tests
 Key: KAFKA-17075
 URL: https://issues.apache.org/jira/browse/KAFKA-17075
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Affects Versions: 3.9.0
Reporter: Chris Egerton
Assignee: Chris Egerton


We introduced a health check endpoint for Kafka Connect as part of work on 
KAFKA-10816. We should start to use that endpoint to verify worker readiness in 
our system tests, instead of scanning worker logs for specific messages or 
hitting other, less-reliable REST endpoints.
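As a rough illustration of the polling pattern (written in Java here, whatever language the system tests themselves use), a readiness check could look like the sketch below. `ReadinessPoller` is a hypothetical helper, the URL passed to `httpProbe` is supplied by the caller because this thread does not state the endpoint's path, and treating any status other than 200 as "not ready yet" is an assumption.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.function.IntSupplier;

// Hypothetical readiness helper; not part of Kafka's test suite.
final class ReadinessPoller {
    private ReadinessPoller() { }

    // Calls the probe until it reports HTTP 200 or the attempts run out.
    static boolean awaitReady(IntSupplier statusProbe, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (statusProbe.getAsInt() == 200) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    // Builds a probe that issues a GET against the given URL and returns the
    // status code, or -1 if the worker cannot be reached yet.
    static IntSupplier httpProbe(String healthUrl) {
        return () -> {
            try {
                HttpURLConnection conn =
                        (HttpURLConnection) URI.create(healthUrl).toURL().openConnection();
                conn.setRequestMethod("GET");
                conn.setConnectTimeout(1000);
                conn.setReadTimeout(1000);
                try {
                    return conn.getResponseCode();
                } finally {
                    conn.disconnect();
                }
            } catch (IOException e) {
                return -1;
            }
        };
    }
}
```

A test would call `ReadinessPoller.awaitReady(ReadinessPoller.httpProbe(url), attempts, backoffMs)` with the worker's health check URL. Separating the probe from the loop keeps the retry logic testable without a running worker.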





[jira] [Resolved] (KAFKA-10816) Connect REST API should have a resource that can be used as a readiness probe

2024-07-03 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-10816.
---
Fix Version/s: 3.9.0
   Resolution: Done

> Connect REST API should have a resource that can be used as a readiness probe
> -
>
> Key: KAFKA-10816
> URL: https://issues.apache.org/jira/browse/KAFKA-10816
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> There are a few ways to accurately detect whether a Connect worker is 
> *completely* ready to process all REST requests:
> # Wait for {{Herder started}} in the Connect worker logs
> # Use the REST API to issue a request that will be completed only after the 
> herder has started, such as {{GET /connectors/{name}/}} or {{GET 
> /connectors/{name}/status}}.
> Other techniques can be used to detect other startup states, though none of 
> these will guarantee that the worker has indeed completely started up and can 
> process all REST requests:
> * {{GET /}} can be used to know when the REST server has started, but this 
> may be before the worker has started completely and successfully.
> * {{GET /connectors}} can be used to know when the REST server has started, 
> but this may be before the worker has started completely and successfully. 
> And, for the distributed Connect worker, this may actually return an older 
> list of connectors if the worker hasn't yet completely read through the 
> internal config topic. It's also possible that this request returns even if 
> the worker is having trouble reading from the internal config topic.
> * {{GET /connector-plugins}} can be used to know when the REST server has 
> started, but this may be before the worker has started completely and 
> successfully.
> The Connect REST API should have an endpoint that more obviously and more 
> simply can be used as a readiness probe. This could be a new resource (e.g., 
> {{GET /status}}), though this would only work on newer Connect runtimes, and 
> existing tooling, installations, and examples would have to be modified to 
> take advantage of this feature (if it exists). 
> Alternatively, we could make sure that the existing resources (e.g., {{GET 
> /}} or {{GET /connectors}}) wait for the herder to start completely; this 
> wouldn't require a KIP, and it would not require clients to use different 
> techniques for newer and older Connect runtimes. (Whether or not we backport 
> this is another question altogether, since it's debatable whether the 
> behavior of the existing REST resources is truly a bug.)





[jira] [Reopened] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-07-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-15524:
---

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
> --
>
> Key: KAFKA-15524
> URL: https://issues.apache.org/jira/browse/KAFKA-15524
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky, flaky-test
>
> Last seen: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/]
>  
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out.
>  at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>  at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401)
>  at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392)
>  at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> 

[jira] [Resolved] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-07-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15524.
---
Resolution: Later

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
> --
>
> Key: KAFKA-15524
> URL: https://issues.apache.org/jira/browse/KAFKA-15524
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky, flaky-test
>
> Last seen: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/]
>  
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out.
>  at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>  at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401)
>  at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392)
>  at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> 

[jira] [Resolved] (KAFKA-16949) System test test_dynamic_logging in connect_distributed_test is failing

2024-06-25 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16949.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> System test test_dynamic_logging in connect_distributed_test is failing
> ---
>
> Key: KAFKA-16949
> URL: https://issues.apache.org/jira/browse/KAFKA-16949
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.9.0
>
>
> Noticed that the system test `test_dynamic_logging` in 
> `connect_distributed_test` is failing with the following error:
>  
> {code:java}
> [INFO  - 2024-05-08 21:11:06,638 - runner_client - log - lineno:310]: RunnerClient: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_dynamic_logging: FAIL: AssertionError()
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 500, in test_dynamic_logging
>     assert self._loggers_are_set(new_level, request_time, namespace, workers=[worker])
> AssertionError {code}
>  





[jira] [Reopened] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-16935:
---

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.





[jira] [Resolved] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16935.
---
Resolution: Fixed

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.





[jira] [Created] (KAFKA-16943) Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest

2024-06-12 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16943:
-

 Summary: Synchronously verify Connect worker startup failure in 
InternalTopicsIntegrationTest
 Key: KAFKA-16943
 URL: https://issues.apache.org/jira/browse/KAFKA-16943
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Chris Egerton


Created after PR discussion 
[here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].

In some of our integration tests, we want to verify that a Connect worker 
cannot start under poor conditions (such as when its internal topics do not yet 
exist and it is configured to create them with a higher replication factor than 
the number of available brokers, or when its internal topics already exist but 
they do not have the compaction cleanup policy).

This is currently not possible, and presents a possible gap in testing 
coverage, especially for the test cases 
{{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and 
{{{}testFailToStartWhenInternalTopicsAreNotCompacted{}}}. It'd be nice if we 
could have some way of synchronously awaiting the completion or failure of 
worker startup in our integration tests in order to guarantee that worker 
startup fails under sufficiently adverse conditions.
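
One way to picture "synchronously awaiting the completion or failure of worker startup" is a blocking wait on a future that reports three distinct outcomes. The sketch below is only an illustration under assumptions: the class name, the Outcome enum, and the idea that startup is exposed as a CompletableFuture are hypothetical and are not part of the embedded Connect test utilities.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; assumes worker startup is exposed as a CompletableFuture.
public class StartupOutcomeSketch {
    public enum Outcome { STARTED, FAILED, TIMED_OUT }

    // Blocks until startup completes, fails, or the timeout elapses.
    public static Outcome awaitStartup(CompletableFuture<Void> startup, long timeoutMs) {
        try {
            startup.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.STARTED;
        } catch (ExecutionException e) {
            // Startup finished exceptionally, e.g. internal topics could not be created.
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            // Neither success nor failure was observed in time.
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.TIMED_OUT;
        }
    }
}
```

With a helper of this shape, a test could assert on FAILED specifically, rather than inferring failure from the absence of a running worker.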





[jira] [Created] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-11 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16935:
-

 Summary: Automatically wait for cluster startup in embedded 
Connect integration tests
 Key: KAFKA-16935
 URL: https://issues.apache.org/jira/browse/KAFKA-16935
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


It's a common idiom in our integration tests to [start an embedded Kafka and 
Connect 
cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
 and then immediately afterwards [wait for each worker in the Connect cluster 
to complete 
startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
 Separating these two actions into separate steps makes our tests lengthier and 
can even lead to bugs and flakiness if the second step is accidentally omitted 
(see [https://github.com/apache/kafka/pull/16286] for one example).

Instead, we should default to automatically awaiting the complete startup of 
every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} is 
invoked, and require callers to opt out if they do not want to automatically 
wait for startup to complete when invoking that method.
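
A minimal sketch of the proposed default, assuming each worker can be polled for readiness. It is not the actual EmbeddedConnect change: the class name, the readiness probes, and the timeout values are hypothetical stand-ins, and only the behaviour (start() waits for every worker unless the caller opts out) comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for an embedded cluster; not the real EmbeddedConnect API.
public class EmbeddedClusterSketch {
    // One readiness probe per worker (assumption: readiness can be polled).
    private final List<BooleanSupplier> workers = new ArrayList<>();

    public void addWorker(BooleanSupplier readinessProbe) {
        workers.add(readinessProbe);
    }

    // Default behaviour: start and block until every worker reports ready.
    public void start() {
        start(true, 5_000L);
    }

    // Callers that do not want to wait must opt out explicitly.
    public void start(boolean awaitStartup, long timeoutMs) {
        // A real implementation would launch the workers here.
        if (!awaitStartup) {
            return;
        }
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!allWorkersReady()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException("Workers did not start within " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for startup", e);
            }
        }
    }

    // True only when every registered probe reports ready.
    public boolean allWorkersReady() {
        for (BooleanSupplier probe : workers) {
            if (!probe.getAsBoolean()) {
                return false;
            }
        }
        return true;
    }
}
```

Making the waiting variant the no-argument default means a test that forgets the second step still gets a fully started cluster, which is the failure mode the description calls out.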





[jira] [Resolved] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-9228.
--
Fix Version/s: 3.9.0
   Resolution: Fixed

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.
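
The effect described above can be reproduced with a simplified model. This is a sketch under assumptions, not the DistributedHerder code: the class, the two method names, and the sample connector (which copies only "file" and "topics" into its task configs) are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the "publish only if task configs differ" check.
public class TaskConfigDiffSketch {
    // Hypothetical connector: its task configs carry only connector-specific keys,
    // so converter and client override settings never appear in them.
    public static List<Map<String, String>> generateTaskConfigs(Map<String, String> connectorConfig) {
        Map<String, String> task = new HashMap<>();
        task.put("file", connectorConfig.get("file"));
        task.put("topics", connectorConfig.get("topics"));
        return Collections.singletonList(task);
    }

    // New task configs are written only when they differ from the stored ones.
    public static boolean shouldPublish(List<Map<String, String>> stored,
                                        List<Map<String, String>> generated) {
        return !stored.equals(generated);
    }
}
```

In this model, changing only `value.converter` in the connector config yields identical task configs, so nothing is published and running tasks never see the new converter.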





[jira] [Resolved] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16838.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> Hello,
> When creating a connector, we hit an error from one of our ConfigProviders 
> about a nonexistent resource, even though we had not set that resource as a 
> config value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
>  at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked as though a connector with the same name and the same config 
> already existed, +but it did not.+
> After investigating, we found that a few months earlier that cloud had hosted 
> a connector with the same name and a different config provider value. It was 
> later removed, but for some reason, when we tried to create a connector with 
> the same name months later, AbstractHerder tried to update the tasks of the 
> previous connector.
> As an example I used FileConfigProvider, but any ConfigProvider that can 
> raise an exception when something is wrong with the config (such as a 
> resource that doesn't exist) would do.
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745, which says that Connect 
> doesn't send tombstone messages for *commit* and *task* records in the Kafka 
> Connect config topic. As far as we remember, the config topic is `compact`, 
> *which means commit and task records are stored indefinitely* (months or 
> years after the connector is removed), while tombstones for connector 
> messages are cleaned up according to the {{delete.retention.ms}} property. 
> That affects later creation of connectors with the same name.
> We didn't investigate the cause in ConfigClusterStore or how to avoid the 
> issue, because we would {+}like to ask{+}: would it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages, as Connect does 
> for connector and target messages?
> In general, the test case looks like this:
>  # Create a connector with a config provider pointing to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the connector config and target state are removed)
>  # Try to create connector with the same name and config provider to 

[jira] [Resolved] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16837.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector's config if 
> the *previous* task config contains a ConfigProvider reference that no longer 
> resolves and therefore leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> any ConfigProvider that can raise an exception when something is wrong with 
> the config (such as a resource that doesn't exist) would do.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeds{*} with a 200 response.
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
>     "info": {
>       "name": "local-file-sink",
>       "config": {
>         "connector.class": "FileStreamSink",
>         "file": "/opt/kafka/test.sink.txt",
>         "tasks.max": "1",
>         "topics": "${file:/opt/kafka/provider2.properties:topics}",
>         "name": "local-file-sink"
>       },
>       "tasks": [
>         {
>           "connector": "local-file-sink",
>           "task": 0
>         }
>       ],
>       "type": "sink"
>     },
>     "status": {
>       "name": "local-file-sink",
>       "connector": {
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       },
>       "tasks": [
>         {
>           "id": 0,
>           "state": "FAILED",
>           "worker_id": "10.10.10.10:8083",
>           "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
>         }
>       ],
>       "type": "sink"
>     }
>   }
> }
> {code}
> The config has been updated, but no new task has been created, so the 
> connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
>  at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Resolved] (KAFKA-16844) ByteArrayConverter can't convert ByteBuffer

2024-05-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16844.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> ByteArrayConverter can't convert ByteBuffer
> ---
>
> Key: KAFKA-16844
> URL: https://issues.apache.org/jira/browse/KAFKA-16844
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Fan Yang
>Assignee: Fan Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> In the current Schema design, the schema type Bytes corresponds to two Java 
> classes, byte[] and ByteBuffer, but the current ByteArrayConverter can only 
> convert byte[]. My suggestion is to add ByteBuffer support to 
> ByteArrayConverter.
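
The kind of handling being requested can be sketched as follows. This is an illustration only, not the patch that resolved the issue: the class and method names are hypothetical, and the choice to read through a duplicate of the buffer (so the caller's position is left untouched) is an assumption of this sketch.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing one way to accept both representations of Bytes.
public class BytesValueSketch {
    public static byte[] toBytes(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof ByteBuffer) {
            // Read through a duplicate so the caller's position and limit are not modified.
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] out = new byte[view.remaining()];
            view.get(out);
            return out;
        }
        throw new IllegalArgumentException(
                "Expected byte[] or ByteBuffer but got " + value.getClass().getName());
    }
}
```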





[jira] [Resolved] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16603.
---
Resolution: Not A Bug

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing data loss when a Kafka source connector fails to send 
> data to the Kafka topic and the offsets topic.
> Kafka cluster and Kafka Connect details:
>  # Kafka Connect version (i.e. client): Confluent community version 7.3.1, 
> i.e. Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size: 3 brokers
>  # Number of partitions in all topics: 3
>  # Replication factor: 3
>  # Min ISR set to 2
>  # No transformations are used in the Kafka connector
>  # Default error tolerance is used, i.e. none
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resumes data processing from the persisted 
> checkpoint.
> The data loss was noticed when a broker was unresponsive for a few minutes 
> due to high load and the Kafka connector was restarted. In addition, the 
> connector's graceful shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
> Kafka producer with timeoutMillis = 0 ms.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
> force close the producer since pending requests could not be completed within 
> timeout 0 ms.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
> shutdown of Kafka producer I/O thread, sending remaining records.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
> incomplete batches due to forced shutdown
> Apr 22, 2024 @ 15:56:24.113 
> WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
> 

[jira] [Resolved] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16656.
---
Resolution: Not A Bug

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using MM2, when we tried using a 
> custom replication.policy.separator (e.g. "-") with DefaultReplicationPolicy, 
> we saw cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a custom ReplicationPolicy whenever we want to use a separator other 
> than "."?
> Regards, 
> Lenin





[jira] [Resolved] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-05-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16108.
---
Resolution: Done

> Backport fix for KAFKA-16093 to 3.7
> ---
>
> Key: KAFKA-16108
> URL: https://issues.apache.org/jira/browse/KAFKA-16108
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.7.1
>
>
> A fix for KAFKA-16093 is present on the branches trunk (the version for which 
> is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 
> release, and this issue is not a blocker, so it cannot be backported right 
> now.
> We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
> released.





[jira] [Resolved] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15018.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.8.0
>
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.
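
The failure mode described above can be reproduced with a small in-memory model. This is only a sketch: the two maps are hypothetical stand-ins for the per-connector and global offsets topics, and the class and method names are invented for illustration rather than taken from the Connect runtime.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetFallbackSketch {
    // Hypothetical stand-ins for the two offsets topics, keyed by source partition.
    private final Map<String, Long> perConnectorTopic = new HashMap<>();
    private final Map<String, Long> globalTopic = new HashMap<>();

    /** Commits an offset for a partition; a null offset models a tombstone. */
    public void commit(String partition, Long offset, boolean globalWriteSucceeds) {
        // The per-connector write is the one covered by the transaction.
        apply(perConnectorTopic, partition, offset);
        // The global write happens afterwards; a failure is only logged.
        if (globalWriteSucceeds) {
            apply(globalTopic, partition, offset);
        }
    }

    private static void apply(Map<String, Long> topic, String partition, Long offset) {
        if (offset == null) {
            topic.remove(partition);
        } else {
            topic.put(partition, offset);
        }
    }

    /** Reads with precedence for the per-connector topic and fallback to the global one. */
    public Long read(String partition) {
        Long offset = perConnectorTopic.get(partition);
        return offset != null ? offset : globalTopic.get(partition);
    }

    public static void main(String[] args) {
        OffsetFallbackSketch store = new OffsetFallbackSketch();
        store.commit("partition-0", 42L, true);    // normal commit reaches both topics
        store.commit("partition-0", null, false);  // tombstone reaches only the per-connector topic
        // The stale global offset resurfaces instead of "no offset found".
        System.out.println(store.read("partition-0"));
    }
}
```

In this model the read after the tombstone still returns the old offset, because the fallback cannot tell "never written" apart from "deleted" in the per-connector topic.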





[jira] [Resolved] (KAFKA-13329) Connect does not perform preflight validation for per-connector key and value converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13329.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector key and value 
> converters
> 
>
> Key: KAFKA-13329
> URL: https://issues.apache.org/jira/browse/KAFKA-13329
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a key and/or value converter class for their connector 
> directly in the configuration for that connector. If this occurs, no 
> preflight validation is performed to ensure that the specified converter is 
> valid.
> Unfortunately, the [Converter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java]
>  does not require converters to expose a {{ConfigDef}} (unlike the 
> [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  which does have that requirement), so it's unlikely that the configuration 
> properties of the converter itself can be validated.
> However, we can and should still validate that the converter class exists, 
> can be instantiated (i.e., has a public, no-args constructor and is a 
> concrete, non-abstract class), and implements the {{Converter}} interface.
> *EDIT:* Since this ticket was originally filed, a {{Converter::config}} 
> method was added in 
> [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions].
>  We can now utilize that config definition during preflight validation for 
> connectors.
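
The three class-level checks named in the ticket (the class exists, is concrete with a public no-args constructor, and implements the converter interface) can be sketched with plain reflection. The nested Converter interface below is a hypothetical stand-in for org.apache.kafka.connect.storage.Converter so that the example is self-contained; the error messages and method names are likewise invented and do not reflect what Connect actually reports.

```java
import java.lang.reflect.Modifier;
import java.util.Optional;

class ConverterPreflightSketch {
    /** Hypothetical stand-in for the real Converter interface. */
    public interface Converter { }

    public static class GoodConverter implements Converter {
        public GoodConverter() { }
    }

    public static class PrivateCtorConverter implements Converter {
        private PrivateCtorConverter() { }
    }

    public abstract static class AbstractConverter implements Converter { }

    /** Returns an error message, or an empty Optional if the class passes every check. */
    public static Optional<String> validate(String className) {
        Class<?> klass;
        try {
            klass = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return Optional.of("Class " + className + " could not be found");
        }
        if (!Converter.class.isAssignableFrom(klass)) {
            return Optional.of("Class " + className + " does not implement Converter");
        }
        if (klass.isInterface() || Modifier.isAbstract(klass.getModifiers())) {
            return Optional.of("Class " + className + " is abstract and cannot be instantiated");
        }
        try {
            // getConstructor() only finds public constructors.
            klass.getConstructor();
        } catch (NoSuchMethodException e) {
            return Optional.of("Class " + className + " has no public, no-args constructor");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(GoodConverter.class.getName()));
        System.out.println(validate(PrivateCtorConverter.class.getName()));
        System.out.println(validate("java.lang.String"));
    }
}
```

A real preflight check would also have to load the class through the appropriate plugin class loader, which this sketch does not attempt.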





[jira] [Resolved] (KAFKA-13328) Connect does not perform preflight validation for per-connector header converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13328.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector header 
> converters
> -
>
> Key: KAFKA-13328
> URL: https://issues.apache.org/jira/browse/KAFKA-13328
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a header converter class for their connector directly in 
> the configuration for that connector. If this occurs, no preflight validation 
> is performed to ensure that the specified converter is valid.
> {{HeaderConverter}} implementations are required to provide a valid 
> {{ConfigDef}} to the Connect framework via 
> [HeaderConverter::config|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  but this object isn't actually leveraged anywhere by Connect.
> Connect should make use of this config object during preflight validation for 
> connectors to fail faster when their header converters are misconfigured.





[jira] [Resolved] (KAFKA-16423) Ignoring offset partition key with an unexpected format for the first element in the partition key list. Expected type: java.lang.String, actual type: null"

2024-03-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16423.
---
Fix Version/s: (was: 3.6.2)
   (was: 3.8.0)
   (was: 3.7.1)
   Resolution: Duplicate

> Ignoring offset partition key with an unexpected format for the first element 
> in the partition key list. Expected type: java.lang.String, actual type: null"
> 
>
> Key: KAFKA-16423
> URL: https://issues.apache.org/jira/browse/KAFKA-16423
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.0, 3.6.0, 3.5.1, 3.5.2, 3.7.0, 3.6.1, 3.8.0
>Reporter: johndoe
>Priority: Minor
>






[jira] [Resolved] (KAFKA-16423) Ignoring offset partition key with an unexpected format for the first element in the partition key list. Expected type: java.lang.String, actual type: null"

2024-03-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16423.
---
Resolution: Duplicate

> Ignoring offset partition key with an unexpected format for the first element 
> in the partition key list. Expected type: java.lang.String, actual type: null"
> 
>
> Key: KAFKA-16423
> URL: https://issues.apache.org/jira/browse/KAFKA-16423
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.0, 3.6.0, 3.5.1, 3.5.2, 3.7.0, 3.6.1, 3.8.0
>Reporter: johndoe
>Assignee: johndoe
>Priority: Minor
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>






[jira] [Reopened] (KAFKA-16423) Ignoring offset partition key with an unexpected format for the first element in the partition key list. Expected type: java.lang.String, actual type: null"

2024-03-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-16423:
---
  Assignee: (was: johndoe)

> Ignoring offset partition key with an unexpected format for the first element 
> in the partition key list. Expected type: java.lang.String, actual type: null"
> 
>
> Key: KAFKA-16423
> URL: https://issues.apache.org/jira/browse/KAFKA-16423
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.0, 3.6.0, 3.5.1, 3.5.2, 3.7.0, 3.6.1, 3.8.0
>Reporter: johndoe
>Priority: Minor
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>






[jira] [Created] (KAFKA-16392) Spurious log warnings: "Ignoring offset partition key with an unexpected format for the second element in the partition key list. Expected type: java.util.Map, actual ty

2024-03-20 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16392:
-

 Summary: Spurious log warnings: "Ignoring offset partition key 
with an unexpected format for the second element in the partition key list. 
Expected type: java.util.Map, actual type: null"
 Key: KAFKA-16392
 URL: https://issues.apache.org/jira/browse/KAFKA-16392
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.6.1, 3.7.0, 3.5.2, 3.5.1, 3.6.0, 3.5.0, 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Some source connectors choose not to specify source offsets with the records 
they emit (or rather, to provide null partitions/offsets). When these 
partitions are parsed by a Kafka Connect worker, this currently leads to a 
spurious warning log message.





[jira] [Resolved] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2024-02-01 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15575.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate task configurations. It takes an argument that comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List<Map<String, String>> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> To enforce this constraint, we could begin dropping configs that exceed the 
> limit, and log a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.
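
The enforcement idea proposed in the ticket (drop the excess configs and log a warning) could look roughly like the sketch below. The class and method names are hypothetical, the warning goes to stderr instead of a real logger, and the behavior that was eventually merged may differ from this proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class TasksMaxSketch {
    /**
     * Returns at most maxTasks of the given task configs, warning when any are dropped.
     * Illustrates the ticket's proposal only; not the framework's actual code.
     */
    public static List<Map<String, String>> enforceMaxTasks(
            List<Map<String, String>> taskConfigs, int maxTasks) {
        if (taskConfigs.size() <= maxTasks) {
            return taskConfigs;
        }
        System.err.printf(
                "WARN: connector generated %d task configs, exceeding tasks.max=%d; dropping %d%n",
                taskConfigs.size(), maxTasks, taskConfigs.size() - maxTasks);
        // Keep the first maxTasks configs and discard the rest.
        return new ArrayList<>(taskConfigs.subList(0, maxTasks));
    }

    public static void main(String[] args) {
        List<Map<String, String>> generated = Arrays.asList(
                Collections.singletonMap("task.id", "0"),
                Collections.singletonMap("task.id", "1"),
                Collections.singletonMap("task.id", "2"));
        System.out.println(enforceMaxTasks(generated, 2).size());
    }
}
```

As the ticket notes, truncation is harmless for sink connectors but can silently drop work for source connectors that partition their work across task configs, which is why the warning matters.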





[jira] [Created] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-01-10 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16108:
-

 Summary: Backport fix for KAFKA-16093 to 3.7
 Key: KAFKA-16108
 URL: https://issues.apache.org/jira/browse/KAFKA-16108
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Chris Egerton
Assignee: Chris Egerton
 Fix For: 3.7.1


A fix for KAFKA-16093 is present on the branches trunk (the version for which 
is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 
release, and this issue is not a blocker, so it cannot be backported right now.

We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
released.





[jira] [Created] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces

2024-01-08 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16093:
-

 Summary: Spurious warnings logged to stderr about empty path 
annotations and providers not implementing provider interfaces
 Key: KAFKA-16093
 URL: https://issues.apache.org/jira/browse/KAFKA-16093
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Affects Versions: 3.7.0, 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Some warnings get logged to stderr on Connect startup. For example:
{quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
SERVER runtime does not implement any provider interfaces applicable in the 
SERVER runtime. Due to constraint configuration problems the provider 
org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
in SERVER runtime does not implement any provider interfaces applicable in the 
SERVER runtime. Due to constraint configuration problems the provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource 
registered in SERVER runtime does not implement any provider interfaces 
applicable in the SERVER runtime. Due to constraint configuration problems the 
provider 
org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will be 
ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
registered in SERVER runtime does not implement any provider interfaces 
applicable in the SERVER runtime. Due to constraint configuration problems the 
provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
be ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
SERVER runtime does not implement any provider interfaces applicable in the 
SERVER runtime. Due to constraint configuration problems the provider 
org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
ignored. 

Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors

WARNING: The following warnings have been detected: WARNING: The (sub)resource 
method listLoggers in 
org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty 
path annotation.

WARNING: The (sub)resource method listConnectors in 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
empty path annotation.

WARNING: The (sub)resource method createConnector in 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
empty path annotation.

WARNING: The (sub)resource method listConnectorPlugins in 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
contains empty path annotation.

WARNING: The (sub)resource method serverInfo in 
org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
path annotation.
{quote}
These are benign, but can confuse and even frighten new users.





[jira] [Resolved] (KAFKA-15563) Provide informative error messages when Connect REST requests time out

2023-12-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15563.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Provide informative error messages when Connect REST requests time out
> --
>
> Key: KAFKA-15563
> URL: https://issues.apache.org/jira/browse/KAFKA-15563
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.7.0
>
>
> The Kafka Connect REST API has a hardcoded timeout of 90 seconds. If any 
> operations take longer than that, a 500 error response is returned with the 
> message "Request timed out" (see 
> [here|https://github.com/apache/kafka/blob/7e1c453af9533aba8c19da2d08ce6595c1441fc0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java#L70]).
> This can be a source of frustration for users, who want to understand what is 
> causing the request to time out. This can be specific to the request (for 
> example, a connector's [custom multi-property validation 
> logic|https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/connector/Connector.html#validate(java.util.Map)]
>  is taking too long), or applicable to any request that goes through the 
> herder's tick thread (for which there are a variety of possible causes).
> We can give users better, immediate insight into what is causing requests to 
> time out by including information about the last possibly-blocking operation 
> the worker performed while servicing the request (or attempting to enter a 
> state where all preconditions necessary to service the request have been 
> satisfied), and when the worker began that operation.
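
One way to picture the proposal is a small tracker that records the last possibly-blocking operation and its start time, and folds both into the timeout message. Everything in this sketch is hypothetical (the class names, the message wording, and the explicit clock parameter used to keep it deterministic); it is not the code that was merged for this ticket.

```java
import java.util.concurrent.atomic.AtomicReference;

class RequestStageSketch {
    /** Hypothetical record of a possibly-blocking operation and when it began. */
    static final class Stage {
        final String description;
        final long startedMs;

        Stage(String description, long startedMs) {
            this.description = description;
            this.startedMs = startedMs;
        }
    }

    // Written by the thread servicing the request, read by the thread that reports the timeout.
    private final AtomicReference<Stage> current = new AtomicReference<>();

    /** Marks the start of a possibly-blocking operation. */
    public void enter(String description, long nowMs) {
        current.set(new Stage(description, nowMs));
    }

    /** Marks the end of the current operation. */
    public void exit() {
        current.set(null);
    }

    /** Builds the timeout message, including the in-progress operation if there is one. */
    public String timeoutMessage(long nowMs) {
        Stage stage = current.get();
        if (stage == null) {
            return "Request timed out";
        }
        return "Request timed out. The worker is currently " + stage.description
                + ", which began " + (nowMs - stage.startedMs) + " ms ago";
    }

    public static void main(String[] args) {
        RequestStageSketch tracker = new RequestStageSketch();
        tracker.enter("validating the connector configuration", 1_000L);
        System.out.println(tracker.timeoutMessage(91_000L));
    }
}
```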





[jira] [Created] (KAFKA-15988) Kafka Connect OffsetsApiIntegrationTest takes too long

2023-12-07 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15988:
-

 Summary: Kafka Connect OffsetsApiIntegrationTest takes too long
 Key: KAFKA-15988
 URL: https://issues.apache.org/jira/browse/KAFKA-15988
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


The [OffsetsApiIntegrationTest 
suite|https://github.com/apache/kafka/blob/c515bf51f820f26ff6be6b0fde03b47b69a10b00/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java]
 currently contains 27 test cases. Each test case begins by creating embedded 
Kafka and Kafka Connect clusters, which is fairly resource-intensive and 
time-consuming.

If possible, we should reuse those embedded clusters across test cases.
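
A minimal sketch of the reuse idea, assuming a lazily created cluster shared by all test cases in a JVM. EmbeddedCluster here is a hypothetical placeholder that only counts startups; it is not the embedded Kafka or Connect cluster class used by the real test suite.

```java
class SharedClusterSketch {
    /** Hypothetical placeholder for an expensive-to-start embedded cluster. */
    static final class EmbeddedCluster {
        private static int startups = 0;

        EmbeddedCluster() {
            startups++; // stands in for the costly startup work
        }
    }

    private static EmbeddedCluster shared;

    /** Returns the shared cluster, starting it only on first use. */
    public static synchronized EmbeddedCluster getOrStart() {
        if (shared == null) {
            shared = new EmbeddedCluster();
        }
        return shared;
    }

    /** Number of times a cluster has actually been started. */
    public static synchronized int startups() {
        return EmbeddedCluster.startups;
    }

    public static void main(String[] args) {
        // Two "test cases" asking for a cluster trigger a single startup.
        EmbeddedCluster first = getOrStart();
        EmbeddedCluster second = getOrStart();
        System.out.println(first == second);
    }
}
```

Sharing a cluster trades isolation for speed, so each test case would need to clean up the connectors and topics it creates.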





[jira] [Created] (KAFKA-15821) Active topics for deleted connectors are not reset in standalone mode

2023-11-13 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15821:
-

 Summary: Active topics for deleted connectors are not reset in 
standalone mode
 Key: KAFKA-15821
 URL: https://issues.apache.org/jira/browse/KAFKA-15821
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 
3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 
2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.5.0, 
3.7.0
Reporter: Chris Egerton


In 
[KIP-558|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect],
 a new REST endpoint was added to report the set of active topics for a 
connector. The KIP specified that "Deleting a connector will reset this 
connector's set of active topics", and this logic was successfully implemented 
in distributed mode. However, in standalone mode, active topics for deleted 
connectors are not deleted, and if a connector is re-created, it will inherit 
the active topics of its predecessor(s) unless they were manually reset.





[jira] [Resolved] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-11-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15680.
---
Fix Version/s: 3.7.0
   (was: 3.6.1)
   Resolution: Fixed

> Partition-Count is not getting updated Correctly in the Incremental 
> Co-operative Rebalancing(ICR) Mode of Rebalancing
> -
>
> Key: KAFKA-15680
> URL: https://issues.apache.org/jira/browse/KAFKA-15680
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Assignee: Pritam Kumar
>Priority: Minor
> Fix For: 3.7.0
>
>
> * In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, 
> say Worker3, joins, a new global assignment is computed by the leader, say 
> Worker1, which results in the revocation of some tasks from each existing 
> worker, i.e., Worker1 and Worker2.
>  * Once the new member's join is completed, the 
> *ConsumerCoordinator.onJoinComplete()* method is called, which primarily 
> computes the newly assigned partitions and the revoked partitions 
> and updates the subscription object.
>  * If there was a revocation, which we detect by checking the 
> “partitionsRevoked” list, we call the method “{*}invokePartitionsRevoked{*}()”, 
> which internally calls “updatePartitionCount()”, which fetches partitions from 
> the *assignment* object, which has not yet been updated with the new assignment.
>  * It is only just before calling the “{*}invokePartitionsAssigned{*}()” 
> method that we update the *assignment* by invoking the following → 
> *subscriptions.assignFromSubscribed(assignedPartitions);*





[jira] [Resolved] (KAFKA-15787) Investigate new test case failure - testReplicateSourceDefault - MirrorConnectorsIntegrationBaseTest

2023-11-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15787.
---
Resolution: Duplicate

> Investigate new test case failure - testReplicateSourceDefault - 
> MirrorConnectorsIntegrationBaseTest
> 
>
> Key: KAFKA-15787
> URL: https://issues.apache.org/jira/browse/KAFKA-15787
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures, 
> although they are not related to the PR. This Jira tracks these failures so 
> that we can investigate whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests:
> * Build / JDK 17 and Scala 2.13 / testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft – kafka.api.ProducerIdExpirationTest (8s)
> * Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest (1m 20s)
> * Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest (2m 15s)
> * Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest (1m 51s)
> * Build / JDK 11 and Scala 2.13 / testDeleteCmdNonExistingGroup(String).quorum=kraft – kafka.admin.DeleteConsumerGroupsTest (11s)
> * Build / JDK 11 and Scala 2.13 / testTimeouts() – org.apache.kafka.controller.QuorumControllerTest (<1s)
> * Build / JDK 11 and Scala 2.13 / testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest





[jira] [Resolved] (KAFKA-15761) ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky

2023-10-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15761.
---
Resolution: Duplicate

> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is 
> flaky
> ---
>
> Key: KAFKA-15761
> URL: https://issues.apache.org/jira/browse/KAFKA-15761
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Calvin Liu
>Priority: Major
>
> Build / JDK 21 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – 
> org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 12ms
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.assertTrue(Assert.java:42)
>     at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.runningConnectorAndTasksRestart(ConnectorRestartApiIntegrationTest.java:273)
>     at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector(ConnectorRestartApiIntegrationTest.java:231)
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  {code}





[jira] [Created] (KAFKA-15693) Disabling scheduled rebalance delay in Connect can lead to indefinitely unassigned connectors and tasks

2023-10-26 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15693:
-

 Summary: Disabling scheduled rebalance delay in Connect can lead 
to indefinitely unassigned connectors and tasks
 Key: KAFKA-15693
 URL: https://issues.apache.org/jira/browse/KAFKA-15693
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 
3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 
2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.4.1, 
2.5.0, 2.3.1, 2.4.0, 2.3.0, 3.7.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Kafka Connect supports deferred resolution of imbalances when using the 
incremental rebalancing algorithm introduced in 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
 When enabled, this feature introduces a configurable delay period between when 
"lost" assignments (i.e., connectors and tasks that were assigned to a worker 
in the previous round of rebalance but are not assigned to a worker during the 
current round of rebalance) are detected and when they are reassigned to a 
worker. The delay can be configured with the 
{{scheduled.rebalance.max.delay.ms}} property.

If this property is set to 0, then there should be no delay between when lost 
assignments are detected and when they are reassigned. Instead, however, this 
configuration can cause lost assignments to be withheld during a rebalance, 
remaining unassigned until the next rebalance, which, because scheduled delays 
are disabled, will not happen on its own and will only take place when 
unrelated conditions warrant it (such as the creation or deletion of a 
connector, a worker joining or leaving the cluster, new task configs being 
generated for a connector, etc.).





[jira] [Created] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when Kafka cluster bounces

2023-10-24 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15676:
-

 Summary: Scheduled rebalance delay for Connect is unnecessarily 
triggered when Kafka cluster bounces
 Key: KAFKA-15676
 URL: https://issues.apache.org/jira/browse/KAFKA-15676
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks. For more 
context, see KAFKA-9184.

 

However, this change in state is not relayed to the worker's instance of the 
[IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
 This has the consequence that, if the group coordinator for a Connect cluster 
is unavailable for long enough, all of the workers in the cluster will revoke their 
assigned connectors and tasks, report that they have been assigned no 
connectors and tasks during the next rebalance, and spuriously trigger the 
scheduled rebalance delay (since the leader will assume that all workers should 
still be running the connectors and tasks that it assigned during the last 
rebalance).





[jira] [Resolved] (KAFKA-15428) Cluster-wide dynamic log adjustments for Connect

2023-10-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15428.
---
Fix Version/s: 3.7.0
   Resolution: Done

> Cluster-wide dynamic log adjustments for Connect
> 
>
> Key: KAFKA-15428
> URL: https://issues.apache.org/jira/browse/KAFKA-15428
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> [KIP-495|https://cwiki.apache.org/confluence/display/KAFKA/KIP-495%3A+Dynamically+Adjust+Log+Levels+in+Connect]
>  added REST APIs to view and adjust the logging levels of Kafka Connect 
> workers at runtime. This has been tremendously valuable (thank you 
> [~wicknicks]!), but one frequently-observed area for improvement is that the 
> API requires a REST request to be issued to each to-be-adjusted worker.
> If possible, we should add support for adjusting the logging level of all 
> workers in a cluster with a single REST request.





[jira] [Resolved] (KAFKA-15570) Add unit tests for MemoryConfigBackingStore

2023-10-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15570.
---
Resolution: Done

> Add unit tests for MemoryConfigBackingStore
> ---
>
> Key: KAFKA-15570
> URL: https://issues.apache.org/jira/browse/KAFKA-15570
> Project: Kafka
>  Issue Type: Test
>  Components: connect, KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> Currently, the 
> [MemoryConfigBackingStore|https://github.com/apache/kafka/blob/6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java#L37]
>  class doesn't have any unit tests for its functionality. While most of its 
> functionality is fairly lightweight today, changes will be introduced with 
> [KIP-980|https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state]
>  (potentially 
> [KIP-976|https://cwiki.apache.org/confluence/display/KAFKA/KIP-976%3A+Cluster-wide+dynamic+log+adjustment+for+Kafka+Connect]
>  as well) and it would be good to have a test setup in place before those 
> changes are made.





[jira] [Reopened] (KAFKA-15570) Add unit tests for MemoryConfigBackingStore

2023-10-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-15570:
---

> Add unit tests for MemoryConfigBackingStore
> ---
>
> Key: KAFKA-15570
> URL: https://issues.apache.org/jira/browse/KAFKA-15570
> Project: Kafka
>  Issue Type: Test
>  Components: connect, KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> Currently, the 
> [MemoryConfigBackingStore|https://github.com/apache/kafka/blob/6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java#L37]
>  class doesn't have any unit tests for its functionality. While most of its 
> functionality is fairly lightweight today, changes will be introduced with 
> [KIP-980|https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state]
>  (potentially 
> [KIP-976|https://cwiki.apache.org/confluence/display/KAFKA/KIP-976%3A+Cluster-wide+dynamic+log+adjustment+for+Kafka+Connect]
>  as well) and it would be good to have a test setup in place before those 
> changes are made.





[jira] [Resolved] (KAFKA-15249) Verify Connect test-plugins artifact is published to Maven Central

2023-10-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15249.
---
Resolution: Done

> Verify Connect test-plugins artifact is published to Maven Central
> --
>
> Key: KAFKA-15249
> URL: https://issues.apache.org/jira/browse/KAFKA-15249
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>
> In KAFKA-14759 we created a separate {{connect/test-plugins}} module to store 
> all testing-only Connect plugins and removed those plugins from existing 
> Connect modules.
> These testing-only plugins are intentionally excluded from the project's 
> release file (which can be generated with {{./gradlew releaseTarGz}}); 
> however, some users may still be relying on them for testing environments.
> Although we should refrain from distributing these testing-only plugins with 
> our out-of-the-box distribution of Connect, we should still ensure that 
> they're available on an opt-in basis to users who would like to continue 
> using them. This can be accomplished by publishing them to [Maven 
> Central|https://search.maven.org/], like we do with our other modules.
> This will probably happen automatically during the next release (3.6.0) with 
> no further action required. This ticket is just here as a reminder to verify 
> that the artifacts are present in the staging Maven repo when release 
> candidates are published for voting.





[jira] [Created] (KAFKA-15563) Provide informative error messages when Connect REST requests time out

2023-10-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15563:
-

 Summary: Provide informative error messages when Connect REST 
requests time out
 Key: KAFKA-15563
 URL: https://issues.apache.org/jira/browse/KAFKA-15563
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


The Kafka Connect REST API has a hardcoded timeout of 90 seconds. If any 
operations take longer than that, a 500 error response is returned with the 
message "Request timed out" (see 
[here|https://github.com/apache/kafka/blob/7e1c453af9533aba8c19da2d08ce6595c1441fc0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java#L70]).

This can be a source of frustration for users, who want to understand what is 
causing the request to time out. This can be specific to the request (for 
example, a connector's [custom multi-property validation 
logic|https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/connector/Connector.html#validate(java.util.Map)]
 is taking too long), or applicable to any request that goes through the 
herder's tick thread (for which there are a variety of possible causes).

We can give users better, immediate insight into what is causing requests to 
time out by including information about the last possibly-blocking operation 
the worker performed while servicing the request (or attempting to enter a 
state where all preconditions necessary to service the request have been 
satisfied), and when the worker began that operation.
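
One way to picture the proposal is a small tracker that the worker updates before each possibly-blocking operation and that the REST layer consults when a request times out. The sketch below is only an illustration under that assumption: the class name StageTracker, its methods, and the message wording are hypothetical and are not taken from the Connect codebase.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not a Kafka Connect class): remembers the most recent
// possibly-blocking operation so that a timeout message can mention it.
class StageTracker {
    // Immutable snapshot of one operation, so readers never see a half-updated pair.
    private static final class Stage {
        final String description;
        final Instant startedAt;

        Stage(String description, Instant startedAt) {
            this.description = description;
            this.startedAt = startedAt;
        }
    }

    private volatile Stage current;

    // Record the operation the worker is about to perform and when it began.
    void begin(String description, Instant now) {
        current = new Stage(description, now);
    }

    // Clear the record once the operation has finished.
    void complete() {
        current = null;
    }

    // Build the error message for a request that timed out at the given instant.
    String timeoutMessage(Instant now) {
        Stage stage = current;
        if (stage == null) {
            return "Request timed out";
        }
        long elapsedMs = Duration.between(stage.startedAt, now).toMillis();
        return "Request timed out. The worker is currently " + stage.description
                + ", which began " + elapsedMs + " ms ago";
    }
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; a real implementation would more likely read the time from whatever clock abstraction the worker already uses.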





[jira] [Resolved] (KAFKA-15473) Connect connector-plugins endpoint shows duplicate plugins

2023-09-19 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15473.
---
Fix Version/s: 3.7.0
   (was: 3.6.0)
   Resolution: Fixed

> Connect connector-plugins endpoint shows duplicate plugins
> --
>
> Key: KAFKA-15473
> URL: https://issues.apache.org/jira/browse/KAFKA-15473
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.7.0
>
>
> In versions before 3.6.0-rc0, duplicates of a plugin would be shown if it 
> implemented multiple interfaces. For example:
> {noformat}
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },{noformat}
> In 3.6.0-rc0, there are many more listings for the same plugin. For example:
> {noformat}
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter",
>     "version": "3.6.0"
>   },{noformat}
> These duplicates appear to happen when a plugin with the same class name 
> appears in multiple locations/classloaders.
> When interpreting a connector configuration, only one of these plugins will 
> be chosen, so only one is relevant to show to users. The REST API should only 
> display the plugins which are eligible to be loaded, and hide the duplicates.
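
The filtering asked for above amounts to keeping one listing per plugin class and type. The following is a minimal sketch of that idea, not the fix that was merged: PluginListing and Entry are hypothetical names, and the rule that the first entry wins assumes the input is already ordered by loading priority, which the ticket does not state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: collapse duplicate plugin listings before returning them.
class PluginListing {
    // Stand-in for one element of the REST response shown in the ticket.
    static final class Entry {
        final String className;
        final String type;
        final String version; // may be null, as in the listings without a version

        Entry(String className, String type, String version) {
            this.className = className;
            this.type = type;
            this.version = version;
        }
    }

    // Keep one entry per (type, class) pair, preserving the input order.
    // Assumption: the first entry seen is the one that would actually be loaded.
    static List<Entry> dedupe(List<Entry> entries) {
        Map<String, Entry> byKey = new LinkedHashMap<>();
        for (Entry entry : entries) {
            byKey.putIfAbsent(entry.type + ":" + entry.className, entry);
        }
        return new ArrayList<>(byKey.values());
    }
}
```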





[jira] [Resolved] (KAFKA-15416) Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound

2023-09-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15416.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
> --
>
> Key: KAFKA-15416
> URL: https://issues.apache.org/jira/browse/KAFKA-15416
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.7.0
>
>
> This test fails frequently when I run unit tests locally, but I've never seen 
> it fail during a CI build.
> Failure message:
> {quote}    org.apache.kafka.connect.errors.ConnectException: Failed to list offsets for topic partitions.
>         at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:777)
>         at app//org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>  
>         Caused by:
>         org.apache.kafka.connect.errors.ConnectException: Fail to list offsets for topic partitions after 1 attempts.  Reason: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
>             at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:106)
>             at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>             at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>             ... 1 more
>  
>             Caused by:
>             org.apache.kafka.common.errors.TimeoutException: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
>  
>                 Caused by:
>                 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
>                     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>                     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>                     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>                     at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:716)
>                     at org.apache.kafka.connect.util.TopicAdmin.lambda$retryEndOffsets$7(TopicAdmin.java:769)
>                     at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:87)
>                     at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>                     at org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>                     at org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>  
>                     Caused by:
>                     org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
> {quote}





[jira] [Created] (KAFKA-15436) Custom ConfigDef validators are invoked with null when user-provided value does not match type

2023-09-05 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15436:
-

 Summary: Custom ConfigDef validators are invoked with null when 
user-provided value does not match type
 Key: KAFKA-15436
 URL: https://issues.apache.org/jira/browse/KAFKA-15436
 Project: Kafka
  Issue Type: Bug
Reporter: Chris Egerton


Filed in response to [discussion on a tangentially-related 
PR|https://github.com/apache/kafka/pull/14304#discussion_r1310039190].
h3. Background

The [ConfigDef.Validator 
interface|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Validator.html]
 can be used to add custom per-property validation logic to a 
[ConfigDef|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html]
 instance. This can serve many uses, including but not limited to:
 * Ensuring that the value for a string property matches the name of a Java 
enum type
 * Ensuring that the value for an integer property falls within the range of 
valid port numbers
 * Ensuring that the value for a class property has a public, no-args 
constructor and/or implements a certain interface

This validation logic can be invoked directly via 
[ConfigDef::validate|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html#validate(java.util.Map)]
 or 
[ConfigDef::validateAll|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html#validateAll(java.util.Map)],
 or indirectly when instantiating an 
[AbstractConfig|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/AbstractConfig.html].

When a value is validated by a {{ConfigDef}} instance, the {{ConfigDef}} first 
verifies that the value adheres to the expected type. For example, if the "raw" 
value is the string {{"345"}} and the property is defined with the [INT 
type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#INT],
 then the value is valid (it is parsed as the integer {{{}345{}}}). However, if 
the same raw value is used for a property defined with the [BOOLEAN 
type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#BOOLEAN],
 then the value is invalid (it cannot be parsed as a boolean).
h3. Problem

When a raw value is invalid for the type of the property it is used for (e.g., 
{{"345"}} is used for a property defined with the [BOOLEAN 
type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#BOOLEAN]),
 custom validators for the property are still invoked, with a value of 
{{{}null{}}}.

This can lead to some counterintuitive behavior, and may necessitate that 
implementers of the {{ConfigDef.Validator}} interface catch cases where the 
value is {{null}} and choose not to report any errors (with the assumption that 
an error will already be reported by the {{ConfigDef}} regarding its failure to 
parse the raw value with the expected type).
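
As an illustration of the defensive pattern described above, here is a minimal sketch of a validator that tolerates null. To keep it compilable without the Kafka client library, it declares a stand-in interface with the same method shape as ConfigDef.Validator and throws IllegalArgumentException where a real validator would throw ConfigException. The names PropertyValidator and PortValidator are hypothetical.

```java
// Stand-in with the same method shape as Kafka's ConfigDef.Validator, declared
// here only so the sketch compiles on its own.
interface PropertyValidator {
    void ensureValid(String name, Object value);
}

// Hypothetical validator for an integer property that must be a valid port number.
class PortValidator implements PropertyValidator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            // A null may mean the raw value could not be parsed as the declared type.
            // Assumption: that parse failure is already reported elsewhere, so no
            // second error is added here.
            return;
        }
        if (!(value instanceof Integer)) {
            throw new IllegalArgumentException(name + " must be an integer");
        }
        int port = (Integer) value;
        if (port < 1 || port > 65535) {
            // A real validator would throw ConfigException instead.
            throw new IllegalArgumentException(
                    name + " must be between 1 and 65535, but was " + port);
        }
    }
}
```

The drawback of this pattern is the one the ticket points out: the validator cannot distinguish a value that failed to parse from a property that was genuinely left unset.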

We may consider skipping custom validation altogether when the raw value for a 
property cannot be parsed with the expected type. On the other hand, it's 
unclear if there are compatibility concerns about this kind of change.

If we decide to change this behavior, we should try to assess which code paths 
may lead to custom validators being invoked, which use cases correspond to 
which of these code paths, and whether this behavioral change has a chance to 
negatively impact these use cases.





[jira] [Created] (KAFKA-15428) Cluster-wide dynamic log adjustments for Connect

2023-09-01 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15428:
-

 Summary: Cluster-wide dynamic log adjustments for Connect
 Key: KAFKA-15428
 URL: https://issues.apache.org/jira/browse/KAFKA-15428
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


[KIP-495|https://cwiki.apache.org/confluence/display/KAFKA/KIP-495%3A+Dynamically+Adjust+Log+Levels+in+Connect]
 added REST APIs to view and adjust the logging levels of Kafka Connect workers 
at runtime. This has been tremendously valuable (thank you [~wicknicks]!), but 
one frequently-observed area for improvement is that the API requires a REST 
request to be issued to each to-be-adjusted worker.

If possible, we should add support for adjusting the logging level of all 
workers in a cluster with a single REST request.





[jira] [Resolved] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2023-08-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-12879.
---
Resolution: Fixed

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Blocker
> Fix For: 2.5.2, 2.7.3, 2.6.4, 3.0.2, 3.1.1, 3.2.0, 2.8.2
>
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries are 
> desirable.
> Furthermore, in addition to the intended effect on {{listOffsets()}}, it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.





[jira] [Created] (KAFKA-15425) Compatibility break in Admin.listOffsets() (2)

2023-08-31 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15425:
-

 Summary: Compatibility break in Admin.listOffsets() (2)
 Key: KAFKA-15425
 URL: https://issues.apache.org/jira/browse/KAFKA-15425
 Project: Kafka
  Issue Type: Test
  Components: admin
Affects Versions: 3.6.0
Reporter: Chris Egerton
Assignee: Chris Egerton
 Fix For: 3.6.0


The behavioral change that warrants this ticket is identical to the change 
noted in KAFKA-12879, but has a different root cause (KAFKA-14821 instead of 
KAFKA-12339).

In both this ticket and KAFKA-12879, the issue is that calls to 
{{Admin::listOffsets}} will now retry on the [UNKNOWN_TOPIC_OR_PARTITION 
error|https://github.com/apache/kafka/blob/16dc983ad67767ee8debd125a3f8b150a91c7acf/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L165-L166]
 (and possibly eventually throw a {{{}TimeoutException{}}}), whereas before, 
they would fail immediately and throw an 
{{{}UnknownTopicOrPartitionException{}}}.
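
Callers that need to tell the two outcomes apart can walk the cause chain of whatever exception they receive, in the spirit of the workaround mentioned in KAFKA-12879. The helper below is a generic sketch that uses only JDK types; the class name Causes and the depth limit are hypothetical choices, and whether a given Kafka exception actually carries the expected cause depends on the client version and configuration.

```java
// Hypothetical utility: search an exception's cause chain for a given type.
class Causes {
    // Upper bound on how far to walk, as a guard against cyclic cause chains.
    private static final int MAX_DEPTH = 32;

    // Return the first throwable in the chain (including the argument itself)
    // that is an instance of the requested type, or null if there is none.
    static <T extends Throwable> T findCause(Throwable error, Class<T> type) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (type.isInstance(current)) {
                return type.cast(current);
            }
            current = current.getCause();
        }
        return null;
    }
}
```

With the Kafka client on the classpath, a caller might pass UnknownTopicOrPartitionException.class as the type and treat a non-null result the same way the old fail-fast behavior was treated.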





[jira] [Reopened] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2023-08-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-12879:
---
  Assignee: (was: Philip Nee)

Reopening due to https://github.com/apache/kafka/pull/13432

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Priority: Major
> Fix For: 2.5.2, 2.8.2, 3.2.0, 3.1.1, 3.0.2, 2.7.3, 2.6.4
>
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries are 
> desirable.
> Furthermore, in addition to the intended effect on {{listOffsets()}}, it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.





[jira] [Resolved] (KAFKA-13327) Preflight validations of connectors leads to 500 responses

2023-08-29 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13327.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Preflight validations of connectors leads to 500 responses
> --
>
> Key: KAFKA-13327
> URL: https://issues.apache.org/jira/browse/KAFKA-13327
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.7.0
>
>
> The Connect framework performs some preflight validations for all connectors 
> that are created in addition to allowing connectors to define their own 
> custom validation logic by providing a {{ConfigDef}} object in 
> [Connector::config|https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/connector/Connector.html#config()]
>  and performing multi-property validation in 
> [Connector::validate|https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/connector/Connector.html#validate(java.util.Map)].
> When performed correctly, this validation information is surfaced to the user 
> in the form of a 
> [ConfigInfos|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java]
>  object containing a list of [config 
> objects|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java#L42-L45]
>  whose 
> [values|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java#L42-L45]
>  contain one or more [error 
> messages|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java#L61-L64].
>  This can be used as the response for a REST request to PUT 
> /connector-plugins/\{connectorType}/config/validate and allows programmatic 
> UIs to render error messages for every invalid property to the user.
> However, some validations performed by the Connect framework do not follow 
> this pattern and instead result in a 500 response being returned to the user. 
> For example, logic specific to sink connectors (see 
> [AbstractHerder|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L436]
>  and 
> [SinkConnectorConfig|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L88-L125])
>  simply throws an exception instead of documenting the error with the 
> offending property and returning it in a standard response.
>  
> We should correct this logic wherever possible so that configurations that 
> are not fatally invalid (i.e., may have invalid properties but can still be 
> translated into a meaningful {{ConfigInfos}} response object) do not cause a 
> 500 response to be returned to the user.
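
The difference between the two styles can be sketched with the sink connector topic settings. The check below records an error against each offending property instead of throwing, so a caller could fold the result into a per-property response. It is an illustration only: SinkTopicsCheck is a hypothetical class, the error wording is invented, and the real logic lives in the AbstractHerder and SinkConnectorConfig classes linked above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: report sink topic misconfiguration per property
// rather than failing the whole validation request with an exception.
class SinkTopicsCheck {
    // Returns a map from property name to the error messages recorded for it.
    // An empty map means this particular check found nothing wrong.
    static Map<String, List<String>> validate(Map<String, String> props) {
        boolean hasTopics = isSet(props.get("topics"));
        boolean hasRegex = isSet(props.get("topics.regex"));
        Map<String, List<String>> errors = new LinkedHashMap<>();
        if (hasTopics && hasRegex) {
            String message = "Only one of 'topics' and 'topics.regex' may be set";
            addError(errors, "topics", message);
            addError(errors, "topics.regex", message);
        } else if (!hasTopics && !hasRegex) {
            String message = "One of 'topics' or 'topics.regex' must be set";
            addError(errors, "topics", message);
            addError(errors, "topics.regex", message);
        }
        return errors;
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    private static void addError(Map<String, List<String>> errors, String property, String message) {
        errors.computeIfAbsent(property, key -> new ArrayList<>()).add(message);
    }
}
```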





[jira] [Created] (KAFKA-15416) Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound

2023-08-29 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15416:
-

 Summary: Flaky test 
TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
 Key: KAFKA-15416
 URL: https://issues.apache.org/jira/browse/KAFKA-15416
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Chris Egerton


This test fails frequently when I run unit tests locally, but I've never seen 
it fail during a CI build.

Failure message:
{quote}    org.apache.kafka.connect.errors.ConnectException: Failed to list offsets for topic partitions.
        at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:777)
        at app//org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)

        Caused by:
        org.apache.kafka.connect.errors.ConnectException: Fail to list offsets for topic partitions after 1 attempts.  Reason: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
            at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:106)
            at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
            at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
            ... 1 more

            Caused by:
            org.apache.kafka.common.errors.TimeoutException: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}

                Caused by:
                java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
                    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
                    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
                    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
                    at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:716)
                    at org.apache.kafka.connect.util.TopicAdmin.lambda$retryEndOffsets$7(TopicAdmin.java:769)
                    at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:87)
                    at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
                    at org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
                    at org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)

                    Caused by:
                    org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
{quote}





[jira] [Created] (KAFKA-15342) Considering upgrading to Mockito 5.4.1 or later

2023-08-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15342:
-

 Summary: Considering upgrading to Mockito 5.4.1 or later
 Key: KAFKA-15342
 URL: https://issues.apache.org/jira/browse/KAFKA-15342
 Project: Kafka
  Issue Type: Task
  Components: unit tests
Reporter: Chris Egerton
Assignee: Chris Egerton
 Fix For: 4.0.0


We're currently stuck on Mockito 4.x.y because the 5.x.y line requires Java 11 
and, until we begin to work on Kafka 4.0.0, we continue to support Java 8.

Either directly before or directly after releasing Kafka 4.0.0, we should try to 
upgrade to a version of Mockito on the 5.x.y line.

If we're able to use a version that includes 
https://github.com/mockito/mockito/pull/3078
 (which should be included in either a 5.4.1 or 5.5.0 release), we should also 
revert the change made for https://issues.apache.org/jira/browse/KAFKA-14682, 
which is just a temporary workaround.





[jira] [Resolved] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-08-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14682.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.6.0
>
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the 
> latter instead of the former, which can cause tests to fail due to 
> unnecessary stubbings when being run in that IDE but not when being built on 
> Jenkins.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.
>  
> This exact behavior has been reported elsewhere as a [Gradle 
> issue|https://github.com/gradle/gradle/issues/10694]; based on [discussion on 
> that 
> thread|https://github.com/gradle/gradle/issues/10694#issuecomment-1374911274],
>  it appears this is a known and somewhat-intentional limitation of Mockito:
> {quote}I spent some time trying to solve this and eventually I stumbled upon 
> this piece in Mockito's JUnit runner:
> [https://github.com/mockito/mockito/blob/main/src/main/java/org/mockito/internal/runners/StrictRunner.java#L47-L53]
> // only report when:
> // 1. if all tests from given test have ran (filter requested is false)
> // Otherwise we would report unnecessary stubs even if the user runs just 
> single test
> // from the class
> // 2. tests are successful (we don't want to add an extra failure on top of 
> any existing
> // failure, to avoid confusion)
>  
> (1) suggests that skipping unused stub validation is the intended behavior 
> when the user filters a single test from the class. However, this behavior 
> applies to any type of filter.
> And Gradle indeed applies a {{CategoryFilter}} if categories are configured: 
> [https://github.com/rieske/gradle/blob/e82029abb559d620fdfecb4708a95c6313af625c/subprojects/testing-jvm-infrastructure/src/main/java/org/gradle/api/internal/tasks/testing/junit/JUnitTestClassExecutor.java#L70-L96]
> Which then causes Mockito to not validate unused stubs.
> {quote}





[jira] [Resolved] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest

2023-08-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13187.
---
Fix Version/s: 3.6.0
   Resolution: Done

> Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
> -
>
> Key: KAFKA-13187
> URL: https://issues.apache.org/jira/browse/KAFKA-13187
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>






[jira] [Resolved] (KAFKA-10334) Transactions not working properly

2023-08-05 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-10334.
---
Resolution: Duplicate

> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> {code:java}
> "org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
> I followed the documentation and I was expecting that transactions fail when 
> I call *.commitTransaction* if some problem is raised when sending a message 
> like it's described in the 
> [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].
> Unfortunately, when testing this behaviour using a message larger than the 
> size accepted by the Kafka broker/cluster, the transactions are not working 
> properly.
> I tested with a three-broker Kafka cluster with a 1MB maximum message size 
> (the default value):
>  - when the message is 1MB in size, the transaction is aborted and an exception is 
> raised when calling *commitTransaction()*
>  - when the message is bigger than 1MB, the transaction is completed 
> successfully *without* the message being written. No exception is thrown.
> As an example, this means that when I produce 9 messages with 1 KB and 1 
> message with 1.1MB in the same transaction, the transaction is completed but 
> only 9 messages are written to the Kafka cluster.
> I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka 
> cluster and Kafka Producer API.
> The configs that I'm using to create the KafkaProducer in order to use 
> transactions:
> {code:java}
> new Properties() {
>   {
>     put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094")
>     put(ACKS_CONFIG, "-1")
>     put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
>     put(KEY_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[StringSerializer].getName))
>     put(VALUE_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[ByteArraySerializer].getName))
>     put(CLIENT_ID_CONFIG, "app")
>     put(TRANSACTIONAL_ID_CONFIG, "app")
>     put(ENABLE_IDEMPOTENCE_CONFIG, "true")
>   }
> }
> {code}





[jira] [Created] (KAFKA-15249) Verify Connect test-plugins artifact is published to Maven Central

2023-07-25 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15249:
-

 Summary: Verify Connect test-plugins artifact is published to 
Maven Central
 Key: KAFKA-15249
 URL: https://issues.apache.org/jira/browse/KAFKA-15249
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.6.0
Reporter: Chris Egerton
Assignee: Chris Egerton
 Fix For: 3.6.0


In KAFKA-14759 we created a separate {{connect/test-plugins}} module to store 
all testing-only Connect plugins and removed those plugins from existing 
Connect modules.

These testing-only plugins are intentionally excluded from the project's 
release file (which can be generated with {{./gradlew releaseTarGz}}); 
however, some users may still be relying on them for testing environments.

Although we should refrain from distributing these testing-only plugins with 
our out-of-the-box distribution of Connect, we should still ensure that they're 
available on an opt-in basis to users who would like to continue using them. 
This can be accomplished by publishing them to [Maven 
Central|https://search.maven.org/], like we do with our other modules.

This will probably happen automatically during the next release (3.6.0) with no 
further action required. This ticket is just here as a reminder to verify that 
the artifacts are present in the staging Maven repo when release candidates are 
published for voting.





[jira] [Resolved] (KAFKA-13431) Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

2023-07-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13431.
---
Fix Version/s: 3.6.0
   Resolution: Done

> Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit 
> users)
> ---
>
> Key: KAFKA-13431
> URL: https://issues.apache.org/jira/browse/KAFKA-13431
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Diego Erdody
>Assignee: Yash Mayya
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> There's currently an incompatibility between Sink connectors overriding the 
> {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that 
> mutate the topic field.
> The problem has been present since the inception of the {{preCommit}} method 
> and is rooted in a mismatch between the topic/partition that is passed to 
> {{open/preCommit}} (the original topic and partition before applying any 
> transformations) and the topic partition that is present in the SinkRecord 
> that the {{SinkTask.put}} method receives (after transformations are 
> applied). Since that's all the information the connector has to implement any 
> kind of internal offset tracking, the topic/partitions it can return in 
> preCommit will correspond to the transformed topic, when the framework 
> actually expects it to be the original topic.
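
The mismatch described above can be illustrated with a minimal, self-contained sketch. It does not use the Connect API: the class, the method names and the "topic-partition" string keys are hypothetical stand-ins chosen for illustration only. It assumes a task that builds its offset map purely from the topic names it sees in the records passed to {{put}}.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the mismatch; none of these names exist in Kafka Connect.
class TopicMismatchSketch {

    /**
     * Offsets a task could track from records received in put(),
     * keyed by "topic-partition" using the topic seen AFTER transformations.
     */
    static Map<String, Long> trackedOffsets(String transformedTopic, int partition, long lastOffset) {
        Map<String, Long> offsets = new HashMap<>();
        // Commit position is the offset of the next record to read.
        offsets.put(transformedTopic + "-" + partition, lastOffset + 1);
        return offsets;
    }

    /** True only if every key returned from preCommit is one the framework assigned. */
    static boolean frameworkRecognizes(Set<String> assigned, Map<String, Long> returned) {
        return assigned.containsAll(returned.keySet());
    }
}
```

With an assignment of "orders-0" and an SMT that renames the topic to "orders_v2", the task would return "orders_v2-0", which the framework has no record of.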





[jira] [Resolved] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14669.
---
Resolution: Done

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.





[jira] [Reopened] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-14669:
---

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.





[jira] [Resolved] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14669.
---
Fix Version/s: 3.6.0
   Resolution: Done

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.





[jira] [Resolved] (KAFKA-14059) Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest

2023-07-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14059.
---
Resolution: Done

> Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest
> ---
>
> Key: KAFKA-14059
> URL: https://issues.apache.org/jira/browse/KAFKA-14059
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.6.0
>
>






[jira] [Resolved] (KAFKA-4107) Support offset reset capability in Kafka Connect

2023-06-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-4107.
--
Resolution: Done

> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Chris Egerton
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.





[jira] [Created] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-06-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15091:
-

 Summary: Javadocs for SourceTask::commit are incorrect
 Key: KAFKA-15091
 URL: https://issues.apache.org/jira/browse/KAFKA-15091
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton


The Javadocs for {{SourceTask::commit}} state that the method should:
{quote}Commit the offsets, up to the offsets that have been returned by 
[{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
{quote}
However, this is obviously incorrect given how the Connect runtime (when not 
configured with exactly-once support for source connectors) performs polling 
and offset commits on separate threads. There's also some extensive discussion 
on the semantics of that method in KAFKA-5716 where it's made clear that 
altering the behavior of the runtime to align with the documented semantics of 
that method is not a viable option.

We should update the Javadocs for this method to state that it does not have 
anything to do with the offsets returned from {{SourceTask::poll}} and is 
instead just a general, periodically-invoked hook to let the task know that an 
offset commit has taken place (but with no guarantees as to which offsets have 
been committed and which ones correspond to still-in-flight records).





[jira] [Created] (KAFKA-15090) Source tasks are no longer stopped on a separate thread

2023-06-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15090:
-

 Summary: Source tasks are no longer stopped on a separate thread
 Key: KAFKA-15090
 URL: https://issues.apache.org/jira/browse/KAFKA-15090
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 
3.3.0, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 3.1.0, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
3.3.3, 3.6.0, 3.5.1
Reporter: Chris Egerton
Assignee: Chris Egerton


Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{SourceTask::poll}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in [https://github.com/apache/kafka/pull/9669], we changed the 
logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
dedicated thread for the task (i.e., the one responsible for polling data from 
it and producing that data to Kafka).

This altered the semantics for {{SourceTask::poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.
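
A minimal sketch of the proposed contract, using only java.util.concurrent rather than the Connect runtime. The {{BlockingTask}} class and the two executors are hypothetical stand-ins: one executor plays the task's dedicated thread, the other plays the separate thread that exists only to invoke {{stop}}.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model; not the Connect runtime's actual shutdown logic.
class SeparateStopThreadSketch {

    /** Stand-in for a source task whose poll() blocks until stop() is called. */
    static class BlockingTask {
        private final CountDownLatch stopRequested = new CountDownLatch(1);

        String poll() throws InterruptedException {
            stopRequested.await(); // blocks until stop() runs on another thread
            return "unblocked";
        }

        void stop() {
            stopRequested.countDown();
        }
    }

    /** Runs poll() on one thread and stop() on another; true if poll() returned in time. */
    static boolean pollUnblocksWhenStoppedElsewhere(long timeoutMs) {
        BlockingTask task = new BlockingTask();
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        ExecutorService stopThread = Executors.newSingleThreadExecutor();
        try {
            Callable<String> pollCall = task::poll;
            Runnable stopCall = task::stop;
            Future<String> pollResult = taskThread.submit(pollCall);
            stopThread.submit(stopCall); // never queued behind the blocked poll()
            return "unblocked".equals(pollResult.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (Exception e) {
            return false;
        } finally {
            taskThread.shutdownNow();
            stopThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollUnblocksWhenStoppedElsewhere(5000));
    }
}
```

If {{stopCall}} were submitted to {{taskThread}} instead, it would sit in the queue behind the blocked {{poll}} call and never run, which is the situation connectors that block in {{poll}} can hit today.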





[jira] [Created] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances

2023-06-05 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15059:
-

 Summary: Exactly-once source tasks fail to start during pending 
rebalances
 Key: KAFKA-15059
 URL: https://issues.apache.org/jira/browse/KAFKA-15059
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect, mirrormaker
Affects Versions: 3.3.2, 3.3.1, 3.4.0, 3.3.0, 3.5.0, 3.4.1
Reporter: Chris Egerton
Assignee: Chris Egerton


When asked to perform a round of zombie fencing, the distributed herder will 
[reject the 
request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250]
 if a rebalance is pending, which can happen if (among other things) a config 
for a new connector or a new set of task configs has been recently read from 
the config topic.

Normally this can be alleviated with a simple task restart, which isn't great 
but isn't terrible.

However, when running MirrorMaker 2 in dedicated mode, there is no API to 
restart failed tasks, and it can be more common to see this kind of failure on 
a fresh cluster because three connector configurations are written in rapid 
succession to the config topic.

 

In order to provide a better experience for users of both vanilla Kafka Connect 
and dedicated MirrorMaker 2 clusters, we can retry (likely with the same 
exponential backoff introduced with KAFKA-14732) zombie fencing attempts that 
fail due to a pending rebalance.
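
A rough sketch of what such a retry loop could look like, in plain Java. The exception type, the method names and the delay values are hypothetical; the actual backoff parameters used for KAFKA-14732 are not stated here, so the numbers are placeholders only.

```java
import java.util.function.Supplier;

// Hypothetical sketch; not the DistributedHerder implementation.
class BackoffRetrySketch {

    /** Stand-in for the failure raised when a rebalance is pending. */
    static class RebalancePendingException extends RuntimeException {
        RebalancePendingException(String message) {
            super(message);
        }
    }

    /** Delay before retrying after the given 0-based attempt: initialMs * 2^attempt, capped at maxMs. */
    static long backoffMs(int attempt, long initialMs, long maxMs) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    /** Retries the operation while it fails with RebalancePendingException, up to maxAttempts. */
    static <T> T retry(Supplier<T> operation, int maxAttempts, long initialMs, long maxMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RebalancePendingException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RebalancePendingException e) {
                last = e;
                if (attempt + 1 < maxAttempts) {
                    sleep(backoffMs(attempt, initialMs, maxMs));
                }
            }
        }
        throw last; // every attempt hit a pending rebalance
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

Only the pending-rebalance failure is retried in this sketch; any other exception propagates immediately, so unrelated errors would still fail the task as they do now.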





[jira] [Resolved] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-06-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15012.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the exception below:
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading 
> zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric 
> value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
>   at 
> org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> 

[jira] [Resolved] (KAFKA-14863) Plugins which do not have a valid no-args constructor are visible in the REST API

2023-06-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14863.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> Plugins which do not have a valid no-args constructor are visible in the REST 
> API
> -
>
> Key: KAFKA-14863
> URL: https://issues.apache.org/jira/browse/KAFKA-14863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.6.0
>
>
> Currently, the Connect plugin discovery mechanisms only assert that a no-args 
> constructor is present when necessary. In particular, this assertion happens 
> for Connectors when the framework needs to evaluate the connector's version 
> method.
> It also happens for ConnectorConfigOverridePolicy, ConnectRestExtension, and 
> ConfigProvider plugins, which are loaded via the ServiceLoader. The 
> ServiceLoader constructs instances of plugins with their no-args constructor 
> during discovery, so these plugins are discovered even if they are not 
> Versioned.
> This has the effect that these unusable plugins which are missing a default 
> constructor appear in the REST API, but are not able to be instantiated or 
> used. To make the ServiceLoader and Reflections discovery mechanisms behave 
> more similarly, this assertion should be applied to all plugins, and a log 
> message emitted when plugins do not follow the constructor requirements.
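
The constructor requirement can be checked with plain reflection. The sketch below is only an illustration under that assumption: the class and method names are hypothetical, the three nested "plugin" classes are made up for the example, and it is not the code used by the Connect plugin scanners.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical check; not the Connect plugin discovery implementation.
class NoArgsConstructorCheck {

    /** Example plugin with a usable public no-args constructor. */
    public static class GoodPlugin {
        public GoodPlugin() { }
    }

    /** Example plugin whose only constructor is private. */
    public static class PrivateConstructorPlugin {
        private PrivateConstructorPlugin() { }
    }

    /** Example plugin that only offers a constructor with arguments. */
    public static class ArgsOnlyPlugin {
        public ArgsOnlyPlugin(String name) { }
    }

    /**
     * Returns null if the class has a public no-args constructor,
     * otherwise a message describing why the plugin is unusable.
     */
    static String validate(Class<?> pluginClass) {
        try {
            Constructor<?> constructor = pluginClass.getDeclaredConstructor();
            if (!Modifier.isPublic(constructor.getModifiers())) {
                return pluginClass.getSimpleName() + ": default constructor must be public";
            }
            return null;
        } catch (NoSuchMethodException e) {
            return pluginClass.getSimpleName() + ": no default constructor found";
        }
    }

    public static void main(String[] args) {
        Class<?>[] candidates = {GoodPlugin.class, PrivateConstructorPlugin.class, ArgsOnlyPlugin.class};
        for (Class<?> candidate : candidates) {
            String problem = validate(candidate);
            System.out.println(problem == null ? candidate.getSimpleName() + ": ok" : problem);
        }
    }
}
```

Running the same check for every plugin type during discovery, and logging the returned message, would keep unusable classes out of the REST API regardless of which scanner found them.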



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-23 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15018:
-

 Summary: Potential tombstone offsets corruption for exactly-once 
source connectors
 Key: KAFKA-15018
 URL: https://issues.apache.org/jira/browse/KAFKA-15018
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.3.2, 3.3.1, 3.4.0, 3.3.0, 3.5.0, 3.4.1
Reporter: Chris Egerton


When exactly-once support is enabled for source connectors, source offsets can 
potentially be written to two different offsets topics: a topic specific to the 
connector, and the global offsets topic (which was used for all connectors 
prior to KIP-618 / version 3.3.0).

Precedence is given to offsets in the per-connector offsets topic, but if none 
are found for a given partition, then the global offsets topic is used as a 
fallback.

When committing offsets, a transaction is used to ensure that source records 
and source offsets are written to the Kafka cluster targeted by the source 
connector. This transaction only includes the connector-specific offsets topic. 
Writes to the global offsets topic take place after writes to the 
connector-specific offsets topic have completed successfully, and if they fail, 
a warning message is logged, but no other action is taken.

Normally, this ensures that, for offsets committed by exactly-once-supported 
source connectors, the per-connector offsets topic is at least as up-to-date as 
the global offsets topic, and sometimes even ahead.

However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
is successfully written to the per-connector offsets topic, but cannot be 
written to the global offsets topic, then the global offsets topic will still 
contain that source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker, if a task requests offsets for one 
of the tombstoned partitions, the worker will provide it with the offsets 
present in the global offsets topic, instead of indicating to the task that no 
offsets can be found.
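
The failure mode can be reproduced with a small in-memory model. This is a simplified sketch, not the worker's offset store: the two maps stand in for the two offsets topics, partitions are plain strings rather than source partition maps, and the {{globalWriteSucceeds}} flag is a hypothetical way of simulating a failed secondary write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the two offsets topics.
class OffsetFallbackSketch {
    private final Map<String, Long> connectorOffsets = new HashMap<>();
    private final Map<String, Long> globalOffsets = new HashMap<>();

    /**
     * Commits an offset for a partition; a null offset is a tombstone.
     * The per-connector write always succeeds here, while the global write
     * happens afterwards and may fail (in which case it is simply skipped).
     */
    void commit(String partition, Long offset, boolean globalWriteSucceeds) {
        apply(connectorOffsets, partition, offset);
        if (globalWriteSucceeds) {
            apply(globalOffsets, partition, offset);
        }
    }

    private static void apply(Map<String, Long> store, String partition, Long offset) {
        if (offset == null) {
            store.remove(partition);
        } else {
            store.put(partition, offset);
        }
    }

    /** Reads with per-connector precedence and the global store as fallback. */
    Long read(String partition) {
        Long offset = connectorOffsets.get(partition);
        return offset != null ? offset : globalOffsets.get(partition);
    }
}
```

Committing 42 for a partition with both writes succeeding, then committing a tombstone whose global write fails, leaves {{read}} returning 42 instead of null: the tombstone is invisible because the fallback cannot distinguish "deleted" from "never written to the per-connector topic".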





[jira] [Resolved] (KAFKA-14783) Implement new STOPPED state for connectors

2023-04-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14783.
---
Fix Version/s: 3.5.0
   Resolution: Done

> Implement new STOPPED state for connectors
> --
>
> Key: KAFKA-14783
> URL: https://issues.apache.org/jira/browse/KAFKA-14783
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> Implement the {{STOPPED}} state [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED].





[jira] [Resolved] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role

2023-03-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14838.
---
Resolution: Done

> MM2 Worker/Connector/Task clients should specify client ID based on flow and 
> role
> -
>
> Key: KAFKA-14838
> URL: https://issues.apache.org/jira/browse/KAFKA-14838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
> Fix For: 3.5.0
>
>
> MM2 code creates a lot of Kafka clients internally. These clients generate a 
> lot of logs, but since the client.id is not properly specified, connecting 
> the dots between a specific Connector/Task and its internal client is close 
> to impossible. This is even more complex when MM2 is running in distributed 
> mode, in which multiple Connect workers are running inside the same process.
> For Connector/Task created clients, the client.id should specify the 
> flow, the Connector name/Task ID and the role of the client. E.g. 
> MirrorSourceConnector uses multiple admin clients, and their client.id should 
> reflect the difference between them.
> For Worker created clients, the client.id should refer to the flow.
> This will help log analysis significantly, especially in MM2 mode.





[jira] [Reopened] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role

2023-03-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-14838:
---

> MM2 Worker/Connector/Task clients should specify client ID based on flow and 
> role
> -
>
> Key: KAFKA-14838
> URL: https://issues.apache.org/jira/browse/KAFKA-14838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
> Fix For: 3.5.0
>
>
> MM2 code creates a lot of Kafka clients internally. These clients generate a 
> lot of logs, but since the client.id is not properly specified, connecting 
> the dots between a specific Connector/Task and its internal client is close 
> to impossible. This is even more complex when MM2 is running in distributed 
> mode, in which multiple Connect workers are running inside the same process.
> For Connector/Task created clients, the client.id should specify the 
> flow, the Connector name/Task ID and the role of the client. E.g. 
> MirrorSourceConnector uses multiple admin clients, and their client.id should 
> reflect the difference between them.
> For Worker created clients, the client.id should refer to the flow.
> This will help log analysis significantly, especially in MM2 mode.





[jira] [Resolved] (KAFKA-14843) Connector plugins config endpoint does not include Common configs

2023-03-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14843.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Connector plugins config endpoint does not include Common configs
> -
>
> Key: KAFKA-14843
> URL: https://issues.apache.org/jira/browse/KAFKA-14843
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0, 3.3.0, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
> Fix For: 3.5.0
>
>
> Connector plugins GET config endpoint introduced in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions]
>   allows plugin configuration to be retrieved from the REST endpoint.
> This configuration only includes the plugin configuration, but not the base 
> configuration of the Sink/Source Connector.
> For instance, when validating the configuration of a plugin, _all_ configs 
> are returned:
> ```
> curl -s 
> $CONNECT_URL/connector-plugins/io.aiven.kafka.connect.http.HttpSinkConnector/config
>  | jq -r '.[].name' | sort -u | wc -l     
> 21
> curl -s 
> $CONNECT_URL/connector-plugins/io.aiven.kafka.connect.http.HttpSinkConnector/config/validate
>  -XPUT -H 'Content-type: application/json' --data "\{\"connector.class\": 
> \"io.aiven.kafka.connect.http.HttpSinkConnector\", \"topics\": 
> \"example-topic-name\"}" | jq -r '.configs[].definition.name' | sort -u | wc 
> -l
> 39
> ```
> and the missing configs are all from base config:
> ```
> diff validate.txt config.txt
> 6,14d5
> < config.action.reload
> < connector.class
> < errors.deadletterqueue.context.headers.enable
> < errors.deadletterqueue.topic.name
> < errors.deadletterqueue.topic.replication.factor
> < errors.log.enable
> < errors.log.include.messages
> < errors.retry.delay.max.ms
> < errors.retry.timeout
> 16d6
> < header.converter
> 24d13
> < key.converter
> 26d14
> < name
> 33d20
> < predicates
> 35,39d21
> < tasks.max
> < topics
> < topics.regex
> < transforms
> < value.converter
> ```
> Would be great to get the base configs from the same endpoint as well, so we 
> could rely on it instead of using the validate endpoint to get all configs.





[jira] [Resolved] (KAFKA-14842) MirrorCheckpointTask can reduce the rpc calls of "listConsumerGroupOffsets(group)" of irrelevant groups at each poll

2023-03-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14842.
---
Fix Version/s: 3.5.0
   Resolution: Done

> MirrorCheckpointTask can reduce the rpc calls of 
> "listConsumerGroupOffsets(group)" of irrelevant groups at each poll
> 
>
> Key: KAFKA-14842
> URL: https://issues.apache.org/jira/browse/KAFKA-14842
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.5.0
>
>
> sorry, wrong related.





[jira] [Created] (KAFKA-14858) Standalone herder does not handle exceptions thrown from connector taskConfigs method

2023-03-28 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14858:
-

 Summary: Standalone herder does not handle exceptions thrown from 
connector taskConfigs method
 Key: KAFKA-14858
 URL: https://issues.apache.org/jira/browse/KAFKA-14858
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton


In distributed mode, if a connector throws an exception from its 
{{taskConfigs}} method (invoked by the herder, through the {{Worker}} class, 
[here|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1960]),
 we wait for an exponential backoff period (see KAFKA-14732) and then [retry 
the 
operation|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1907-L1911].

However, in standalone mode, not only do we not retry the operation, we do not 
even log the exception. In addition, when REST calls are made that require 
generating new task configs for a connector (which include creating and 
reconfiguring a connector), if the connector's {{taskConfigs}} method throws an 
exception, those requests will time out since the 
[callback|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L183]
 we use to respond to those requests never gets invoked.

At a bare minimum, we should:
 * Log any exceptions thrown from the {{taskConfigs}} method at {{ERROR}} level
 * Invoke any callbacks passed in to the relevant {{StandaloneHerder}} methods 
with any exceptions thrown by the {{taskConfigs}} method

We might also consider introducing the same kind of exponential backoff retry 
logic used by distributed mode, but this can be addressed separately since it 
would be a much larger change in behavior and may break existing users' 
deployments.
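
The two bare-minimum fixes listed above could look roughly like the following. This is a minimal sketch, not the actual {{StandaloneHerder}} code: {{Callback}} is a local stand-in for a completion callback, {{generateTaskConfigs}} is a hypothetical helper name, and {{System.err}} stands in for a real logger.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class TaskConfigCallbackSketch {
    /** Local stand-in for a completion callback: error is null on success. */
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    /**
     * Hypothetical helper: generates task configs and always completes the
     * callback, whether generation succeeds or the connector throws.
     */
    static void generateTaskConfigs(
            String connectorName,
            Supplier<List<Map<String, String>>> taskConfigs,
            Callback<List<Map<String, String>>> callback) {
        List<Map<String, String>> configs;
        try {
            configs = taskConfigs.get();
        } catch (RuntimeException e) {
            // Log at ERROR level (System.err stands in for a real logger).
            System.err.println("ERROR Failed to generate task configs for connector "
                    + connectorName + ": " + e);
            // Report the failure so the waiting REST request does not time out.
            callback.onCompletion(e, null);
            return;
        }
        callback.onCompletion(null, configs);
    }
}
```

The point of the design is that every code path ends in exactly one {{onCompletion}} call, so a caller blocked on the callback is always released.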





[jira] [Created] (KAFKA-14855) Harden integration testing logic for asserting that a connector is deleted

2023-03-27 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14855:
-

 Summary: Harden integration testing logic for asserting that a 
connector is deleted
 Key: KAFKA-14855
 URL: https://issues.apache.org/jira/browse/KAFKA-14855
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton


In the Connect embedded integration testing framework, the 
[EmbeddedConnectClusterAssertions::assertConnectorAndTasksAreStopped 
method|https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L411-L428]
 is used in several places to verify that a connector has been deleted. (This 
method may be renamed in an upcoming PR to something like 
{{assertConnectorAndTasksAreNotRunning}}, but apart from that, its usage 
and semantics will remain unchanged.) However, the [underlying logic for that 
assertion|https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L430-L451]
 doesn't strictly check for deletion (which can be done by verifying that the 
connector and its tasks no longer appear in the REST API at all), since it also 
allows for the Connector or tasks to appear in the REST API, but with a state 
that is not {{RUNNING}}.

This constraint is a bit too lax and may be silently masking issues with our 
shutdown logic for to-be-deleted connectors. We should try to narrow the 
criteria for that method so that it fails if the Connector or any of its tasks 
still appear in the REST API, even with a non-{{RUNNING}} state.

However, we should also be careful to ensure that current uses of that method 
are not relying on its semantics. If, for some reason, a test case requires the 
existing semantics, we should evaluate whether it's necessary to continue to 
rely on those semantics, and if so, probably preserve the existing method so 
that it can be used wherever applicable (but rewrite all other tests to use the 
new, stricter method).
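
To make the difference between the two semantics concrete, here is a minimal sketch. It does not use the real {{EmbeddedConnectClusterAssertions}} code: {{Status}} is a hypothetical snapshot of what the REST API reports for one connector, and an empty {{Optional}} is assumed to mean that the REST API no longer knows the connector.

```java
import java.util.List;
import java.util.Optional;

final class ConnectorDeletionCheckSketch {
    /** Hypothetical snapshot of the REST API status for one connector. */
    static final class Status {
        final String connectorState;
        final List<String> taskStates;

        Status(String connectorState, List<String> taskStates) {
            this.connectorState = connectorState;
            this.taskStates = taskStates;
        }
    }

    /** Lax semantics: passes if the connector is absent OR nothing is RUNNING. */
    static boolean notRunning(Optional<Status> status) {
        return status
                .map(s -> !"RUNNING".equals(s.connectorState)
                        && s.taskStates.stream().noneMatch("RUNNING"::equals))
                .orElse(true);
    }

    /** Strict semantics: passes only if the connector no longer appears at all. */
    static boolean deleted(Optional<Status> status) {
        return !status.isPresent();
    }
}
```

A connector that still appears with a {{PAUSED}} state satisfies the lax check but fails the strict one, which is the case the stricter assertion is meant to catch.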





[jira] [Resolved] (KAFKA-14814) Skip restart of connectors when redundant resume request is made

2023-03-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14814.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Skip restart of connectors when redundant resume request is made
> 
>
> Key: KAFKA-14814
> URL: https://issues.apache.org/jira/browse/KAFKA-14814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chaitanya Mukka
>Priority: Minor
> Fix For: 3.5.0
>
>
> Consecutive requests to the {{PUT /connectors/{name}/resume}} endpoint will 
> cause the Connector to be restarted. This is a little wasteful and conflicts 
> with the idempotent nature of that endpoint. We can tweak the 
> {{MemoryConfigBackingStore}} and {{KafkaConfigBackingStore}} classes to not 
> invoke the {{onConnectorTargetStateChange}} method of their 
> {{ConfigUpdateListener}} instance if they pick up a new target state that 
> matches the current target state of the connector.





[jira] [Resolved] (KAFKA-14797) MM2 does not emit offset syncs when conservative translation logic exceeds positive max.offset.lag

2023-03-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14797.
---
Resolution: Fixed

> MM2 does not emit offset syncs when conservative translation logic exceeds 
> positive max.offset.lag
> --
>
> Key: KAFKA-14797
> URL: https://issues.apache.org/jira/browse/KAFKA-14797
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> This is a regression in MirrorMaker 2 introduced by KAFKA-12468.
> Reproduction steps:
> 1. Set max.offset.lag to a non-zero value.
> 2. Set up a 1-1 replication flow which does not skip upstream offsets or have 
> a concurrent producer to the target topic.
> 3. Produce more than max.offset.lag records to the source topic and allow 
> replication to proceed.
> 4. Examine end offsets, checkpoints and/or target consumer group lag
> Expected behavior:
> Consumer group lag should be at most max.offset.lag.
> Actual behavior:
> Consumer group lag is significantly larger than max.offset.lag.





[jira] [Resolved] (KAFKA-14816) Connect loading SSL configs when contacting non-HTTPS URLs

2023-03-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14816.
---
  Reviewer: Justine Olshan
Resolution: Fixed

> Connect loading SSL configs when contacting non-HTTPS URLs
> --
>
> Key: KAFKA-14816
> URL: https://issues.apache.org/jira/browse/KAFKA-14816
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0
>Reporter: Ian McDonald
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1
>
>
> Due to changes made here: [https://github.com/apache/kafka/pull/12828]
> Connect now unconditionally loads SSL configs from the worker into rest 
> clients it uses for cross-worker communication and uses them even when 
> issuing requests to HTTP (i.e., non-HTTPS) URLs. Previously, it would only 
> attempt to load (and validate) SSL properties when issuing requests to HTTPS 
> URLs. This can cause issues when a Connect cluster has stopped securing its 
> REST API with SSL but its worker configs still contain the old (and 
> now-invalid) SSL properties. When this happens, REST requests that hit a 
> follower worker but need to be forwarded to the leader will fail, and 
> connectors that perform dynamic reconfigurations via 
> [ConnectorContext::requestTaskReconfiguration|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
>  will fail to trigger that reconfiguration if they are not running on the 
> leader.
> In our testing environments - older versions without the linked changes pass 
> with the following configuration, and newer versions with the changes fail:
> {{ssl.keystore.location = /mnt/security/test.keystore.jks}}
> {{ssl.keystore.password = [hidden]}}
> {{ssl.keystore.type = JKS}}
> {{ssl.protocol = TLSv1.2}}
> It's important to note that the file {{/mnt/security/test.keystore.jks}} 
> isn't generated for our non-SSL tests; however, these configs are still 
> included in our worker config file.
> This leads to a 500 response when hitting the create connector REST endpoint 
> with the following error:
> bq. { "error_code":500,   "message":"Failed to start RestClient:   
> /mnt/security/test.keystore.jks is not a valid keystore" }
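
The previous behavior described above (only loading SSL properties for HTTPS URLs) can be sketched as follows. This is an illustration under assumptions, not the Connect {{RestClient}} code: {{requiresSsl}} and {{sslSettingsFor}} are hypothetical names, and the {{loader}} function stands in for whatever builds and validates the SSL settings.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class ConditionalSslSketch {
    /** Returns true only for URLs whose scheme is https (case-insensitive). */
    static boolean requiresSsl(String url) {
        return "https".equalsIgnoreCase(URI.create(url).getScheme());
    }

    /**
     * Builds SSL settings lazily: the (possibly failing) loader is invoked
     * only when the target URL actually uses HTTPS.
     */
    static <T> Optional<T> sslSettingsFor(
            String url,
            Map<String, String> workerConfig,
            Function<Map<String, String>, T> loader) {
        if (!requiresSsl(url)) {
            // Stale or invalid ssl.* properties are never touched for HTTP URLs.
            return Optional.empty();
        }
        return Optional.of(loader.apply(workerConfig));
    }
}
```

With this shape, a worker config that still points at a missing keystore only causes a failure when a request is actually sent to an HTTPS URL.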





[jira] [Created] (KAFKA-14814) Skip restart of connectors when redundant resume request is made

2023-03-16 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14814:
-

 Summary: Skip restart of connectors when redundant resume request 
is made
 Key: KAFKA-14814
 URL: https://issues.apache.org/jira/browse/KAFKA-14814
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


Consecutive requests to the {{PUT /connectors/{name}/resume}} endpoint will 
cause the Connector to be restarted. This is a little wasteful and conflicts 
with the idempotent nature of that endpoint. We can tweak the 
{{MemoryConfigBackingStore}} and {{KafkaConfigBackingStore}} classes to not 
invoke the {{onConnectorTargetStateChange}} method of their 
{{ConfigUpdateListener}} instance if they pick up a new target state that 
matches the current target state of the connector.
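
A minimal sketch of the proposed tweak, assuming a simple in-memory map of target states. It is not the {{MemoryConfigBackingStore}} or {{KafkaConfigBackingStore}} code: the {{TargetState}} enum and the {{Consumer}}-based listener here are local stand-ins for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

final class TargetStateDedupSketch {
    /** Local stand-in for the connector target state. */
    enum TargetState { STARTED, PAUSED }

    private final Map<String, TargetState> targetStates = new HashMap<>();
    private final Consumer<String> onConnectorTargetStateChange;

    TargetStateDedupSketch(Consumer<String> onConnectorTargetStateChange) {
        this.onConnectorTargetStateChange = onConnectorTargetStateChange;
    }

    /** Stores the new target state; notifies the listener only on a real change. */
    void putTargetState(String connector, TargetState newState) {
        TargetState previous = targetStates.put(connector, newState);
        if (newState != previous) {
            onConnectorTargetStateChange.accept(connector);
        }
        // A redundant resume (STARTED -> STARTED) falls through without a restart.
    }
}
```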





[jira] [Resolved] (KAFKA-14809) Connect incorrectly logs that no records were produced by source tasks

2023-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14809.
---
Resolution: Fixed

> Connect incorrectly logs that no records were produced by source tasks
> --
>
> Key: KAFKA-14809
> URL: https://issues.apache.org/jira/browse/KAFKA-14809
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3
>
>
> There's an *{{if}}* condition when [committing 
> offsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L219]
>  that is referencing the wrong variable, so the statement always evaluates to 
> *true*.
> This causes log statements like the following to be spuriously emitted:
> {quote}[2023-03-14 16:18:04,675] DEBUG WorkerSourceTask\{id=job-0} Either no 
> records were produced by the task since the last offset commit, or every 
> record has been filtered out by a transformation or dropped due to 
> transformation or conversion errors. 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:220)
> {quote}





[jira] [Resolved] (KAFKA-14803) topic deletion bug

2023-03-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14803.
---
Resolution: Duplicate

> topic deletion bug
> --
>
> Key: KAFKA-14803
> URL: https://issues.apache.org/jira/browse/KAFKA-14803
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.3.2
> Environment: AWS m5.xlarge EC2 instance
>Reporter: Behavox
>Priority: Major
> Attachments: server.properties
>
>
> Topic deletion doesn't work as expected: after a topic is successfully 
> deleted, it is recreated. This happens in a multi-controller environment 
> with 3 controllers and ReplicationFactor: 2.
> How to reproduce: attempt to delete a topic. The topic is removed successfully 
> and recreated right after removal. The example below shows a single topic 
> named example-topic. We have a total of 17000 topics in the affected cluster. 
>  
> Our config is attached. 
> Run 1
> [2023-03-10 16:16:45,625] INFO [Controller 1] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,722] INFO [Controller 1] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,730] INFO [Controller 2] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,851] INFO [Controller 2] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:16:45,837] INFO [Controller 3] Removed topic example-topic 
> with ID fh_aQcc3Sf2yVBTMrltBlQ. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:19:04,833] INFO [Controller 3] Created topic example-topic 
> with topic ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> Run 2
> [2023-03-10 16:20:22,469] INFO [Controller 1] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:19,711] INFO [Controller 1] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 2] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,022] INFO [Controller 2] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:20:22,674] INFO [Controller 3] Removed topic example-topic 
> with ID a-7OZG_XQhiCatOBft-9-g. 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2023-03-10 16:22:20,020] INFO [Controller 3] Created topic example-topic 
> with topic ID xxlJlIe_SvqQHtfgbX2eLA. 
> (org.apache.kafka.controller.ReplicationControlManager)





[jira] [Created] (KAFKA-14799) Source tasks fail if connector attempts to abort empty transaction

2023-03-09 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14799:
-

 Summary: Source tasks fail if connector attempts to abort empty 
transaction
 Key: KAFKA-14799
 URL: https://issues.apache.org/jira/browse/KAFKA-14799
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


If a source task invokes 
[TransactionContext::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/TransactionContext.html#abortTransaction()]
 while the current transaction is empty, and then returns an empty batch of 
records from the next (or current) invocation of {{SourceTask::poll}}, the 
task will fail.

This is because the Connect framework will honor the transaction abort request 
by invoking 
[KafkaProducer::abortTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction()],
 but without having first invoked 
[KafkaProducer::beginTransaction|https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#beginTransaction()]
 (since no records had been received from the task), which leads to an 
{{IllegalStateException}}.

An example stack trace for this scenario:
{quote}[2023-03-09 10:41:25,053] ERROR [exactlyOnceQuestionMark|task-0] 
ExactlyOnceWorkerSourceTask\{id=exactlyOnceQuestionMark-0} Task threw an 
uncaught and unrecoverable exception. Task is being killed and will not recover 
until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
java.lang.IllegalStateException: TransactionalId 
exactly-once-source-integration-test-exactlyOnceQuestionMark-0: Invalid 
transition attempted from state READY to state ABORTING_TRANSACTION
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:967)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginAbort$3(TransactionManager.java:269)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
    at 
org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.abortTransaction(ExactlyOnceWorkerSourceTask.java:495)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$3.shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:473)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.maybeCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.java:398)
    at 
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.batchDispatched(ExactlyOnceWorkerSourceTask.java:186)
    at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:362)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{quote}
 

As far as a fix goes, we have a few options:
 # Gracefully handle this case by translating the call to 
{{TransactionContext::abortTransaction}} into a no-op
 # Throw an exception (probably an {{IllegalStateException}}) from 
{{TransactionContext::abortTransaction}}, which may fail the task, but 
would give it the option to swallow the exception and continue processing if it 
would like
 # Forcibly fail the task without giving it the chance to swallow an exception, 
using a similar strategy to how we fail tasks that request that a transaction 
be committed and aborted for the same record (see 
[here|https://github.com/apache/kafka/blob/c5240c0390892fe9ecbe5285185c370e7be8b2aa/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTransactionContext.java#L78-L86])
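
The failure mode and the first option (turning the call into a no-op) can be illustrated with a toy model. This is a sketch under assumptions, not Connect or producer code: {{ToyProducer}} only mimics the state check that rejects an abort when no transaction has been started, and {{abortIfOpen}} is a hypothetical helper name.

```java
final class EmptyTransactionAbortSketch {
    /** Toy stand-in for a transactional producer's state check. */
    static final class ToyProducer {
        private boolean inTransaction = false;
        private int aborts = 0;

        void beginTransaction() {
            inTransaction = true;
        }

        void abortTransaction() {
            if (!inTransaction) {
                // Mirrors the failure in the stack trace above.
                throw new IllegalStateException(
                        "Invalid transition attempted from state READY to state ABORTING_TRANSACTION");
            }
            inTransaction = false;
            aborts++;
        }

        boolean isInTransaction() {
            return inTransaction;
        }

        int abortCount() {
            return aborts;
        }
    }

    /** Option 1: an abort request is a no-op when no transaction is open. */
    static boolean abortIfOpen(ToyProducer producer) {
        if (!producer.isInTransaction()) {
            return false; // nothing to abort, so the task keeps running
        }
        producer.abortTransaction();
        return true;
    }
}
```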





[jira] [Resolved] (KAFKA-14781) MM2 logs misleading error during topic ACL sync when broker does not have authorizer configured

2023-03-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14781.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> MM2 logs misleading error during topic ACL sync when broker does not have 
> authorizer configured
> ---
>
> Key: KAFKA-14781
> URL: https://issues.apache.org/jira/browse/KAFKA-14781
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> When there is no broker-side authorizer configured on a Kafka cluster 
> targeted by MirrorMaker 2, users see error-level log messages like this 
> one:
> {quote}[2023-03-06 10:53:57,488] ERROR [MirrorSourceConnector|worker] 
> Scheduler for MirrorSourceConnector caught exception in scheduled task: 
> syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is 
> configured on the broker
>     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:456)
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:342)
>     at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>     at 
> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>     at 
> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No 
> Authorizer is configured on the broker
> {quote}
> This can be misleading as it looks like something is wrong with MM2 or the 
> Kafka cluster. In reality, it's usually fine, since topic ACL syncing is 
> enabled by default and it's reasonable for Kafka clusters (especially in 
> testing/dev environments) to not have authorizers enabled.
> We should try to catch this specific case and downgrade the severity of the 
> log message from {{ERROR}} to either {{INFO}} or {{DEBUG}}. We may also 
> consider suggesting to users that they disable topic ACL syncing if their 
> Kafka cluster doesn't have authorization set up, but this should probably 
> only be emitted once over the lifetime of the connector in order to avoid 
> generating log spam.





[jira] [Created] (KAFKA-14786) Implement connector offset write/reset internal logic

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14786:
-

 Summary: Implement connector offset write/reset internal logic
 Key: KAFKA-14786
 URL: https://issues.apache.org/jira/browse/KAFKA-14786
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Chris Egerton


Implement the internal logic necessary for altering/resetting the offsets of 
connectors, [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Endpointsbehavior].

This should not include any changes to public interface except the introduction 
of the new {{SourceConnector::alterOffsets}} and 
{{SinkConnector::alterOffsets}} methods (i.e., it should not expose or test any 
new REST endpoints).

Ideally, we'll separate this from KAFKA-14368, KAFKA-14784, and KAFKA-14785 by 
making all changes here target the internal Connect {{Herder}} interface, and 
have the changes for the other three rely on those new {{Herder}} methods.





[jira] [Created] (KAFKA-14784) Implement connector offset reset REST API

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14784:
-

 Summary: Implement connector offset reset REST API
 Key: KAFKA-14784
 URL: https://issues.apache.org/jira/browse/KAFKA-14784
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Chris Egerton


Implement the {{DELETE /connectors/name/offsets}} endpoint [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Resettingoffsets].





[jira] [Created] (KAFKA-14785) Implement connector offset read REST API

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14785:
-

 Summary: Implement connector offset read REST API
 Key: KAFKA-14785
 URL: https://issues.apache.org/jira/browse/KAFKA-14785
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Chris Egerton


Implement the {{GET /connectors/name/offsets}} endpoint [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Readingoffsets].





[jira] [Created] (KAFKA-14783) Implement new STOPPED state for connectors

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14783:
-

 Summary: Implement new STOPPED state for connectors
 Key: KAFKA-14783
 URL: https://issues.apache.org/jira/browse/KAFKA-14783
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


Implement the {{STOPPED}} state [described in 
KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED].





[jira] [Created] (KAFKA-14781) MM2 logs misleading error during topic ACL sync when broker does not have authorizer configured

2023-03-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14781:
-

 Summary: MM2 logs misleading error during topic ACL sync when 
broker does not have authorizer configured
 Key: KAFKA-14781
 URL: https://issues.apache.org/jira/browse/KAFKA-14781
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Reporter: Chris Egerton


When there is no broker-side authorizer configured on a Kafka cluster targeted 
by MirrorMaker 2, users see error-level log messages like this one:
{quote}[2023-03-06 10:53:57,488] ERROR [MirrorSourceConnector|worker] Scheduler 
for MirrorSourceConnector caught exception in scheduled task: syncing topic 
ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is 
configured on the broker
    at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:456)
    at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:342)
    at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
    at 
org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
    at 
org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No 
Authorizer is configured on the broker
{quote}
This can be misleading as it looks like something is wrong with MM2 or the 
Kafka cluster. In reality, it's usually fine, since topic ACL syncing is 
enabled by default and it's reasonable for Kafka clusters (especially in 
testing/dev environments) to not have authorizers enabled.

We should try to catch this specific case and downgrade the severity of the log 
message from {{ERROR}} to either {{INFO}} or {{DEBUG}}. We may also 
consider suggesting to users that they disable topic ACL syncing if their Kafka 
cluster doesn't have authorization set up, but this should probably only be 
emitted once over the lifetime of the connector in order to avoid generating 
log spam.
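
One possible shape for that handling is sketched below. It is not the {{MirrorSourceConnector}} code: {{SecurityDisabledException}} is redeclared locally as a stand-in for the Kafka class of the same name, {{levelFor}} is a hypothetical helper, and the choice of {{INFO}} for the first occurrence and {{DEBUG}} afterwards is an assumption made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class AclSyncLoggingSketch {
    /** Local stand-in for org.apache.kafka.common.errors.SecurityDisabledException. */
    static final class SecurityDisabledException extends RuntimeException {
        SecurityDisabledException(String message) {
            super(message);
        }
    }

    enum Level { ERROR, INFO, DEBUG }

    private final AtomicBoolean hintEmitted = new AtomicBoolean(false);

    /**
     * Chooses the log level for a failed ACL sync. A missing authorizer is
     * reported at INFO once (where a "disable ACL syncing" hint could go) and
     * at DEBUG afterwards; any other failure stays at ERROR.
     */
    Level levelFor(ExecutionException e) {
        if (e.getCause() instanceof SecurityDisabledException) {
            // compareAndSet succeeds only for the first caller.
            return hintEmitted.compareAndSet(false, true) ? Level.INFO : Level.DEBUG;
        }
        return Level.ERROR;
    }
}
```

Keeping the flag per connector instance matches the "once over the lifetime of the connector" suggestion above.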





[jira] [Resolved] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14659.
---
Resolution: Fixed

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Assignee: Hector Geraldino
>Priority: Minor
>
> Source tasks in Kafka Connect offer two sets of metrics (documented in 
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records 
> produced/polled (before transformation) by this task belonging to the named 
> source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output 
> from the transformations and written to Kafka for this task belonging to the 
> named source connector in this worker. This is after transformations are 
> applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number 
> of records polled and written for the metrics above, respectively.
> In short, the "poll" metrics capture the number of messages sourced 
> pre-transformation/filtering, and the "write" metrics should capture the 
> number of messages ultimately written to Kafka post-transformation/filtering. 
> However, the implementation of the {{source-record-write-*}} metrics 
> _includes_ records filtered out by transformations (and also records that 
> result in produce failures with the config {{errors.tolerance=all}}).
> h3. Details
> In 
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
>  each source record is passed through the transformation chain, where it may 
> be filtered out. The result is then checked, and if the record was in fact 
> filtered out, it is accounted for in the internal metrics via 
> {{counter.skipRecord()}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) {
>     retryWithToleranceOperator.sourceRecord(preTransformRecord);
>     final SourceRecord record = transformationChain.apply(preTransformRecord);
>     final ProducerRecord producerRecord = convertTransformedRecord(record);
>     if (producerRecord == null || retryWithToleranceOperator.failed()) {
>         counter.skipRecord();
>         recordDropped(preTransformRecord);
>         continue;
>     }
>     ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
> 
> public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
>     assert batchSize > 0;
>     assert metricsGroup != null;
>     this.batchSize = batchSize;
>     counter = batchSize;
>     this.metricsGroup = metricsGroup;
> }
>
> public void skipRecord() {
>     if (counter > 0 && --counter == 0) {
>         finishedAllWrites();
>     }
> }
>
> private void finishedAllWrites() {
>     if (!completed) {
>         metricsGroup.recordWrite(batchSize - counter);
>         completed = true;
>     }
> }
> {code}
> For example, if a batch starts with 100 records, {{batchSize}} and 
> {{counter}} are both initialized to 100. If all 100 records are filtered 
> out, {{counter}} is decremented 100 times, and 
> {{{}finishedAllWrites(){}}} records the value 100 to the underlying 
> {{source-record-write-*}} metrics rather than 0, the correct value according 
> to the documentation for these metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the 
> {{source-record-write-*}} metrics, it seems reasonable to fix these metrics 
> so that filtered records are not counted.
> It may also be useful to add metrics that capture the rate and total 
> number of records filtered out by transformations, which would require a KIP.
> I'm not yet sure of the best way to account for produce failures when 
> {{errors.tolerance=all}} is set. Maybe these failures deserve their own 
> new metrics?
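
A minimal sketch of one way the counting described above could be corrected, assuming skipped records are tracked separately from records that were actually written. The class name `FilterAwareWriteCounter`, the `completeRecord()` method, and the `LongConsumer` standing in for `metricsGroup.recordWrite(...)` are hypothetical and are not taken from the actual Kafka patch.

```java
import java.util.function.LongConsumer;

// Hypothetical stand-in for SourceRecordWriteCounter; not the actual Kafka fix.
class FilterAwareWriteCounter {
    private final LongConsumer recordWrite; // stands in for metricsGroup.recordWrite(...)
    private int pending;                    // records neither written nor skipped yet
    private int written;                    // records that were actually written
    private boolean completed;

    FilterAwareWriteCounter(int batchSize, LongConsumer recordWrite) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.pending = batchSize;
        this.recordWrite = recordWrite;
    }

    // Called for a record that was filtered out or dropped.
    void skipRecord() {
        settle(false);
    }

    // Called for a record that was written successfully.
    void completeRecord() {
        settle(true);
    }

    private void settle(boolean wasWritten) {
        if (completed) {
            return;
        }
        if (wasWritten) {
            written++;
        }
        if (--pending == 0) {
            // Report only the records that were written, not the whole batch.
            recordWrite.accept(written);
            completed = true;
        }
    }
}
```

With this shape, a batch of 100 records that are all filtered out reports 0 to the write metrics, which matches the documented meaning of {{source-record-write-*}}.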



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14060) Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-27 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14060.
---
Resolution: Done

> Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
> ---
>
> Key: KAFKA-14060
> URL: https://issues.apache.org/jira/browse/KAFKA-14060
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Hector Geraldino
>Priority: Minor
>






[jira] [Resolved] (KAFKA-13659) MM2 should read all offset syncs at start up

2023-02-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13659.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> MM2 should read all offset syncs at start up
> 
>
> Key: KAFKA-13659
> URL: https://issues.apache.org/jira/browse/KAFKA-13659
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Kanalas Vidor
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> MirrorCheckpointTask uses OffsetSyncStore, and does not check whether 
> OffsetSyncStore managed to read to the "end" of the offset-syncs topic. 
> OffsetSyncStore should fetch the end offset of the topic at startup, and set a 
> flag when its consumption finally reaches that end offset. 
> MirrorCheckpointTask.poll should wait for this flag to be true before doing 
> any in-memory updates and group offset management.
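
The read-to-end flag described above can be sketched as follows. This is only an illustration: `ReadToEndTracker` and its methods are hypothetical names, the real OffsetSyncStore and its consumer are not modeled, and the sketch assumes the topic starts at offset 0 and that the end offset is the offset of the next record to be written.

```java
// Hypothetical tracker; the real OffsetSyncStore is not modeled here.
class ReadToEndTracker {
    // End offset captured once at startup (offset of the next record to be written).
    private final long endOffsetAtStartup;
    private volatile boolean readToEnd;

    ReadToEndTracker(long endOffsetAtStartup) {
        this.endOffsetAtStartup = endOffsetAtStartup;
        // Assumption: an end offset of 0 means there is nothing to read.
        this.readToEnd = endOffsetAtStartup == 0;
    }

    // Called for every record consumed from the offset-syncs topic.
    void onRecordConsumed(long offset) {
        if (offset + 1 >= endOffsetAtStartup) {
            readToEnd = true;
        }
    }

    // A poll loop would check this before doing in-memory updates or offset management.
    boolean readToEnd() {
        return readToEnd;
    }
}
```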





[jira] [Resolved] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2023-02-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-12566.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Assignee: Greg Harris
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. Offsets not translated downstream to primary cluster. ==> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not read connector state. Error response: \{"error_code":404,"message":"No status found for connector MirrorSourceConnector"}
>     at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>     at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>     at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>     at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote}
> and
> {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] Graceful stop of task MirrorHeartbeatConnector-0 failed. (org.apache.kafka.connect.runtime.Worker:866)
> [2021-03-26 03:30:41,527] ERROR [MirrorHeartbeatConnector|task-0] WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
>     at java.lang.Thread.run(Thread.java:748)
> [2021-03-26 03:30:42,248] ERROR [MirrorHeartbeatConnector|task-0] WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote}
>  





[jira] [Resolved] (KAFKA-14727) Connect EOS mode should periodically call task commit

2023-02-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14727.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Connect EOS mode should periodically call task commit
> -
>
> Key: KAFKA-14727
> URL: https://issues.apache.org/jira/browse/KAFKA-14727
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> In non-EOS mode, there is a background thread which periodically commits 
> offsets for a task. If this thread does not have resources to flush on the 
> framework side (records, or offsets) it still calls the task's commit() 
> method to update the internal state of the task.
> In EOS mode, there is no background thread, and all offset commits are 
> performed on the main task thread in response to sending records to Kafka. 
> This has the effect of only triggering the task's commit() method when there 
> are records to send to Kafka, which is different than non-EOS mode.
> In order to bring the two modes into better alignment, and allow tasks 
> reliant on the non-EOS empty commit() behavior to work in EOS mode 
> out-of-the-box, EOS mode should provide offset commits periodically for tasks 
> which do not produce records.
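
The proposed behavior can be illustrated with a simplified loop that calls the task's commit hook either when records were sent or when a commit interval has elapsed with nothing to send. This is a sketch under stated assumptions, not the framework's implementation: `PeriodicCommitLoop`, its nested `Task` interface, and the interval parameter are hypothetical, and transaction handling and the actual send to Kafka are omitted.

```java
import java.util.List;

// Hypothetical model of a task loop; not the Connect framework's actual classes.
class PeriodicCommitLoop {
    interface Task {
        List<String> poll(); // records to send; may be empty
        void commit();       // task-side commit hook
    }

    private final Task task;
    private final long commitIntervalMs;
    private long lastCommitMs;

    PeriodicCommitLoop(Task task, long commitIntervalMs, long startMs) {
        this.task = task;
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = startMs;
    }

    // Runs one loop iteration; returns true if the task's commit() was invoked.
    boolean runOnce(long nowMs) {
        List<String> records = task.poll();
        boolean hasRecords = !records.isEmpty();
        boolean intervalElapsed = nowMs - lastCommitMs >= commitIntervalMs;
        if (hasRecords || intervalElapsed) {
            // Sending records and committing offsets to Kafka is omitted in this sketch.
            task.commit();
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }
}
```

The `intervalElapsed` branch is what gives a task that never produces records the periodic commit() call it would see in non-EOS mode.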





[jira] [Resolved] (KAFKA-14565) Interceptor Resource Leak

2023-02-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14565.
---
Resolution: Fixed

> Interceptor Resource Leak
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, 
> *AbstractConfig.getConfiguredInstances()* is responsible for both creating 
> and configuring each interceptor listed in the interceptor.classes property, 
> and it returns the configured list of interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when multiple interceptors are involved. If one interceptor's configure 
> method creates, or depends on objects that create, threads, connections, or 
> other resources that require cleanup, and a subsequent interceptor's 
> configure method raises a runtime exception, the first interceptor leaks its 
> resources: the interceptor container, i.e. 
> ConsumerInterceptors/ProducerInterceptors, is never created, and therefore 
> the close method of the first interceptor, or of any interceptor, is never 
> called. 
> To help ensure the respective interceptor containers are able to invoke their 
> interceptors' close methods for proper resource cleanup, I propose 
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open*, *configureWithResources()*, or *acquireResources()* 
> method with no implementation and a checked exception on the respective 
> Consumer/Producer interceptor interfaces. This method, as part of the 
> interceptor life cycle management, will be responsible for creating threads 
> and/or objects that use threads, connections, or other resources that 
> require cleanup. Additionally, this default method makes implementation 
> optional: its empty default behavior means it does nothing when 
> unimplemented, mitigating the backwards-compatibility impact on existing 
> interceptors. Finally, the Kafka Consumer/Producer interceptor containers 
> will implement a corresponding *maybeOpen*, *maybeConfigureWithResources*, 
> or *maybeAcquireResources* method, which also throws a checked exception. 
> See the code excerpt below for the Consumer/Producer constructor:
> {code:java}
> List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
>  {code}
> +*PROPOSAL 2*+
> To avoid changing any public interfaces and the subsequent KIP process, we can
>  * Create a class which inherits from or wraps AbstractConfig and contains a 
> new method that returns a ConfiguredInstanceResult. The 
> ConfiguredInstanceResult class will contain an optional list of successfully 
> created interceptors and/or the exception that occurred while calling each 
> Interceptor::configure. Additionally, it will contain a helper method to 
> rethrow the exception as well as a method that returns the underlying 
> exception. The caller is expected to handle the exception and perform 
> cleanup, e.g. call Interceptor::close on each interceptor in the list 
> provided by the ConfiguredInstanceResult class.
>  * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} 
> instances if/when a failure occurs
>  * Add a new overloaded {{getConfiguredInstance}} / 
> {{getConfiguredInstances}} variant that allows users to specify whether 
> already-instantiated classes should be closed or not when a failure occurs
>  * Add a new exception type to the public API that includes a list of all of 
> the successfully-instantiated (and/or successfully-configured) instances 
> before the error was encountered so that callers can choose how to handle the 
> failure however they want (and possibly so that instantiation/configuration 
> can be attempted on every class before throwing the exception)
>  
>  
>  
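
The option of automatically closing already-created instances when a later one fails can be sketched as below. This is an illustration only: `SafeInstantiator` and `createAll` are hypothetical names, each `Supplier` stands in for "instantiate and configure one interceptor", and nothing here reflects the actual AbstractConfig change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper; not part of AbstractConfig.
class SafeInstantiator {
    // Each supplier stands in for creating and configuring one instance.
    static <T> List<T> createAll(List<Supplier<T>> factories) {
        List<T> created = new ArrayList<>();
        try {
            for (Supplier<T> factory : factories) {
                created.add(factory.get());
            }
            return created;
        } catch (RuntimeException failure) {
            // Close everything that was created before the failure.
            // Closeable extends AutoCloseable, so one check covers both.
            for (T instance : created) {
                if (instance instanceof AutoCloseable) {
                    try {
                        ((AutoCloseable) instance).close();
                    } catch (Exception closeError) {
                        failure.addSuppressed(closeError);
                    }
                }
            }
            throw failure;
        }
    }
}
```

Attaching close errors as suppressed exceptions keeps the original configuration failure as the one the caller sees.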




