[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825454#comment-17825454
 ] 

Greg Harris commented on KAFKA-15841:
-

[~henriquemota] Okay, I think I understand better what you're trying to achieve.

> ... one topic per table...
> We have a JDBC Sink for each table.

Okay, you're using scenario (1), one connector per topic, which should come to 
at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too 
many to fit on a single machine, and needs a cluster to distribute the work.

In this scenario, Connect should be able to distribute approximately 9000/M 
connectors and 9000/M tasks to each of the M workers in a distributed cluster, 
barring any other practical limits/timeouts that I'm not aware of, so check the 
worker logs for ERROR messages.

> We tried to change the 'topics' property in the configurations using the 
> 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property 
> when it is returned by 'taskConfigs(int maxTasks)'.

It does this because the `topics` property is passed to the consumers to have 
them subscribe to the input topics, and the Consumer/Connect processing model 
requires this subscription to be the same for all consumers.
This doesn't mean that every consumer is consuming every topic, however. Having 
a uniform subscription across all of the consumers in a group tells the 
consumers to assign the work among themselves, assigning the topic-partitions 
to each of the consumers according to the configured assignor.

As an example, say your connector config had `topics=a,b`, each of these two 
topics had 2 partitions, and tasks.max=2.

The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 
partitions could be distributed like this by the consumer partition assignor:

task-0: a-0, b-0

task-1: a-1, b-1

Or any permutation. This is where the consumer assignor I asked about is 
important; the RangeAssignor can generate some pretty unbalanced assignments: 
[https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html]
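To make the imbalance concrete, here is a simplified model of range-style and round-robin assignment. It is a sketch, not Kafka's actual `RangeAssignor`/`RoundRobinAssignor` code: it assumes every task subscribes to every topic and orders members by task name rather than by consumer member id. Because range assignment splits each topic independently, a topic with a single partition always gives that partition to the first task, which is consistent with the "all topics on worker 0" symptom described in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignorSketch {

    // Simplified range assignment: each topic is split on its own, and the first
    // (partitions % consumers) consumers in sorted order get one extra partition
    static Map<String, List<String>> range(Map<String, Integer> topics, List<String> consumers) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            int perConsumer = t.getValue() / consumers.size();
            int extra = t.getValue() % consumers.size();
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(consumers.get(i)).add(t.getKey() + "-" + p);
                }
                next += count;
            }
        }
        return result;
    }

    // Simplified round-robin assignment: all topic-partitions are laid out in
    // sorted order and dealt out to the consumers one at a time
    static Map<String, List<String>> roundRobin(Map<String, Integer> topics, List<String> consumers) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        int i = 0;
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            for (int p = 0; p < t.getValue(); p++) {
                result.get(consumers.get(i % consumers.size())).add(t.getKey() + "-" + p);
                i++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("task-0", "task-1");

        // topics=a,b with 2 partitions each and tasks.max=2
        Map<String, Integer> twoPartitions = new TreeMap<>(Map.of("a", 2, "b", 2));
        System.out.println(range(twoPartitions, tasks));
        // {task-0=[a-0, b-0], task-1=[a-1, b-1]}

        // Four single-partition topics, as in the per-tenant setup
        Map<String, Integer> singlePartition = new TreeMap<>(Map.of("t1", 1, "t2", 1, "t3", 1, "t4", 1));
        System.out.println(range(singlePartition, tasks));
        // {task-0=[t1-0, t2-0, t3-0, t4-0], task-1=[]}
        System.out.println(roundRobin(singlePartition, tasks));
        // {task-0=[t1-0, t3-0], task-1=[t2-0, t4-0]}
    }
}
```

The round-robin model spreads the single-partition topics evenly, which is why switching assignors matters far more here than adding workers.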

If you choose a different assignor (RoundRobin, Sticky, etc.), then you can 
switch to scenario (2), with one connector per client, and some tasks.max 
around 10. This would give you ~90 connectors with 900 tasks, each working on 
10 topics.

You can tune tasks.max up and down if you need more throughput or want less 
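A sketch of what the scenario (2) connector configs could look like, built as plain Java maps (for example, to serialize into a Connect REST request body). The connector class name and the `<client>.table<N>` topic naming are hypothetical placeholders. `consumer.override.partition.assignment.strategy` is a per-connector consumer override that is only accepted if the worker's `connector.client.config.override.policy` permits it; setting `consumer.partition.assignment.strategy` in the worker config is the cluster-wide alternative. The totals assume each connector actually creates `tasks.max` tasks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerClientConnectors {

    // One sink connector per client, subscribed to all of that client's table topics
    static Map<String, String> connectorConfig(String client, int tables, int tasksMax) {
        List<String> topics = new ArrayList<>();
        for (int t = 0; t < tables; t++) {
            topics.add(client + ".table" + t); // hypothetical topic naming scheme
        }
        Map<String, String> config = new LinkedHashMap<>();
        config.put("name", "jdbc-sink-" + client);
        config.put("connector.class", "com.example.CustomJdbcSinkConnector"); // hypothetical
        config.put("tasks.max", Integer.toString(tasksMax));
        config.put("topics", String.join(",", topics));
        // Per-connector consumer override; requires a permissive
        // connector.client.config.override.policy on the workers
        config.put("consumer.override.partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return config;
    }

    public static void main(String[] args) {
        int clients = 90, tables = 100, tasksMax = 10;
        int connectors = 0, tasks = 0;
        for (int c = 0; c < clients; c++) {
            Map<String, String> config = connectorConfig("client" + c, tables, tasksMax);
            connectors++;
            tasks += Integer.parseInt(config.get("tasks.max"));
        }
        // Each task receives roughly tables / tasksMax single-partition topics
        System.out.println(connectors + " connectors, " + tasks + " tasks, ~"
                + (tables / tasksMax) + " topics per task");
        // prints "90 connectors, 900 tasks, ~10 topics per task"
    }
}
```

Raising tasksMax shrinks the number of topics per task at the cost of more consumers; lowering it does the opposite.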
consumer/task overhead.

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
> Attachments: image-2024-02-19-13-48-55-875.png
>
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-19 Thread Henrique Mota (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818535#comment-17818535
 ] 

Henrique Mota commented on KAFKA-15841:
---

That conditional basically prevented us from achieving what we wanted:

!image-2024-02-19-13-48-55-875.png!


The modification below would help us achieve our goal:

for (Map<String, String> taskProps : taskConfigs) {
  // Ensure we don't modify the connector's copy of the config
  Map<String, String> taskConfig = new HashMap<>(taskProps);
  taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
  // Proposed change: only fall back to the connector-level topics when the
  // task config returned by taskConfigs(int maxTasks) does not set its own
  if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)
      && !taskConfig.containsKey(SinkTask.TOPICS_CONFIG)) {
    taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
  }
  // Proposed change: same check for topics.regex
  if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)
      && !taskConfig.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
    taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
  }
  result.add(taskConfig);
}
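A self-contained sketch of the proposed merge behavior. The constants are inlined as plain strings (`topics`, `topics.regex` and `task.class`, the values of `SinkTask.TOPICS_CONFIG`, `SinkTask.TOPICS_REGEX_CONFIG` and `TaskConfig.TASK_CLASS_CONFIG`) so it runs without Kafka on the classpath; the task class name is a hypothetical placeholder. It shows that, under the proposed change, a task config that sets its own `topics` keeps it, while one that leaves it unset falls back to the connector-level value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigMergeSketch {

    static final String TOPICS = "topics";
    static final String TOPICS_REGEX = "topics.regex";
    static final String TASK_CLASS = "task.class";

    // Proposed behavior: copy connector-level topics/topics.regex into a task
    // config only when the connector's taskConfigs() left that key unset
    static List<Map<String, String>> merge(Map<String, String> connOriginals,
                                           List<Map<String, String>> taskConfigs,
                                           String taskClassName) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> taskProps : taskConfigs) {
            Map<String, String> taskConfig = new HashMap<>(taskProps); // defensive copy
            taskConfig.put(TASK_CLASS, taskClassName);
            if (connOriginals.containsKey(TOPICS) && !taskConfig.containsKey(TOPICS)) {
                taskConfig.put(TOPICS, connOriginals.get(TOPICS));
            }
            if (connOriginals.containsKey(TOPICS_REGEX) && !taskConfig.containsKey(TOPICS_REGEX)) {
                taskConfig.put(TOPICS_REGEX, connOriginals.get(TOPICS_REGEX));
            }
            result.add(taskConfig);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conn = Map.of(TOPICS, "t1,t2,t3,t4");
        // First task sets its own topics subset; second leaves topics unset
        List<Map<String, String>> tasks = List.of(Map.of(TOPICS, "t1,t3"), Map.of());
        for (Map<String, String> cfg : merge(conn, tasks, "com.example.MySinkTask")) {
            System.out.println(cfg.get(TOPICS)); // prints "t1,t3", then "t1,t2,t3,t4"
        }
    }
}
```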


[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-19 Thread Henrique Mota (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818530#comment-17818530
 ] 

Henrique Mota commented on KAFKA-15841:
---

Hello [~gharris1727]!

My use case is as follows:

1. I have multiple clients in each environment; the largest environment has 90 
clients (databases).
2. Each client has a database in one application, and we replicate 
approximately 100 tables from this database to another application's database, 
which is multi-tenant.
3. Previously, we had one topic per table, with several partitions per topic. 
However, we needed to ensure that if any client had inconsistent data, we could 
pause consumption for that client and continue processing data for the other 
clients. So we switched to one single-partition topic per table and client. We 
then created an extension of the JDBC Sink that can pause a problematic topic 
and, after some time, attempt to resume consuming it (we chose one topic per 
client instead of one partition per client to make identification easier).
4. We have a JDBC Sink for each table.
5. We noticed that when we add more than one worker in this scenario, all 
topics are assigned to worker 0 and the other workers sit idle.
6. We tried to change the 'topics' property in the configurations using the 
'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property 
when it is returned by 'taskConfigs(int maxTasks)'.
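The pause-and-retry behavior described above can be sketched as simple bookkeeping of paused topics and their retry deadlines. This is a hypothetical helper, not the actual extension described here. In a real `SinkTask` the pause and resume steps would go through `SinkTaskContext.pause(TopicPartition...)` and `SinkTaskContext.resume(TopicPartition...)`; those calls are left out so the sketch runs without Kafka. The topic name and the 60-second delay are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PausedTopicTracker {

    private final long retryDelayMs;
    // topic -> earliest time (ms) at which consumption should be retried
    private final Map<String, Long> pausedUntil = new HashMap<>();

    PausedTopicTracker(long retryDelayMs) {
        this.retryDelayMs = retryDelayMs;
    }

    // Record a problematic topic; a real SinkTask would also pause its partition here
    void pause(String topic, long nowMs) {
        pausedUntil.put(topic, nowMs + retryDelayMs);
    }

    boolean isPaused(String topic) {
        return pausedUntil.containsKey(topic);
    }

    // Return, and stop tracking, topics whose retry delay has elapsed;
    // a real SinkTask would resume their partitions
    List<String> dueForResume(long nowMs) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pausedUntil.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMs) {
                due.add(e.getKey());
                it.remove();
            }
        }
        return due;
    }

    public static void main(String[] args) {
        PausedTopicTracker tracker = new PausedTopicTracker(60_000);
        tracker.pause("client7.orders", 0);
        System.out.println(tracker.dueForResume(30_000)); // prints "[]"
        System.out.println(tracker.dueForResume(60_000)); // prints "[client7.orders]"
    }
}
```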



[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817522#comment-17817522
 ] 

Greg Harris commented on KAFKA-15841:
-

Hi [~henriquemota]!

Can you share more details about your setup? Do you have a single JDBC 
connector, or multiple in the same cluster? Are connectors configured to read 
from one topic, or multiple? What is your tasks.max configuration, and what 
consumer assignor are you using?

Consumers support distributing reading from multiple topics/topic-partitions 
across consumers in a consumer group, and Connect supports distributing 
multiple tasks across multiple workers, both of which allow you to map multiple 
topic-partitions of work onto multiple workers.

For example, say you have N topic-partitions and M Connect workers, with N > M. 
Any one of the following could be used to distribute the work:
1. You could run one-connector-per-topic, and configure each connector with 
tasks.max=1, and have the worker distribute the N tasks across M workers.
2. You could add all N topics to a single connector with tasks.max=M, and have 
the consumer group distribute the N topic-partitions among those M tasks.
3. You could manually group the N topics into M groups, and create M connectors 
with tasks.max=1, giving each connector one group of topics.
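Option 3 amounts to a static partitioning of the topic list. A minimal sketch, assuming round-robin dealing is an acceptable way to balance the groups (any grouping that evens out the load would do); the topic names are placeholders:

```java
import java.util.ArrayList;
import java.util.List;

public class TopicGrouping {

    // Deal topics into m groups round-robin, so group sizes differ by at most one
    static List<List<String>> group(List<String> topics, int m) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < m; i++) groups.add(new ArrayList<>());
        for (int i = 0; i < topics.size(); i++) {
            groups.get(i % m).add(topics.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("t0", "t1", "t2", "t3", "t4", "t5", "t6");
        List<List<String>> groups = group(topics, 3);
        for (int i = 0; i < groups.size(); i++) {
            // Each group becomes the topics list of one connector with tasks.max=1
            System.out.println("connector-" + i + ": topics=" + String.join(",", groups.get(i)));
        }
        // connector-0: topics=t0,t3,t6
        // connector-1: topics=t1,t4
        // connector-2: topics=t2,t5
    }
}
```

The trade-off is that adding topics or workers means regrouping and reconfiguring the connectors by hand, whereas options 1 and 2 let Connect or the consumer group rebalance automatically.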



[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2023-11-16 Thread Henrique Mota (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786731#comment-17786731
 ] 

Henrique Mota commented on KAFKA-15841:
---

Currently, in our setup, we're unable to assign more than one worker per sink. 
Whenever we attempt to allocate multiple workers, all topics end up on Worker 0 
because all topics have only one partition.
