[jira] [Created] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully

2024-01-25 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-16196:
--

 Summary: Cast transform doesn't handle invalid whole value casts 
gracefully
 Key: KAFKA-16196
 URL: https://issues.apache.org/jira/browse/KAFKA-16196
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: Yash Mayya
Assignee: Yash Mayya


The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like {{{"spec": "int8"}}} as 
opposed to a field-level cast like {{{"spec": "field1:int8"}}}.

 

If an invalid field-level cast is specified (for instance, {{{"spec": "field1:invalid"}}}), this results in a {{ConfigException}} being thrown here -
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts aren't handled appropriately and result in 
an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.
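A minimal sketch that reproduces the asymmetry by configuring the transform directly (the exception behavior follows the linked source; the scaffolding here is illustrative):
{code:java}
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Cast;

public class CastValidationRepro {
    public static void main(String[] args) {
        // Field-level cast with an invalid type: throws ConfigException,
        // which ConfigDef reports as an ordinary validation error.
        try (Cast.Value<SourceRecord> cast = new Cast.Value<>()) {
            cast.configure(Map.of("spec", "field1:invalid"));
        } catch (ConfigException e) {
            System.out.println("handled gracefully: " + e.getMessage());
        }

        // Whole-value cast with an invalid type: throws IllegalArgumentException,
        // which escapes validation and surfaces as a 500 from the REST API.
        try (Cast.Value<SourceRecord> cast = new Cast.Value<>()) {
            cast.configure(Map.of("spec", "invalid"));
        } catch (IllegalArgumentException e) {
            System.out.println("escapes validation: " + e);
        }
    }
}
{code}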



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15888) DistributedHerder log context should not use the same client ID for each Connect worker by default

2023-11-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15888:
--

 Summary: DistributedHerder log context should not use the same 
client ID for each Connect worker by default
 Key: KAFKA-15888
 URL: https://issues.apache.org/jira/browse/KAFKA-15888
 Project: Kafka
  Issue Type: Bug
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


By default, if there is no {{client.id}} configured on a Connect worker
running in distributed mode, the same client ID ("connect-1") will be used in 
the log context for the DistributedHerder class in every single worker in the 
Connect cluster. This default is quite confusing and obviously not very useful. 
Further, based on how this default is configured 
([ref|https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299]),
 it seems like this might have been an unintentional bug. We could simply use 
the workerId (the advertised host name and port of the worker) by default 
instead, which should be unique for each worker in a cluster.
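Illustratively, the default could be derived like this (a simplified sketch, not the actual DistributedHerder code, which also includes the group ID in its prefix):
{code:java}
import org.apache.kafka.common.utils.LogContext;

public class HerderLogContextSketch {
    // Fall back to the unique workerId (advertised host:port) when no
    // client.id is configured, instead of the shared "connect-1" default.
    static String herderLogPrefix(String configuredClientId, String workerId) {
        String clientId = (configuredClientId == null || configuredClientId.isEmpty())
                ? workerId
                : configuredClientId;
        return "[Worker clientId=" + clientId + "] ";
    }

    public static void main(String[] args) {
        // Two workers in the same cluster now get distinct prefixes:
        System.out.println(herderLogPrefix("", "worker1:8083")); // [Worker clientId=worker1:8083]
        System.out.println(herderLogPrefix("", "worker2:8083")); // [Worker clientId=worker2:8083]
        // The prefix is then used to build prefixed loggers:
        LogContext logContext = new LogContext(herderLogPrefix("", "worker1:8083"));
    }
}
{code}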



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15570) Add unit tests for MemoryConfigBackingStore

2023-10-10 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15570:
--

 Summary: Add unit tests for MemoryConfigBackingStore
 Key: KAFKA-15570
 URL: https://issues.apache.org/jira/browse/KAFKA-15570
 Project: Kafka
  Issue Type: Test
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Currently, the 
[MemoryConfigBackingStore|https://github.com/apache/kafka/blob/6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java#L37]
 class doesn't have any unit tests for its functionality. While most of its 
functionality is fairly lightweight today, changes will be introduced with 
[KIP-980|https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state]
 (potentially 
[KIP-976|https://cwiki.apache.org/confluence/display/KAFKA/KIP-976%3A+Cluster-wide+dynamic+log+adjustment+for+Kafka+Connect]
 as well) and it would be good to have a test setup in place before those 
changes are made.
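As a starting point, a test could look something like this sketch (method and listener names follow the {{ConfigBackingStore}} interface, though the eventual tests may be shaped differently):
{code:java}
import java.util.Map;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class MemoryConfigBackingStoreTest {

    @Test
    public void testPutAndRetrieveConnectorConfig() {
        MemoryConfigBackingStore store = new MemoryConfigBackingStore();
        ConfigBackingStore.UpdateListener listener = mock(ConfigBackingStore.UpdateListener.class);
        store.setUpdateListener(listener);

        Map<String, String> config = Map.of("connector.class", "FileStreamSource");
        store.putConnectorConfig("conn-1", config);

        // The snapshot should reflect the stored config, and the listener
        // should have been notified of the update.
        assertEquals(config, store.snapshot().connectorConfig("conn-1"));
        verify(listener).onConnectorConfigUpdate("conn-1");
    }
}
{code}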



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15547) Thread leak in MirrorMakerConfigTest#testClientConfigProperties

2023-10-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-15547.

Fix Version/s: 3.7.0
   Resolution: Fixed

> Thread leak in MirrorMakerConfigTest#testClientConfigProperties
> ---
>
> Key: KAFKA-15547
> URL: https://issues.apache.org/jira/browse/KAFKA-15547
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
> Fix For: 3.7.0
>
>
> The test MirrorMakerConfigTest#testClientConfigProperties opens a 
> ForwardingAdmin but fails to close it.
> we should enclose this in a try-with-resources statement to ensure the Admin 
> client is closed and there is no thread leak
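A sketch of the suggested change (config values are placeholders; the test's assertions are elided):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.admin.ForwardingAdmin;

public class ForwardingAdminCloseSketch {
    public static void main(String[] args) {
        Map<String, Object> adminProps = Map.of("bootstrap.servers", "localhost:9092");
        // try-with-resources guarantees close() runs even if an assertion
        // fails mid-test, so the admin client's threads cannot leak.
        try (ForwardingAdmin admin = new ForwardingAdmin(adminProps)) {
            // ... exercise and assert on the admin client ...
        }
    }
}
{code}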



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API

2023-09-27 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-15177.

Fix Version/s: 3.6.0
   Resolution: Fixed

> MirrorMaker 2 should implement the alterOffsets KIP-875 API
> ---
>
> Key: KAFKA-15177
> URL: https://issues.apache.org/jira/browse/KAFKA-15177
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Reporter: Yash Mayya
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.6.0
>
>
> The {{MirrorSourceConnector}} class should implement the new alterOffsets API 
> added in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect].
>  We could also implement the API in 
> {{MirrorCheckpointConnector}} and 
> {{MirrorHeartbeatConnector}} to prevent external modification of offsets 
> since the operation wouldn't really make sense in their case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15470) Allow creating connectors in a stopped state

2023-09-15 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15470:
--

 Summary: Allow creating connectors in a stopped state
 Key: KAFKA-15470
 URL: https://issues.apache.org/jira/browse/KAFKA-15470
 Project: Kafka
  Issue Type: New Feature
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya
 Fix For: 3.7.0


[KIP-875: First-class offsets support in Kafka 
Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 introduced a new {{STOPPED}} state for connectors along with some REST API 
endpoints to retrieve and modify offsets for connectors. Currently, only 
connectors that already exist can be stopped and any newly created connector 
will always be in the {{RUNNING}} state initially. Allowing the creation of 
connectors in a {{STOPPED}} state will facilitate multiple new use cases. One 
interesting use case would be to migrate connectors from one Kafka Connect 
cluster to another. Individual connector migration would be useful in a number 
of scenarios such as breaking a large cluster into multiple smaller clusters 
(or vice versa), moving a connector from a cluster running in one data center 
to another etc. A connector migration could be achieved by using the following 
sequence of steps:
 # Stop the running connector on the original Kafka Connect cluster
 # Retrieve the offsets for the connector via the {{GET 
/connectors/\{connector}/offsets}}  endpoint
 # Create the connector in a stopped state using the same configuration on the 
new Kafka Connect cluster
 # Alter the offsets for the connector on the new cluster via the {{PATCH 
/connectors/\{connector}/offsets}}  endpoint (using the offsets obtained from 
the original cluster)
 # Resume the connector on the new cluster and delete it on the original cluster

Another use case for creating connectors in a stopped state could be deploying 
connectors as a part of a larger data pipeline before the source / sink data 
system has been created or is ready for data transfer.
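For illustration, the migration sequence above could be driven against the two clusters' REST APIs roughly as follows (host names and the connector config are placeholders; the {{initial_state}} field reflects the proposal in this ticket / KIP-980):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse.BodyHandlers;

public class ConnectorMigrationSketch {
    static final HttpClient HTTP = HttpClient.newHttpClient();

    static String call(String method, String url, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .method(method, body == null ? BodyPublishers.noBody() : BodyPublishers.ofString(body))
                .build();
        return HTTP.send(request, BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        String oldCluster = "http://old-connect:8083", newCluster = "http://new-connect:8083";

        call("PUT", oldCluster + "/connectors/my-conn/stop", null);                      // step 1
        String offsets = call("GET", oldCluster + "/connectors/my-conn/offsets", null);  // step 2

        String config = "{\"connector.class\": \"FileStreamSource\", \"file\": \"/tmp/in\", \"topic\": \"t\"}";
        call("POST", newCluster + "/connectors",                                         // step 3
                "{\"name\": \"my-conn\", \"config\": " + config + ", \"initial_state\": \"STOPPED\"}");

        call("PATCH", newCluster + "/connectors/my-conn/offsets", offsets);              // step 4
        call("PUT", newCluster + "/connectors/my-conn/resume", null);                    // step 5
        call("DELETE", oldCluster + "/connectors/my-conn", null);
    }
}
{code}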



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id

2023-09-12 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-14067.

Resolution: Fixed

> Sink connector override.consumer.group.id can conflict with worker group.id
> ---
>
> Key: KAFKA-14067
> URL: https://issues.apache.org/jira/browse/KAFKA-14067
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Greg Harris
>Priority: Minor
> Fix For: 3.7.0
>
>
> Currently there is a validation step for connector names which prevents sink 
> connector consumer groups from colliding with the worker group.id.
> There is currently no such validation for consumer.override.group.id that 
> would prevent a conflicting connector from being configured, and so it is 
> possible to misconfigure a connector in a way that may be damaging to the 
> workers themselves.
> Reproduction steps:
> 1. Configure a connect distributed cluster with a certain group.id in the 
> worker config.
> 2. Configure a sink connector with consumer.override.group.id having the same 
> value as in the worker config
> Expected behavior:
> 1. An error is returned indicating that the consumer.override.group.id is 
> invalid
> 2. The connector is not created or started
> Actual behavior:
> 1. No error is returned, and the configuration is otherwise valid.
> 2. The connector is created and starts running.
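A sketch of the colliding configuration (values are placeholders):
{code:java}
import java.util.Map;

public class GroupIdCollisionSketch {
    public static void main(String[] args) {
        // Worker config (connect-distributed.properties):
        Map<String, String> workerProps = Map.of("group.id", "connect-cluster");

        // Sink connector config: the consumer override reuses the worker
        // group's ID, which validation currently does not reject.
        Map<String, String> connectorProps = Map.of(
                "name", "my-sink",
                "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics", "test-topic",
                "consumer.override.group.id", "connect-cluster"); // collides with group.id above
    }
}
{code}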



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15387) Deprecate and remove Connect's duplicate task configurations retrieval endpoint

2023-08-21 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15387:
--

 Summary: Deprecate and remove Connect's duplicate task 
configurations retrieval endpoint
 Key: KAFKA-15387
 URL: https://issues.apache.org/jira/browse/KAFKA-15387
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya
 Fix For: 4.0.0


A new endpoint ({{GET /connectors/\{connector}/tasks-config}}) was added to
Kafka Connect's REST API to expose task configurations in 
[KIP-661|https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API].
 However, the original patch for Kafka Connect's REST API had already added an 
endpoint ({{GET /connectors/\{connector}/tasks}}) to retrieve the list of a
connector's tasks and their configurations (ref - 
[https://github.com/apache/kafka/pull/378] , 
https://issues.apache.org/jira/browse/KAFKA-2369) and this was missed in 
KIP-661. We can deprecate the endpoint added by KIP-661 in 3.7 (the next minor 
AK release) and remove it in 4.0 (the next major AK release) since it's 
redundant to have two separate endpoints to expose task configurations. Related 
discussions in 
[https://github.com/apache/kafka/pull/13424#discussion_r1144727886] and 
https://issues.apache.org/jira/browse/KAFKA-15377 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-18 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15377:
--

 Summary: GET /connectors/{connector}/tasks-config endpoint exposes 
externalized secret values
 Key: KAFKA-15377
 URL: https://issues.apache.org/jira/browse/KAFKA-15377
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


The {{GET /connectors/\{connector}/tasks-config}} endpoint added in
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
 exposes externalized secret values in task configurations (see 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
 A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / 
[https://github.com/apache/kafka/pull/6129] for the {{GET /connectors/\{connector}/tasks}} endpoint. The config provider placeholder
should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15238) Connect workers can be disabled by DLQ related stuck admin client calls

2023-07-24 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15238:
--

 Summary: Connect workers can be disabled by DLQ related stuck 
admin client calls
 Key: KAFKA-15238
 URL: https://issues.apache.org/jira/browse/KAFKA-15238
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


When Kafka Connect is run in distributed mode - if a sink connector's task is 
restarted (via a worker's REST API), the following sequence of steps will occur 
(on the DistributedHerder's thread):

 
 # The existing sink task will be stopped 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
 # A new sink task will be started 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
 # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
 # The DLQ reporter (see 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
 for the sink task is also instantiated and configured as a part of this 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
 # The DLQ reporter setup involves two synchronous admin client calls to list 
topics and create the DLQ topic if it isn't already created 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])

 

All of these steps occur synchronously on the herder's tick thread - in this
portion
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
 where external requests are run. If the admin client call in the DLQ reporter 
setup step blocks for some time (due to auth failures and retries, network
issues, or other reasons), this can cause the Connect worker to become
non-functional (REST API requests will timeout) and even fall out of the 
Connect cluster and become a zombie (since the tick thread also drives group 
membership functions).
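The blocking portion of the DLQ reporter setup has roughly this shape (simplified from {{DeadLetterQueueReporter}}; topic settings are placeholders):
{code:java}
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class DlqSetupSketch {
    // Both get() calls below block the herder's tick thread until the broker
    // responds or the admin client gives up, which is the hazard described above.
    static void maybeCreateDlqTopic(Admin admin, String dlqTopic) throws Exception {
        Set<String> topics = admin.listTopics().names().get();       // blocking call #1
        if (!topics.contains(dlqTopic)) {
            admin.createTopics(Set.of(new NewTopic(dlqTopic, 1, (short) 3)))
                    .all().get();                                    // blocking call #2
        }
    }
}
{code}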



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument

2023-07-19 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15216:
--

 Summary: InternalSinkRecord::newRecord method ignores the headers 
argument
 Key: KAFKA-15216
 URL: https://issues.apache.org/jira/browse/KAFKA-15216
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56]
 - the headers argument passed to the {{InternalSinkRecord}} constructor is the
record's own headers instance field (via the {{headers()}} accessor) instead of
the {{newRecord}} method's {{headers}} argument.

 

Originally discovered 
[here|https://github.com/apache/kafka/pull/14024#discussion_r1266917499].
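A simplified sketch of the buggy pattern (class scaffolding elided; see the linked line range for the actual code):
{code:java}
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key,
                            Schema valueSchema, Object value, Long timestamp,
                            Iterable<Header> headers) {
    // BUG: passes this record's headers() instead of the `headers` parameter,
    // silently dropping whatever the caller supplied.
    return new InternalSinkRecord(originalRecord,
            new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value,
                           kafkaOffset(), timestamp, timestampType(), headers()));
    // fix: pass `headers` in place of `headers()`
}
{code}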



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets

2023-07-12 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15182:
--

 Summary: Normalize offsets before invoking 
SourceConnector::alterOffsets
 Key: KAFKA-15182
 URL: https://issues.apache.org/jira/browse/KAFKA-15182
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader}} after the offsets
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{SourceConnector::alterOffsets}}.
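A rough sketch of that normalization (converter configuration simplified; the worker would use its internal converter instance):
{code:java}
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;

public class OffsetNormalizationSketch {
    // Round-trip an offset map through a schemaless JSON converter so that
    // SourceConnector::alterOffsets sees the same types (e.g. Long rather
    // than Integer) that an OffsetStorageReader would return later.
    @SuppressWarnings("unchecked")
    static Map<String, Object> normalize(Map<String, Object> offset) {
        JsonConverter converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "false"), false);
        byte[] serialized = converter.fromConnectData("", null, offset);
        return (Map<String, Object>) converter.toConnectData("", serialized).value();
    }

    public static void main(String[] args) {
        Map<String, Object> offset = Map.of("position", 42); // Integer going in
        System.out.println(normalize(offset).get("position").getClass()); // expected: java.lang.Long
    }
}
{code}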



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors

2023-07-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15179:
--

 Summary: Add integration tests for the FileStream Sink and Source 
connectors
 Key: KAFKA-15179
 URL: https://issues.apache.org/jira/browse/KAFKA-15179
 Project: Kafka
  Issue Type: Improvement
Reporter: Yash Mayya
Assignee: Yash Mayya


Add integration tests for the FileStream Sink and Source connectors covering
a variety of common scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API

2023-07-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15177:
--

 Summary: MirrorMaker 2 should implement the alterOffsets KIP-875 
API
 Key: KAFKA-15177
 URL: https://issues.apache.org/jira/browse/KAFKA-15177
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, mirrormaker
Reporter: Yash Mayya


The {{MirrorSourceConnector}} class should implement the new alterOffsets API 
added in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect].
 We could also implement the API in 
{{MirrorCheckpointConnector}} and 
{{MirrorHeartbeatConnector}} to prevent external modification of offsets since 
the operation wouldn't really make sense in their case.
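For the checkpoint and heartbeat connectors, that could look like this sketch (using the {{SourceConnector::alterOffsets}} signature from KIP-875; the exact behavior chosen may differ):
{code:java}
import java.util.Map;

// Sketch: reject external offset modification outright, since altered
// offsets would have no meaningful effect for this connector.
@Override
public boolean alterOffsets(Map<String, String> connectorConfig,
                            Map<Map<String, ?>, Map<String, ?>> offsets) {
    throw new UnsupportedOperationException(
            "MirrorHeartbeatConnector offsets cannot be modified externally");
}
{code}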



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15145:
--

 Summary: AbstractWorkerSourceTask re-processes records filtered 
out by SMTs on retriable exceptions
 Key: KAFKA-15145
 URL: https://issues.apache.org/jira/browse/KAFKA-15145
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


If a RetriableException is thrown from an admin client or producer client 
operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records for a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug appears to have existed since SMTs were first
introduced in 0.10.2.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15121) FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs

2023-06-26 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15121:
--

 Summary: FileStreamSourceConnector and FileStreamSinkConnector 
should implement KIP-875 APIs
 Key: KAFKA-15121
 URL: https://issues.apache.org/jira/browse/KAFKA-15121
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 introduced the new SourceConnector::alterOffsets and 
SinkConnector::alterOffsets APIs. The FileStreamSourceConnector and 
FileStreamSinkConnector should implement these new methods to improve the user 
experience when modifying offsets for these connectors and also to serve as an 
example for other connectors.
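A hypothetical shape for the source connector's implementation (the {{position}} key comes from {{FileStreamSourceTask}}; the validation rules here are illustrative):
{code:java}
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;

@Override
public boolean alterOffsets(Map<String, String> connectorConfig,
                            Map<Map<String, ?>, Map<String, ?>> offsets) {
    for (Map<String, ?> offset : offsets.values()) {
        if (offset == null)
            continue; // a tombstone (offset reset) is always acceptable
        Object position = offset.get("position");
        if (!(position instanceof Long) || (Long) position < 0)
            throw new ConnectException("Offset 'position' must be a non-negative long");
    }
    return true; // offsets validated; the runtime can proceed with the write
}
{code}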



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15113:
--

 Summary: Gracefully handle cases where a sink connector's admin 
and consumer client config overrides target different Kafka clusters
 Key: KAFKA-15113
 URL: https://issues.apache.org/jira/browse/KAFKA-15113
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya


Background reading -
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 

 

From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
{quote}Currently, admin clients are only instantiated for sink connectors to 
create the DLQ topic if required. So it seems like it could be technically 
possible for a sink connector's consumer client overrides to target a different 
Kafka cluster from its producer and admin client overrides. Such a setup won't 
work with this implementation of the get offsets API as it is using an admin 
client to get a sink connector's consumer group offsets. However, I'm not sure 
we want to use a consumer client to retrieve the offsets either as we shouldn't 
be disrupting the existing sink tasks' consumer group just to fetch offsets. 
Leveraging a sink task's consumer also isn't an option because fetching offsets 
for a stopped sink connector (where all the tasks will be stopped) should be 
allowed. I'm wondering if we should document that a connector's various client 
config override policies shouldn't target different Kafka clusters (side note - 
looks like we don't [currently 
document|https://kafka.apache.org/documentation/#connect] client config 
overrides for Connect beyond just the worker property 
{{connector.client.config.override.policy}}).
{quote}
 
{quote}I don't think we need to worry too much about this. I cannot imagine a 
sane use case that involves overriding a connector's Kafka clients with 
different Kafka clusters (not just bootstrap servers, but actually different 
clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
docs that that kind of setup isn't supported but I really, really hope that 
it's not necessary and nobody's trying to do that in the first place. I also 
suspect that there are other places where this might cause issues, like with 
exactly-once source support or automatic topic creation for source connectors.

That said, there is a different case we may want to consider: someone may have 
configured consumer overrides for a sink connector, but not admin overrides. 
This may happen if they don't use a DLQ topic. I don't know if we absolutely 
need to handle this now and we may consider filing a follow-up ticket to look 
into this, but one quick-and-dirty thought I've had is to configure the admin 
client used here with a combination of the configurations for the connector's 
admin client and its consumer, giving precedence to the latter.
{quote}
 

Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
{quote}We will have undesirable behavior if the connector is targeting a Kafka 
cluster different from the Connect cluster's backing Kafka cluster and the user 
has configured the consumer overrides appropriately for their connector, but 
not the admin overrides (something we also discussed previously 
[here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).

In the above case, if a user attempts to reset their sink connector's offsets 
via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
will occur:
 # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
which returns an empty partition offsets map for the sink connector's consumer 
group ID (it exists on a different Kafka cluster to the one that the admin 
client is connecting to).
 # We call {{SinkConnector::alterOffsets}} with an empty offsets map which 
could cause the sink connector to propagate the offsets reset related changes 
to the sink system.
 # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}} 
which returns {{GroupIdNotFoundException}} which we essentially swallow in 
order to keep offsets reset operations idempotent and return a success message 
to the user (even though the real consumer group for the sink connector on the 
other Kafka cluster hasn't been deleted).

This will occur if the connector's admin overrides are missing OR the admin 
overrides are deliberately configured to target a Kafka cluster different from 
the consumer overrides (although like you pointed out in the other linked 
thread, this doesn't seem like a valid use case that we'd even want to support).

I guess we'd want to pursue the approach you suggested where we'd configure the 
admin client with a combination of the connector's admin overrides and 

[jira] [Resolved] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-29 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-14956.

Resolution: Fixed

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>  Labels: flaky-test
>
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
> 

[jira] [Created] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14974:
--

 Summary: Restore backward compatibility in KafkaBasedLog
 Key: KAFKA-14974
 URL: https://issues.apache.org/jira/browse/KAFKA-14974
 Project: Kafka
  Issue Type: Task
Reporter: Yash Mayya
Assignee: Yash Mayya


{{KafkaBasedLog}} is a widely used utility class that provides a generic 
implementation of a shared, compacted log of records in a Kafka topic. It isn't 
in Connect's public API, but has been used outside of Connect and we try to 
preserve backward compatibility whenever possible. 
https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this
change is source compatible, it isn't binary compatible: the JVM links methods
by their full descriptor (which includes the return type), so existing compiled
callers of the void variants would fail with {{NoSuchMethodError}}. We can
restore backward compatibility simply by re-instating the older send methods,
and renaming the new {{Future}}-returning send methods.
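A sketch of the shape of the fix ({{sendWithReceipt}} is one possible name for the renamed methods, not necessarily the final choice):
{code:java}
// Re-instated void signatures preserve binary compatibility for existing
// callers compiled against the old KafkaBasedLog...
public void send(K key, V value) {
    sendWithReceipt(key, value);
}

public void send(K key, V value, Callback callback) {
    sendWithReceipt(key, value, callback);
}

// ...while the Future-returning variants live on under a new name.
public Future<RecordMetadata> sendWithReceipt(K key, V value) {
    return sendWithReceipt(key, value, null);
}
{code}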



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14933) Document Kafka Connect's log level REST APIs added in KIP-495

2023-04-25 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14933:
--

 Summary: Document Kafka Connect's log level REST APIs added in 
KIP-495
 Key: KAFKA-14933
 URL: https://issues.apache.org/jira/browse/KAFKA-14933
 Project: Kafka
  Issue Type: Task
  Components: documentation, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[KIP-495|https://cwiki.apache.org/confluence/display/KAFKA/KIP-495%3A+Dynamically+Adjust+Log+Levels+in+Connect]
 added 3 REST APIs to allow dynamically adjusting log levels on Kafka Connect 
workers. This was added a long time ago (released in AK 2.4.0) but was never 
publicly documented. These REST APIs should be documented in 
[https://kafka.apache.org/documentation/#connect_rest]. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14910) Consider cancelling ongoing alter connector offsets requests when the connector is resumed

2023-04-14 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14910:
--

 Summary: Consider cancelling ongoing alter connector offsets 
requests when the connector is resumed
 Key: KAFKA-14910
 URL: https://issues.apache.org/jira/browse/KAFKA-14910
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya


See discussion here for more details - 
[https://github.com/apache/kafka/pull/13465#discussion_r1164465874]

The implementation for the _*PATCH /connectors/\{connector}/offsets*_ and 
_*DELETE /connectors/\{connector}/offsets*_ APIs is completely asynchronous and 
the check for whether the connector is stopped will only be made at the 
beginning of the request. 

If the connector is resumed while the alter / reset offsets request is being 
processed, this can lead to certain issues (especially with non-EoS source 
connectors). For sink connectors, admin client requests to alter / reset 
offsets for a consumer group will be rejected if the consumer group is active 
(i.e. when the connector tasks come up). For source connectors when exactly 
once support is enabled on the worker, we do a round of zombie fencing before 
the tasks are brought up and this will basically disable the transactional 
producer used to alter offsets (the transactional producer uses the 
transactional ID for task 0 of the connector). However, for source connectors 
when exactly once support is not enabled on the worker (this is the default), 
there are no such safeguards. We could potentially add some interruption logic 
that cancels ongoing alter / reset offset requests when a connector is resumed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-02 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14876:
--

 Summary: Public documentation for new Kafka Connect offset 
management REST APIs
 Key: KAFKA-14876
 URL: https://issues.apache.org/jira/browse/KAFKA-14876
 Project: Kafka
  Issue Type: Sub-task
Reporter: Yash Mayya
Assignee: Yash Mayya


Add public documentation for the 3 new Kafka Connect offset management REST 
APIs being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 * *GET* /connectors/\{connector}/offsets
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14844) Kafka Connect's OffsetBackingStore interface should handle (de)serialization and connector namespacing

2023-03-24 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14844:
--

 Summary: Kafka Connect's OffsetBackingStore interface should 
handle (de)serialization and connector namespacing
 Key: KAFKA-14844
 URL: https://issues.apache.org/jira/browse/KAFKA-14844
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Relevant discussion here - 
[https://github.com/apache/kafka/pull/13434/files#r114972]

 

TLDR - we should move serialization / deserialization and key construction 
(connector namespacing) for source connector offsets from the 
OffsetStorageWriter / OffsetStorageReader interfaces into the 
OffsetBackingStore interface. 
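Directionally, the interface could evolve along these lines (a purely hypothetical sketch; names and signatures are illustrative, not the actual proposal):
{code:java}
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.connect.util.Callback;

// Hypothetical: the backing store takes connector-scoped, unserialized
// offsets, so (de)serialization and key namespacing live in one place
// instead of in OffsetStorageWriter / OffsetStorageReader.
public interface OffsetBackingStore {
    Future<Map<Map<String, Object>, Map<String, Object>>> get(String connectorName);

    Future<Void> set(String connectorName,
                     Map<Map<String, Object>, Map<String, Object>> offsets,
                     Callback<Void> callback);
}
{code}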



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14732) Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-18 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14732:
--

 Summary: Use an exponential backoff retry mechanism while 
reconfiguring connector tasks
 Key: KAFKA-14732
 URL: https://issues.apache.org/jira/browse/KAFKA-14732
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Kafka Connect in distributed mode retries infinitely with a fixed retry backoff 
(250 ms) in case of errors arising during connector task reconfiguration. Tasks 
can be "reconfigured" during connector startup (to get the initial task configs 
from the connector), a connector resume or if a connector explicitly requests 
it via its context. Task reconfiguration essentially entails requesting a 
connector instance for its task configs and writing them to the Connect 
cluster's config storage (in case a change in task configs is detected). A 
fixed retry backoff of 250 ms leads to very aggressive retries - consider a 
Debezium connector which attempts to initiate a database connection in its 
[taskConfigs 
method|https://github.com/debezium/debezium/blob/bf347da71ad9b0819998a3bc9754b3cc96cc1563/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L63].
 If the connection fails due to something like an invalid login, the Connect 
worker will essentially spam connection attempts frequently and indefinitely 
(until the connector config / database side configs are fixed). An exponential 
backoff retry mechanism seems more well suited for the 
[DistributedHerder::reconfigureConnectorTasksWithRetry|https://github.com/apache/kafka/blob/a54a34a11c1c867ff62a7234334cad5139547fd7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1873-L1898]
 method.
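For instance, the client-side {{ExponentialBackoff}} utility could drive the retry delays (parameter values are illustrative, and {{reconfigureSucceeded}} stands in for the actual reconfiguration attempt):
{code:java}
import org.apache.kafka.common.utils.ExponentialBackoff;

public class ReconfigureBackoffSketch {
    static boolean reconfigureSucceeded() {
        return false; // stand-in for requesting task configs and writing them
    }

    public static void main(String[] args) throws InterruptedException {
        ExponentialBackoff backoff = new ExponentialBackoff(
                250L,    // initial interval, matching today's fixed delay
                2,       // multiplier
                60_000L, // cap the delay at one minute
                0.2);    // jitter, so workers don't retry in lockstep
        for (long attempt = 0; !reconfigureSucceeded() && attempt < 10; attempt++) {
            Thread.sleep(backoff.backoff(attempt));
        }
    }
}
{code}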



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14569) Migrate EmbeddedKafkaCluster used by Connect integration tests from EmbeddedZookeeper to KRaft

2023-01-04 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14569:
--

 Summary: Migrate EmbeddedKafkaCluster used by Connect integration 
tests from EmbeddedZookeeper to KRaft
 Key: KAFKA-14569
 URL: https://issues.apache.org/jira/browse/KAFKA-14569
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


ZooKeeper mode is going to be deprecated in Apache Kafka 4.0. Connect currently 
uses an 
[EmbeddedKafkaCluster|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L95]
 for integration tests that depends on an 
[EmbeddedZookeeper.|https://github.com/apache/kafka/blob/b8ab09820cd96290176afd24cf7b03e7cda7f783/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L147]
 This should be migrated to remove the ZooKeeper dependency, instead working in 
the KRaft mode (probably with co-located brokers and controllers). We could 
potentially leverage the existing test kit for KRaft clusters 
 ([https://github.com/apache/kafka/tree/b8ab09820cd96290176afd24cf7b03e7cda7f783/core/src/test/java/kafka/testkit]),
which handles much of the setup, including the listeners configuration,
formatting the metadata log directory, allowing usage of non-static random 
ports for `controller.quorum.voters`, initialization of the shared server(s), 
broker(s), and controller(s) etc.

 

One more thing to note is that some Connect integration tests currently use the 
`kafka.security.authorizer.AclAuthorizer` which requires ZooKeeper. These tests 
should be migrated to use the new authorizer from 
[KIP-801|https://cwiki.apache.org/confluence/display/KAFKA/KIP-801%3A+Implement+an+Authorizer+that+stores+metadata+in+__cluster_metadata]
 if we want to completely eliminate the dependency on ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14455:
--

 Summary: Kafka Connect create and update REST APIs should surface 
failures while writing to the config topic
 Key: KAFKA-14455
 URL: https://issues.apache.org/jira/browse/KAFKA-14455
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Kafka Connect's `POST /connectors` and `PUT /connectors/\{connector}/config` 
REST APIs internally simply write a message to the Connect cluster's internal 
config topic (which is then processed asynchronously by the herder). However, 
no callback is passed to the producer's send method and there is no error 
handling in place for producer send failures (see 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
 / 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]).

Consider one such case where the Connect worker's principal doesn't have a 
WRITE ACL on the cluster's config topic. Now suppose the user submits a 
connector's configs via one of the above two APIs. The producer send 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
 / 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]
 won't succeed (due to a TopicAuthorizationException) but the API responses 
will be `201 Created` success responses anyway. This is a very poor UX because 
the connector will actually never be created but the API response indicated 
success. Furthermore, this failure would only be detectable if TRACE logs are 
enabled (via [this
log|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java]),
making it near impossible for users to debug. Producer callbacks should be
used to surface write failures back to the user via the API response.
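A sketch of the direction this suggests (simplified; the real fix needs herder plumbing to tie the producer result back to the pending REST request):
{code:java}
// Instead of a fire-and-forget send, attach a callback so write failures
// (e.g. TopicAuthorizationException) can fail the REST request:
producer.send(new ProducerRecord<>(topic, key, serializedConfig), (metadata, exception) -> {
    if (exception != null)
        restCallback.onCompletion(new ConnectException(
                "Failed to write connector configuration to the config topic", exception), null);
    else
        restCallback.onCompletion(null, null);
});
{code}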



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14368) Add an offset write REST API to Kafka Connect

2022-11-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14368:
--

 Summary: Add an offset write REST API to Kafka Connect
 Key: KAFKA-14368
 URL: https://issues.apache.org/jira/browse/KAFKA-14368
 Project: Kafka
  Issue Type: New Feature
Reporter: Yash Mayya
Assignee: Yash Mayya


[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 for https://issues.apache.org/jira/browse/KAFKA-4107 proposes to add an offset 
reset API which will allow resetting the offsets for source and sink connectors 
so that they can consume from the beginning of the stream. However, an offset 
API to write arbitrary offsets would also be useful for certain connectors in 
order to go back in time but not to the beginning, or to skip some problematic 
record and move forward. Based on the discussion thread for KIP-875 
[here|https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02], it was 
determined that this could be done through a follow-up KIP if/when KIP-875 is 
adopted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14353) Make Kafka Connect REST request timeouts configurable

2022-11-02 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14353:
--

 Summary: Make Kafka Connect REST request timeouts configurable
 Key: KAFKA-14353
 URL: https://issues.apache.org/jira/browse/KAFKA-14353
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Kafka Connect currently defines a default REST API request timeout of [90 
seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]
 which isn't configurable. If a REST API request takes longer than this, a 
{{500 Internal Server Error}} response is returned with the message "Request 
timed out". In exceptional scenarios, a longer timeout may be required for 
operations such as connector config validation / connector creation (which 
internally does a config validation first). We should allow the request timeout 
to be configurable via a Kafka Connect worker property.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14342) KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

2022-10-31 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14342:
--

 Summary: KafkaOffsetBackingStore should clear offsets for source 
partitions on tombstone messages
 Key: KAFKA-14342
 URL: https://issues.apache.org/jira/browse/KAFKA-14342
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[KafkaOffsetBackingStore|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L70]
 is used to track source connector offsets using a backing Kafka topic. It 
implements interface methods to get and set offsets using a 
[KafkaBasedLog|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L80].
 It also maintains an in-memory map containing \{partition, offset} entries for 
source connectors (which is populated via the consumer callback mechanism from 
the KafkaBasedLog). When a tombstone offset (i.e. Kafka message with a null 
value) is encountered for a source partition, the map is simply updated to make 
the value null for the corresponding partition key. For certain source 
connectors which have a lot of source partitions that are "closed" frequently, 
this can be very problematic. Imagine a file source connector which reads data 
from all files in a directory line-by-line (and where file appends are not 
tracked) - each file corresponds to a source partition here, and the offset 
would be the line number in the file. If there are millions of files being 
read, this can bring down the Connect worker due to JVM heap exhaustion (OOM) 
caused by the in-memory map in KafkaOffsetBackingStore growing too large. Even 
if the connector writes tombstone offsets for the last record in a source 
partition, this doesn't help completely since we don't currently remove entries 
from KafkaOffsetBackingStore's in-memory offset map (so the source partition 
keys will stick around) - even though we indicate 
[here|https://github.com/apache/kafka/blob/56d588d55ac313c0efca586a3bcd984c99a89018/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java#L37]
 that tombstones can be used to "delete" offsets.
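The gist of the change this suggests, in the store's consumer callback (simplified; {{data}} is the in-memory map described above):
{code:java}
// On a tombstone, drop the partition's entry entirely instead of retaining
// the key with a null value forever:
if (record.value() == null)
    data.remove(ByteBuffer.wrap(record.key()));
else
    data.put(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()));
{code}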



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14193) Connect system test ConnectRestApiTest is failing

2022-08-31 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14193:
--

 Summary: Connect system test ConnectRestApiTest is failing
 Key: KAFKA-14193
 URL: https://issues.apache.org/jira/browse/KAFKA-14193
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[ConnectRestApiTest|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/connect/connect_rest_test.py]
is currently failing on `trunk` and `3.3` with the following assertion error:

{code:java}
AssertionError()
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
183, in _do_run
    data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
243, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_rest_test.py", 
line 106, in test_rest_api
    self.verify_config(self.FILE_SOURCE_CONNECTOR, self.FILE_SOURCE_CONFIGS, 
configs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_rest_test.py", 
line 219, in verify_config
    assert config_def == set(config_names){code}
On closer inspection, this is because of the new source connector EOS related 
configs added in [https://github.com/apache/kafka/pull/11775]. Adding the 
following new configs - 
{code:java}
offsets.storage.topic, transaction.boundary, exactly.once.support, 
transaction.boundary.interval.ms{code}
in the expected config defs 
[here|https://github.com/apache/kafka/blob/6f4778301b1fcac1e2750cc697043d674eaa230d/tests/kafkatest/tests/connect/connect_rest_test.py#L35]
 fixes the tests on the 3.3 branch. However, the tests still fail on trunk due 
to the changes from [https://github.com/apache/kafka/pull/12450].

 

The plan to fix this is to raise two PRs against trunk patching 
connect_rest_test.py - the first one fixing the EOS configs related issue which 
can be backported to 3.3 and the second one fixing the issue related to 
propagation of full connector configs to tasks which shouldn't be backported to 
3.3 (because the commit from https://github.com/apache/kafka/pull/12450 is only 
on trunk and not on 3.3)

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14162) HoistField SMT should not return an immutable map for schemaless key/value

2022-08-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14162:
--

 Summary: HoistField SMT should not return an immutable map for 
schemaless key/value
 Key: KAFKA-14162
 URL: https://issues.apache.org/jira/browse/KAFKA-14162
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya


The HoistField SMT currently returns an immutable map for schemaless keys and 
values -
[https://github.com/apache/kafka/blob/22007fba7c7346c5416f4db4e104434fdab265ee/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java#L62].

 

This can cause issues in connectors if they attempt to modify the 
SourceRecord's key or value since Kafka Connect doesn't document that these 
keys/values are immutable. Furthermore, no other SMT does this.

 

An example of a connector that would fail when schemaless values are used with 
this SMT is Microsoft's Cosmos DB Sink Connector - 
https://github.com/microsoft/kafka-connect-cosmosdb/blob/368566367a1dcbf9a91213067f1b9219a530bb16/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java#L123-L130
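The fix could be as simple as copying into a mutable map (a sketch of the schemaless branch; {{operatingValue}} and {{newRecord}} are the existing helpers in the transform):
{code:java}
// Wrap the singleton map in a HashMap so downstream connectors can mutate
// the key/value, matching the behavior of the other SMTs:
return newRecord(record, null,
        new HashMap<>(Collections.singletonMap(fieldName, operatingValue(record))));
{code}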



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14147) Some map objects in KafkaConfigBackingStore grow in size monotonically

2022-08-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14147:
--

 Summary: Some map objects in KafkaConfigBackingStore grow in size 
monotonically
 Key: KAFKA-14147
 URL: https://issues.apache.org/jira/browse/KAFKA-14147
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya


Similar to https://issues.apache.org/jira/browse/KAFKA-8869

 

The {{deferredTaskUpdates}}, {{connectorTaskCountRecords}}, and
{{connectorTaskConfigGenerations}} maps in {{KafkaConfigBackingStore}} are never
updated when a connector is deleted, and thus grow monotonically in size.
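A sketch of the cleanup this implies (map names as above; the exact hook point when processing a connector deletion record may differ):
{code:java}
// When a connector config tombstone is read from the config topic:
deferredTaskUpdates.remove(connectorName);
connectorTaskCountRecords.remove(connectorName);
connectorTaskConfigGenerations.remove(connectorName);
{code}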



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14134) Replace EasyMock with Mockito for WorkerConnectorTest

2022-08-02 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14134:
--

 Summary: Replace EasyMock with Mockito for WorkerConnectorTest
 Key: KAFKA-14134
 URL: https://issues.apache.org/jira/browse/KAFKA-14134
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Yash Mayya






--
This message was sent by Atlassian Jira
(v8.20.10#820010)