[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic

2024-10-29 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893838#comment-17893838
 ] 

Daniel Urban commented on KAFKA-17719:
--

I'm still thinking about a possible solution, but have a few points to consider:
 # In the current state, there is no way to differentiate between the "task 
resurrection" scenario described in KAFKA-16838 and the config topic compaction 
described in this ticket. When we find task configs and task commit records 
without a prior Connector configuration, we cannot know which scenario we are 
in. So for existing clusters, we need to make a choice - do we want to avoid 
the resurrection, or do we want to avoid the NPE?
 # For future versions, I think some kind of version/ID should be generated and 
added to the Connector config record, the corresponding task configs and task 
commit records. This could be generated when the Connector is created, and then 
kept on each update. When a Connector is deleted and then re-created, a new ID 
would be generated, which helps in catching the zombie tasks (see the rough 
sketch below).
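
To make point 2 a bit more concrete, here is a rough sketch of the matching logic. All names (ConnectorRecord, TaskRecord, generationId) are made up for illustration only - this is not an actual Connect API, just the shape of the idea:
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class ConnectorGenerationIdSketch {

    // Hypothetical views of the relevant config topic records.
    record ConnectorRecord(String name, String generationId, Map<String, String> config) { }
    record TaskRecord(String connector, int task, String generationId, Map<String, String> config) { }

    private final Map<String, ConnectorRecord> connectors = new HashMap<>();
    private final Map<String, Map<Integer, TaskRecord>> taskConfigs = new HashMap<>();

    // On creation a new ID is minted; on every later update the existing ID is kept.
    ConnectorRecord putConnectorConfig(String name, Map<String, String> config) {
        ConnectorRecord existing = connectors.get(name);
        String generationId = existing != null ? existing.generationId() : UUID.randomUUID().toString();
        ConnectorRecord updated = new ConnectorRecord(name, generationId, config);
        connectors.put(name, updated);
        return updated;
    }

    // While materializing the config topic, a task config is only accepted if its ID
    // matches the current Connector's ID. Zombie tasks written before a delete/re-create
    // carry a stale ID and are dropped. (To also cover the compaction ordering from this
    // ticket, unmatched task configs would have to be deferred rather than dropped, as in
    // the "deferred" handling discussed in the other comment.)
    void onTaskConfig(TaskRecord task) {
        ConnectorRecord connector = connectors.get(task.connector());
        if (connector == null || !connector.generationId().equals(task.generationId())) {
            return; // unknown or stale generation: ignore instead of resurrecting the task
        }
        taskConfigs.computeIfAbsent(task.connector(), c -> new HashMap<>()).put(task.task(), task);
    }
}
{code}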

> Connect may fail to start tasks when reading from a compacted config topic
> --
>
> Key: KAFKA-17719
> URL: https://issues.apache.org/jira/browse/KAFKA-17719
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chaitanya Mukka
>Assignee: Daniel Urban
>Priority: Major
>
> The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) 
> alters the logic for materializing a view of the config topic to ignore task 
> configs when there is no configuration for that connector present earlier in 
> the config topic. However, the logic fails to consider topics that might get 
> compacted over time.
> In particular, when we have a connector {{C1}} running fine, the records in 
> the config topic for the connector will look something like {{C1, T1, T2, Task-commit-record}}.
> If the connector gets a config update that doesn't produce any new task 
> configs (note that this is a valid case when there are no task config 
> changes [1]), we only produce a Connector config record [2]. The config topic 
> now looks like {{C1, T1, T2, Task-commit-record, C1}}. However, if the topic 
> gets compacted we will end up with {{T1, T2, Task-commit-record, C1}}. This 
> can be a common scenario in large and old connect clusters.
> Based on the changes for KAFKA-16838, when the connect worker reads this 
> config state, it ignores the task configs [3] for the connector even though the 
> connector is still active and workers may still hold active task assignments 
> for it. The symptom of this issue is an NPE which shows up when trying to start the tasks: 
> {noformat}
> java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because "inputMap" is null
>   at org.apache.kafka.common.utils.Utils.castToStringObjectMap
>   at org.apache.kafka.common.config.AbstractConfig.<init>
>   at org.apache.kafka.common.config.AbstractConfig.<init>
>   at org.apache.kafka.connect.runtime.TaskConfig.<init>
>   at org.apache.kafka.connect.runtime.Worker.startTask
>   at org.apache.kafka.connect.runtime.Worker.startSourceTask
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask
>   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41
>   at java.base/java.util.concurrent.FutureTask.run
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run
>   at java.base/java.lang.Thread.run(Thread.java:1583){noformat}
>  
> [1] - 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047]
>   
> [2] - 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524]
> [3] - 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic

2024-10-29 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893768#comment-17893768
 ] 

Daniel Urban commented on KAFKA-17719:
--

[~cmukka20] I was thinking that allowing "deferred" task commits should help - 
if we encounter a task commit without a related connector, we just wait for the 
connector config to appear.

To avoid introducing a regression (KAFKA-16838), connector creation 
(putConnectorConfig) could be updated to explicitly clear zombie tasks with a 
delete marker.
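
A rough sketch of what that deferral could look like; the method and field names (onTaskConfig, onConnectorConfig, pendingTaskConfigs) are hypothetical bookkeeping and not the actual KafkaConfigBackingStore API:
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DeferredTaskConfigSketch {

    private final Set<String> knownConnectors = new HashSet<>();
    private final Map<String, List<Map<String, String>>> taskConfigs = new HashMap<>();
    // Task configs/commits seen before their connector config; applied once the connector appears.
    private final Map<String, List<Map<String, String>>> pendingTaskConfigs = new HashMap<>();

    void onTaskConfig(String connector, Map<String, String> config) {
        Map<String, List<Map<String, String>>> target =
                knownConnectors.contains(connector) ? taskConfigs : pendingTaskConfigs;
        target.computeIfAbsent(connector, c -> new ArrayList<>()).add(config);
    }

    void onConnectorConfig(String connector) {
        knownConnectors.add(connector);
        // The connector record showed up after all (e.g. only its latest copy survived
        // compaction, further down the topic), so the deferred task configs become
        // visible instead of being dropped and later causing the NPE at task startup.
        List<Map<String, String>> pending = pendingTaskConfigs.remove(connector);
        if (pending != null) {
            taskConfigs.computeIfAbsent(connector, c -> new ArrayList<>()).addAll(pending);
        }
    }

    // To avoid re-introducing KAFKA-16838: an explicit connector (re)creation could first
    // write a delete marker for the old tasks, clearing anything still pending.
    void onConnectorCreate(String connector) {
        pendingTaskConfigs.remove(connector);
        taskConfigs.remove(connector);
        knownConnectors.add(connector);
    }
}
{code}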




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic

2024-10-29 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893765#comment-17893765
 ] 

Daniel Urban commented on KAFKA-17719:
--

I see that the fix for KAFKA-16838 was explicitly trying to solve the issue of 
tasks getting "resurrected" when a connector is re-created with the same name. 
Because of this, simply removing that edge case is not a proper solution; I 
will dig deeper and try to figure out a better one.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic

2024-10-29 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893727#comment-17893727
 ] 

Daniel Urban commented on KAFKA-17719:
--

[~cmukka20] I think your analysis is correct, and I will try to submit a fix. I 
don't see a clean way of removing these dangling task configs from memory, so 
I'll just remove the edge case from the logic.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic

2024-10-29 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-17719:


Assignee: Daniel Urban




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17632) Custom `partitioner.class` with an even number of partitions always writes to even partitions if use RoundRobinPartitioner

2024-10-28 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-17632:


Assignee: Daniel Urban

> Custom `partitioner.class` with an even number of partitions always writes to 
> even partitions if use RoundRobinPartitioner
> --
>
> Key: KAFKA-17632
> URL: https://issues.apache.org/jira/browse/KAFKA-17632
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: thanhlv
>Assignee: Daniel Urban
>Priority: Minor
> Attachments: image-2024-09-27-15-05-53-707.png
>
>
> Our project has some special logic that requires custom partitioning.
> With an odd number of partitions, everything works fine.
> However, with an even number of partitions, data is only written to 
> even-numbered partition IDs.
> Info:
> Lib Java: `kafka-clients:3.8.0`
> Code demo:
> {code:java}
> import java.io.IOException;
> import java.util.Properties;
> import java.util.UUID;
> 
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
> 
> public class CustomLogicPartitionMain {
>     public static void main(String[] args) throws IOException {
>         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "ALL");
>         final var props = new Properties();
>         props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "java-producer-producerRecordPartition-KeyNotNull");
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093,localhost:29094");
>         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5000");
>         props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");
>         try (var producer = new KafkaProducer<String, String>(props)) {
>             // 36-byte random value, no key
>             final var messageProducerRecord = new ProducerRecord<String, String>("topic-rep-1-partition-10", UUID.randomUUID().toString());
>             for (int i = 1; i <= 5000; i++) {
>                 producer.send(messageProducerRecord);
>             }
>         }
>     }
> }
>  {code}
> !image-2024-09-27-15-05-53-707.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17632) Custom `partitioner.class` with an even number of partitions always writes to even partitions if use RoundRobinPartitioner

2024-10-28 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893559#comment-17893559
 ] 

Daniel Urban commented on KAFKA-17632:
--

Thank you for the report and the repro code.

I'm unsure if this ever worked correctly, but in the current code, KafkaProducer 
calls partitioner.partition twice when a new batch is created; this is the 
location of the second call:

[https://github.com/apache/kafka/blob/14a9130f6fa31c10cb65cc500c101148d0410306/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1079]

Because of this, when you have an even number of partitions, you are basically 
"locked out" of half of your partitions when using RR.

RoundRobinPartitioner was implemented without taking the semantics of the 
onNewBatch method into account. I'll try to submit a fix for this - basically 
the onNewBatch can be handled as a "do not increment on the next call of 
partition" notification.

I think the only way to fix your custom partitioner implementation is to 
introduce similar logic in the onNewBatch callback. Since onNewBatch is 
deprecated, it will be removed in the future, and that will probably fix the 
logic in KafkaProducer. Due to backward compatibility, and other partitioner 
implementations relying on this double-call, I think we cannot really change 
the producer logic.
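
As a workaround in your custom partitioner, something along these lines could work. This is only a rough sketch (the class name and bookkeeping are made up), not the fix I plan to submit for RoundRobinPartitioner itself:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * Round-robin partitioner that treats onNewBatch as a "do not advance on the next
 * partition() call" signal, compensating for the extra partition() call the producer
 * makes when a new batch is created.
 */
public class BatchAwareRoundRobinPartitioner implements Partitioner {

    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private final Map<String, Boolean> skipNextIncrement = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // If onNewBatch was just called, re-use the previous value instead of advancing,
        // so the second partition() call for the same record lands on the same partition.
        int next = Boolean.TRUE.equals(skipNextIncrement.remove(topic))
                ? counter.get()
                : counter.incrementAndGet();
        return partitions.get(Math.abs(next) % partitions.size()).partition();
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        skipNextIncrement.put(topic, Boolean.TRUE);
    }

    @Override
    public void close() { }
}
{code}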




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17828) Reverse Checkpointing in MirrorMaker2

2024-10-18 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-17828:


 Summary: Reverse Checkpointing in MirrorMaker2
 Key: KAFKA-17828
 URL: https://issues.apache.org/jira/browse/KAFKA-17828
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 supports checkpointing - replicating the committed offsets of groups across 
Kafka clusters.

This is a one-way replication, meaning that consumers can rely on this when 
they fail over from the upstream cluster to the downstream cluster. But 
checkpointing would be desirable in failbacks, too: if the consumers processed 
messages from the downstream topic, they do not want to consume the same 
messages again in the upstream topic.

To avoid this, checkpointing should also support reverse checkpointing in the 
context of a bidirectional replication: creating checkpoints between 
downstream->upstream topics.

This ticket implements KIP-1098.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17534) Allow disabling hearbeats topic replication in MirrorSourceConnector

2024-09-12 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-17534:
-
Description: 
Currently, MirrorSourceConnector always replicates the heartbeats topics. In 
some use-cases (e.g. multiple parallel, mutually exclusive 
MirrorSourceConnector instances in the same replication), users need to disable 
this behavior.

Add a new property to allow disabling the heartbeats replication.

This ticket implements KIP-1089.

  was:
Currently, MirrorSourceConnector always replicates the heartbeats topics. In 
some use-cases (e.g. multiple parallel, mutually exclusive 
MirrorSourceConnector instances in the same replication), users need to disable 
this behavior.

Add a new property to allow disabling the heartbeats replication.


> Allow disabling hearbeats topic replication in MirrorSourceConnector
> 
>
> Key: KAFKA-17534
> URL: https://issues.apache.org/jira/browse/KAFKA-17534
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> Currently, MirrorSourceConnector always replicates the heartbeats topics. In 
> some use-cases (e.g. multiple parallel, mutually exclusive 
> MirrorSourceConnector instances in the same replication), users need to 
> disable this behavior.
> Add a new property to allow disabling the heartbeats replication.
> This ticket implements KIP-1089.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17534) Allow disabling hearbeats topic replication in MirrorSourceConnector

2024-09-12 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-17534:


 Summary: Allow disabling hearbeats topic replication in 
MirrorSourceConnector
 Key: KAFKA-17534
 URL: https://issues.apache.org/jira/browse/KAFKA-17534
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban


Currently, MirrorSourceConnector always replicates the heartbeats topics. In 
some use-cases (e.g. multiple parallel, mutually exclusive 
MirrorSourceConnector instances in the same replication), users need to disable 
this behavior.

Add a new property to allow disabling the heartbeats replication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13459) MM2 should be able to add the source offset to the record header

2024-04-23 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839981#comment-17839981
 ] 

Daniel Urban commented on KAFKA-13459:
--

[~viktorsomogyi]  No plans, and it does require a KIP.

> MM2 should be able to add the source offset to the record header
> 
>
> Key: KAFKA-13459
> URL: https://issues.apache.org/jira/browse/KAFKA-13459
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> MM2 could add the source offset to the record header to help with diagnostics 
> in some use-cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-08-18 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755997#comment-17755997
 ] 

Daniel Urban edited comment on KAFKA-15372 at 8/18/23 2:33 PM:
---

[~gharris1727] I have a bit more information: we managed to deterministically 
reproduce the issue if we use a 2 instance cluster, and make a change in the 
config, then wait a long time between the instance restarts. In our specific 
test we wait 3 minutes after stopping the first MM2 instance, then wait 3 
minutes after starting the first MM2 instance, then wait 3 minutes after 
stopping the second MM2 instance, and so on.

At the end of this process, the config change does not get applied.

I think this all depends on the rebalance - if the rebalance finishes before 
the MM2 instance bounces back, the leadership will always move to the other 
node.

Again, all of this is tested on a 3.4.1 build which has the MM2 internal REST 
enabled, but I'm pretty sure that trunk is affected.


was (Author: durban):
[~gharris1727] I have a bit more information: we managed to deterministically 
reproduce the issue if we use a 2 instance cluster, and make a change in the 
config, then wait a long time between the instance restarts. In our specific 
test we wait 3 minutes after stopping the first MM2 instance, then wait 3 
minutes after starting the first MM2 instance, then wait 3 minutes after 
stopping the second MM2 instance, and so on.

At the end of this process, the config change does not get applied.

I think this all depends on the rebalance - if the rebalance finishes before 
the MM2 instance bounces back, the leadership will always move to the other 
node.

> MM2 rolling restart can drop configuration changes silently
> ---
>
> Key: KAFKA-15372
> URL: https://issues.apache.org/jira/browse/KAFKA-15372
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Priority: Major
>
> When MM2 is restarted, it tries to update the Connector configuration in all 
> flows. This is a one-time trial, and fails if the Connect worker is not the 
> leader of the group.
> In a distributed setup and with a rolling restart, it is possible that for a 
> specific flow, the Connect worker of the just restarted MM2 instance is not 
> the leader, meaning that Connector configurations can get dropped.
> For example, assuming 2 MM2 instances, and one flow A->B:
>  # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the 
> leader of A->B Connect group.
>  # MM2 instance 1 tries to update the Connector configurations, but fails 
> (instance 2 has the leader, not instance 1)
>  # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1
>  # MM2 instance 2 tries to update the Connector configurations, but fails
> At this point, the configuration changes before the restart are never 
> applied. Many times, this can also happen silently, without any indication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-08-18 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755997#comment-17755997
 ] 

Daniel Urban commented on KAFKA-15372:
--

[~gharris1727] I have a bit more information: we managed to deterministically 
reproduce the issue if we use a 2 instance cluster, and make a change in the 
config, then wait a long time between the instance restarts. In our specific 
test we wait 3 minutes after stopping the first MM2 instance, then wait 3 
minutes after starting the first MM2 instance, then wait 3 minutes after 
stopping the second MM2 instance, and so on.

At the end of this process, the config change does not get applied.

I think this all depends on the rebalance - if the rebalance finishes before 
the MM2 instance bounces back, the leadership will always move to the other 
node.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-08-18 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755833#comment-17755833
 ] 

Daniel Urban commented on KAFKA-15372:
--

[~gharris1727]  Not sure if I follow this part: "should forward configurations 
to the leader via the internal REST API."

I checked org.apache.kafka.connect.mirror.MirrorMaker#configureConnector which 
then calls 
org.apache.kafka.connect.runtime.distributed.DistributedHerder#putConnectorConfig,
 and I don't really see any sign of forwarding to the leader. The callback of 
the validation explicitly handles the non-leader state with a failure:
{code:java}
if (!isLeader()) {
    callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
    return null;
}
{code}
So I think that current trunk is also affected by this; there is no Connector 
configuration forwarding to the leader in MM2. Additionally, I'm not sure whether 
a single forward attempt is enough to ensure correctness, but that is an 
implementation detail.

Unfortunately, I really don't have an exact reproduction, but I saw this 
happening in an actual cluster, the leadership changes occurred as I detailed 
in the ticket description.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-08-17 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755643#comment-17755643
 ] 

Daniel Urban commented on KAFKA-15372:
--

Hi [~gharris1727], I don't have a deterministic reproduction of the issue.

The reproduction of the problem requires multiple MM2 instances, but the 
internal REST is not needed at all (the startup and Connector config update 
does not touch the REST).

Encountered this on a 3.4 build which contains the MM2 internal REST feature.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-08-17 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-15372:


 Summary: MM2 rolling restart can drop configuration changes 
silently
 Key: KAFKA-15372
 URL: https://issues.apache.org/jira/browse/KAFKA-15372
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


When MM2 is restarted, it tries to update the Connector configuration in all 
flows. This is a one-time trial, and fails if the Connect worker is not the 
leader of the group.

In a distributed setup and with a rolling restart, it is possible that for a 
specific flow, the Connect worker of the just restarted MM2 instance is not the 
leader, meaning that Connector configurations can get dropped.

For example, assuming 2 MM2 instances, and one flow A->B:
 # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the 
leader of A->B Connect group.
 # MM2 instance 1 tries to update the Connector configurations, but fails 
(instance 2 has the leader, not instance 1)
 # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1
 # MM2 instance 2 tries to update the Connector configurations, but fails

At this point, the configuration changes before the restart are never applied. 
Many times, this can also happen silently, without any indication.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions

2023-06-20 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735562#comment-17735562
 ] 

Daniel Urban commented on KAFKA-15075:
--

[~elkkhan] Not in the near future, feel free to pick it up.

> MM2 internal checkpoints topic should support multiple partitions
> -
>
> Key: KAFKA-15075
> URL: https://issues.apache.org/jira/browse/KAFKA-15075
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, the internal checkpoints topic of MM2 uses a single partition.
> This is an unnecessary limitation, and instead, it should support more 
> partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions

2023-06-09 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-15075:


 Summary: MM2 internal checkpoints topic should support multiple 
partitions
 Key: KAFKA-15075
 URL: https://issues.apache.org/jira/browse/KAFKA-15075
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


Currently, the internal checkpoints topic of MM2 uses a single partition.

This is an unnecessary limitation, and instead, it should support more 
partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class

2023-06-05 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-13756:


Assignee: Daniel Urban

> Connect validate endpoint should return proper response for invalid connector 
> class
> ---
>
> Key: KAFKA-13756
> URL: https://issues.apache.org/jira/browse/KAFKA-13756
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> Currently, if there is an issue with  the connector class, the validate 
> endpoint returns a 400 or a 500 response.
> Instead, it should return a well formatted response containing a proper 
> validation error message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14497) LastStableOffset is advanced prematurely when a log is reopened.

2023-06-02 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728723#comment-17728723
 ] 

Daniel Urban commented on KAFKA-14497:
--

AFAIU, the information about the replicated state of a transaction is not 
stored in the snapshot at all. I think the data stored in the snapshot file 
needs to be extended with information on whether a completed transaction has 
been replicated.

By the time ProducerStateManager#completeTxn is called (which puts the 
transaction into ProducerStateManager.unreplicatedTxns), the producer entry is 
already cleared (ProducerAppendInfo#appendEndTxnMarker - currentTxnFirstOffset 
is empty, indicating that there is no pending transaction). If a snapshot is 
created at this point, and then the snapshot is loaded, there is no way to 
differentiate between replicated and unreplicated transactions.

Instead, ProducerAppendInfo#appendEndTxnMarker should also set a flag showing 
that while the transaction is complete, it might still be unreplicated. Then, 
when ProducerStateManager#removeUnreplicatedTransactions is called, the flag in 
the producer entry can be cleared.

This way the snapshot would contain the full data, and we could also recover 
the state of unreplicatedTxns.

[~hachikuji] wdyt about this approach? If it seems okay, I can take a look into 
this and submit a PR.

> LastStableOffset is advanced prematurely when a log is reopened.
> 
>
> Key: KAFKA-14497
> URL: https://issues.apache.org/jira/browse/KAFKA-14497
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vincent Jiang
>Priority: Major
>
> In below test case, last stable offset of log is advanced prematurely after 
> reopen:
>  # producer #1 appends transaction records to leader. offsets = [0, 1, 2, 3]
>  # producer #2 appends transactional records to leader. offsets =  [4, 5, 6, 
> 7]
>  # all records are replicated to followers and high watermark advanced to 8.
>  # at this point, lastStableOffset = 0. (first offset of an open transaction)
>  # producer #1 aborts the transaction by writing an abort marker at offset 8. 
>  ProducerStateManager.unreplicatedTxns contains the aborted transaction 
> (firstOffset=0, lastOffset=8)
>  # then the log is closed and reopened.
>  # after reopen, log.lastStableOffset is initialized to 4.  This is because 
> ProducerStateManager.unreplicatedTxns is empty after reopening log.
>  
> We should rebuild ProducerStateManager.unreplicatedTxns when reloading a log, 
> so that lastStableOffset remains unchanged before and after reopen.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14034) Consistency violation: enabled idempotency doesn't prevent duplicates when a client runs into UNKNOWN_SERVER_ERROR

2023-06-02 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-14034:


Assignee: Daniel Urban

> Consistency violation: enabled idempotency doesn't prevent duplicates when a 
> client runs into UNKNOWN_SERVER_ERROR
> --
>
> Key: KAFKA-14034
> URL: https://issues.apache.org/jira/browse/KAFKA-14034
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0, 3.1.1
>Reporter: Denis Rystsov
>Assignee: Daniel Urban
>Priority: Major
>
> Hey folks, I've observed duplicated records in the log while idempotency was 
> enabled and it looks like the kafka client is the culprit. I've tested on 
> 3.0.0 but the tip of the kafka repo is also affected
> Let a user sends two produce requests without async so there is two inflight 
> requests
> {code:java}
> producer.send(A)
> producer.send(B){code}
> Let the first request results with a retry-able error after it was written to 
> disk and let the second request results with UNKNOWN_SERVER_ERROR. Any 
> unhandled exception on the broker side results in UNKNOWN_SERVER_ERROR so it 
> may happen.
> Since request A is retry-able it is put into the outbound queue there - 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L623]
> Let B's UNKNOWN_SERVER_ERROR is received before A is retried. It is being 
> processed in the following methods:
>  * 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642]
>  * 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L742]
>  * 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L761]
>  * 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L624]
>  * maybeTransitionToErrorState doesn't consider UNKNOWN_SERVER_ERROR fatal so 
> it doesn't mark the request as such: 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L611]
>  * as result handleFailedBatch requests epoch bump 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L652]
> When epoch is bumped it rewrites sequence numbers of the inflight requests: 
> [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L481]
> In our case it rewrites A's sequence numbers and when the request is retried 
> the broker can't dedupe it and writes it to the log thus violating the 
> idempotency guarantees.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics

2023-05-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14978:


 Summary: ExactlyOnceWorkerSourceTask does not remove parent metrics
 Key: KAFKA-14978
 URL: https://issues.apache.org/jira/browse/KAFKA-14978
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Daniel Urban
Assignee: Daniel Urban


ExactlyOnceWorkerSourceTask removeMetrics does not invoke super.removeMetrics, 
meaning that only the transactional metrics are removed, and common source task 
metrics are not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context (KIP-916)

2023-04-21 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14652:
-
Summary: Improve MM2 logging by adding the flow information to the context 
(KIP-916)  (was: Improve MM2 logging by adding the flow information to the 
context)

> Improve MM2 logging by adding the flow information to the context (KIP-916)
> ---
>
> Key: KAFKA-14652
> URL: https://issues.apache.org/jira/browse/KAFKA-14652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MirrorMaker2 runs multiple Connect worker instances in a single process. In 
> Connect, the logging is based on the assumption that Connector names are 
> unique. But in MM2, the same Connector names are being used in each flow 
> (Connect group). This means that there is no way to differentiate between the 
> logs of MirrorSourceConnector in A->B and in B->A.
> This can be improved by adding the flow to the logging context and the names 
> of the Connect framework threads.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups

2023-04-19 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714097#comment-17714097
 ] 

Daniel Urban commented on KAFKA-14807:
--

[~fisher91] do you use MM2 dedicated mode, or use the Connectors directly in a 
Connect cluster?

If the latter, you can fix this by only passing 
source.consumer.auto.offset.reset=latest to MirrorSourceConnector, but not to 
the MirrorCheckpointConnector.

For MM2 dedicated mode, I'm not aware of any workarounds.

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> ---
>
> Key: KAFKA-14807
> URL: https://issues.apache.org/jira/browse/KAFKA-14807
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0, 3.3.1, 3.3.2
> Environment: centos7
>Reporter: Zhaoli
>Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we add this configuration to mm2 to 
> avoid replicating the whole history messages:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found part of the consumer group offsets had stopped 
> replicating.
> The common characteristic of these consumer groups is their EMPTY status, 
> which means they have no active members at that moment. All the active 
> consumer groups‘ offset replication work as normal.
> After researching the source code, we found this is because the configuration 
> above also affects the consumption of the topic `mm2-offset-syncs`, so the map 
> `offsetSyncs` doesn't hold entries for all topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
> {code}
> And the lost topicPartitions lead to the pause of replication of the EMPTY 
> consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
>     } else {
>         return OptionalLong.empty();
>     }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14903) MM2 Topic And Group Listener (KIP-918)

2023-04-13 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14903:


 Summary: MM2 Topic And Group Listener (KIP-918)
 Key: KAFKA-14903
 URL: https://issues.apache.org/jira/browse/KAFKA-14903
 Project: Kafka
  Issue Type: New Feature
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 has a dynamic topic and group filter mechanism, in which the replicated 
topics/groups can dynamically change, either due to changes in the available 
topics/groups, or changes in the filter settings.

In order to monitor the currently replicated topics/groups, MM2 should support 
a TopicListener and GroupListener plugin, which is triggered when MM2 changes 
the set of replicated topics/groups.
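
A hypothetical shape of such a plugin, purely for illustration - the actual interface is defined by KIP-918, not by this sketch:
{code:java}
import java.util.Set;

public interface ReplicationSetListener {

    // Called whenever MM2 recomputes the set of replicated topics for a flow.
    void onTopicsChanged(String sourceAndTarget, Set<String> replicatedTopics);

    // Called whenever MM2 recomputes the set of replicated consumer groups for a flow.
    void onGroupsChanged(String sourceAndTarget, Set<String> replicatedGroups);
}
{code}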



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14902) KafkaBasedLog infinite retries can lead to StackOverflowError

2023-04-13 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14902:


 Summary: KafkaBasedLog infinite retries can lead to 
StackOverflowError
 Key: KAFKA-14902
 URL: https://issues.apache.org/jira/browse/KAFKA-14902
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Daniel Urban
Assignee: Daniel Urban


KafkaBasedLog subclasses use an infinite retry on producer sends, using a 
callback. Sometimes, when specific errors are encountered, the callback is 
invoked in the send call, on the calling thread. If this happens enough times, 
a stack overflow happens.

Example stacktrace from 2.5 (but the newest code can also encounter the same):
{code:java}
2023-01-14 12:48:23,487 ERROR org.apache.kafka.connect.runtime.WorkerTask: WorkerSourceTask{id=MirrorSourceConnector-1} Task threw an uncaught and unrecoverable exception
java.lang.StackOverflowError: null
    at org.apache.kafka.common.metrics.stats.SampledStat.record(SampledStat.java:50)
    at org.apache.kafka.common.metrics.stats.Rate.record(Rate.java:60)
    at org.apache.kafka.common.metrics.stats.Meter.record(Meter.java:80)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:188)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:178)
    at org.apache.kafka.clients.producer.internals.BufferPool.recordWaitTime(BufferPool.java:202)
    at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:147)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:221)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:941)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238)
    at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298)
    ...
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:959)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238)
    at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298){code}
Note the repeated KafkaProducer.send -> KafkaProducer.doSend -> 
KafkaStatusBackingStore$4.onCompletion calls, causing the issue.
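
A minimal standalone sketch of the pattern (not the actual KafkaBasedLog code; 
class and method names are made up):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Simplified illustration of an "infinite retry from the callback" pattern.
class RetryingSender {
    private final KafkaProducer<byte[], byte[]> producer;

    RetryingSender(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void sendWithInfiniteRetry(ProducerRecord<byte[], byte[]> record) {
        producer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                // When send() fails synchronously (e.g. a TimeoutException from
                // BufferPool.allocate, as in the stacktrace above), the callback runs
                // on the calling thread, so every retry adds new send -> doSend ->
                // onCompletion frames and can eventually overflow the stack.
                sendWithInfiniteRetry(record);
            }
        });
    }
}
{code}
Bounding the recursion (for example, re-submitting the retry to a separate thread 
or queue instead of calling send again from inside the callback) would avoid the 
unbounded stack growth.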



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context

2023-03-27 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14652:
-
Description: 
MirrorMaker2 runs multiple Connect worker instances in a single process. In 
Connect, the logging is based on the assumption that Connector names are 
unique. But in MM2, the same Connector names are being used in each flow 
(Connect group). This means that there is no way to differentiate between the 
logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context and the names of 
the Connect framework threads.

  was:
MirrorMaker2 runs multiple Connect worker instances in a single process. In 
Connect, the logging is based on the assumption that Connector names are 
unique. But in MM2, the same Connector names are being used in each flow 
(Connect group). This means that there is no way to differentiate between the 
logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context.


> Improve MM2 logging by adding the flow information to the context
> -
>
> Key: KAFKA-14652
> URL: https://issues.apache.org/jira/browse/KAFKA-14652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MirrorMaker2 runs multiple Connect worker instances in a single process. In 
> Connect, the logging is based on the assumption that Connector names are 
> unique. But in MM2, the same Connector names are being used in each flow 
> (Connect group). This means that there is no way to differentiate between the 
> logs of MirrorSourceConnector in A->B and in B->A.
> This can be improved by adding the flow to the logging context and the names 
> of the Connect framework threads.
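
One possible shape for the flow-aware logging context described above (a sketch, 
assuming an SLF4J MDC based approach; the key name and class are illustrative):
{code:java}
import org.slf4j.MDC;

// Illustration only: scope a replication flow (e.g. "A->B") onto the logging context
// so that log lines can be attributed to the right Connect group.
public final class FlowLoggingContext implements AutoCloseable {
    private static final String FLOW_KEY = "mm2.flow"; // hypothetical MDC key

    private FlowLoggingContext(String flow) {
        MDC.put(FLOW_KEY, flow);
    }

    public static FlowLoggingContext forFlow(String sourceCluster, String targetCluster) {
        return new FlowLoggingContext(sourceCluster + "->" + targetCluster);
    }

    @Override
    public void close() {
        MDC.remove(FLOW_KEY);
    }
}
{code}
With a log4j pattern containing %X{mm2.flow}, the MirrorSourceConnector logs of 
A->B and B->A become distinguishable.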



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role

2023-03-27 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14838:
-
Description: 
MM2 code creates a lot of Kafka clients internally. These clients generate a 
lot of logs, but since the client.id is not properly specified, connecting the 
dots between a specific Connector/Task and its internal client is close to 
impossible. This is even more complex when MM2 is running in distributed mode, 
in which multiple Connect workers are running inside the same process.

For Connector/Task created clients, the client.id should specify the flow, the 
Connector name/Task ID, and the role of the client. E.g. MirrorSourceConnector 
uses multiple admin clients, and their client.id should reflect the difference 
between them.

For Worker created clients, the client.id should refer to the flow.

This will help log analysis significantly, especially in MM2 mode.

  was:
MM2 code creates a lot of Kafka clients internally. These clients generate a 
lot of logs, but since the client.id is not properly specified, connecting the 
dots between a specific Connector/Task and its internal client is close to 
impossible.

The client.id of such clients should specify the Connector name/Task ID, and 
should also specify the role of the client. E.g. MirrorSourceConnector uses 
multiple admin clients, and their client.id should reflect the difference 
between them. This will help log analysis significantly, especially in MM2 mode.


> MM2 Worker/Connector/Task clients should specify client ID based on flow and 
> role
> -
>
> Key: KAFKA-14838
> URL: https://issues.apache.org/jira/browse/KAFKA-14838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MM2 code creates a lot of Kafka clients internally. These clients generate a 
> lot of logs, but since the client.id is not properly specified, connecting 
> the dots between a specific Connector/Task and its internal client is close 
> to impossible. This is even more complex when MM2 is running in distributed 
> mode, in which multiple Connect workers are running inside the same process.
> For Connector/Task created clients, the client.id should specify the flow, the 
> Connector name/Task ID, and the role of the client. E.g. MirrorSourceConnector 
> uses multiple admin clients, and their client.id should reflect the difference 
> between them.
> For Worker created clients, the client.id should refer to the flow.
> This will help log analysis significantly, especially in MM2 mode.
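
For illustration, the kind of client.id construction this asks for could look 
like the sketch below (the format and helper are assumptions, not the eventual 
implementation):
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;

// Hypothetical helper: encode flow, Connector/Task and role into the client.id,
// e.g. "A->B|MirrorSourceConnector-0|offset-syncs-admin".
final class Mm2ClientIds {
    static Map<String, Object> withClientId(Map<String, Object> baseConfig,
                                            String flow,
                                            String connectorOrTask,
                                            String role) {
        Map<String, Object> config = new HashMap<>(baseConfig);
        config.put(CommonClientConfigs.CLIENT_ID_CONFIG, flow + "|" + connectorOrTask + "|" + role);
        return config;
    }
}
{code}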



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role

2023-03-27 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14838:
-
Summary: MM2 Worker/Connector/Task clients should specify client ID based 
on flow and role  (was: MM2 Connector/Task clients should specify client ID 
based on ID and role)

> MM2 Worker/Connector/Task clients should specify client ID based on flow and 
> role
> -
>
> Key: KAFKA-14838
> URL: https://issues.apache.org/jira/browse/KAFKA-14838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MM2 code creates a lot of Kafka clients internally. These clients generate a 
> lot of logs, but since the client.id is not properly specified, connecting 
> the dots between a specific Connector/Task and its internal client is close 
> to impossible.
> The client.id of such clients should specify the Connector name/Task ID, and 
> should also specify the role of the client. E.g. MirrorSourceConnector uses 
> multiple admin clients, and their client.id should reflect the difference 
> between them. This will help log analysis significantly, especially in MM2 
> mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context

2023-03-23 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14652:
-
Description: 
MirrorMaker2 runs multiple Connect worker instances in a single process. In 
Connect, the logging is based on the assumption that Connector names are 
unique. But in MM2, the same Connector names are being used in each flow 
(Connect group). This means that there is no way to differentiate between the 
logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context.

  was:
MirrorMaker2 runs multiple Connect worker instances in a single process. In 
Connect, the logging is based on the assumption that Connector names are 
unique. But in MM2, the same Connector names are being used in each flow 
(Connect group). This means that there is no way to differentiate between the 
logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context.

Additionally, the client.id of the Kafka clients used by the MM2 Connectors 
should also be set explicitly, with the flow information added.


> Improve MM2 logging by adding the flow information to the context
> -
>
> Key: KAFKA-14652
> URL: https://issues.apache.org/jira/browse/KAFKA-14652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MirrorMaker2 runs multiple Connect worker instances in a single process. In 
> Connect, the logging is based on the assumption that Connector names are 
> unique. But in MM2, the same Connector names are being used in each flow 
> (Connect group). This means that there is no way to differentiate between the 
> logs of MirrorSourceConnector in A->B and in B->A.
> This can be improved by adding the flow to the logging context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14838) MM2 Connector/Task clients should specify client ID based on ID and role

2023-03-23 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14838:


 Summary: MM2 Connector/Task clients should specify client ID based 
on ID and role
 Key: KAFKA-14838
 URL: https://issues.apache.org/jira/browse/KAFKA-14838
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 code creates a lot of Kafka clients internally. These clients generate a 
lot of logs, but since the client.id is not properly specified, connecting the 
dots between a specific Connector/Task and its internal client is close to 
impossible.

The client.id of such clients should specify the Connector name/Task ID, and 
should also specify the role of the client. E.g. MirrorSourceConnector uses 
multiple admin clients, and their client.id should reflect the difference 
between them. This will help log analysis significantly, especially in MM2 mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context

2023-03-23 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-14652:


Assignee: Daniel Urban

> Improve MM2 logging by adding the flow information to the context
> -
>
> Key: KAFKA-14652
> URL: https://issues.apache.org/jira/browse/KAFKA-14652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MirrorMaker2 runs multiple Connect worker instances in a single process. In 
> Connect, the logging is based on the assumption that Connector names are 
> unique. But in MM2, the same Connector names are being used in each flow 
> (Connect group). This means that there is no way to differentiate between the 
> logs of MirrorSourceConnector in A->B and in B->A.
> This can be improved by adding the flow to the logging context.
> Additionally, the client.id of the Kafka clients used by the MM2 Connectors 
> should also be set explicitly, with the flow information added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14721) Kafka listener uses wrong login class

2023-02-15 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14721:


 Summary: Kafka listener uses wrong login class
 Key: KAFKA-14721
 URL: https://issues.apache.org/jira/browse/KAFKA-14721
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.2
Reporter: Daniel Urban


When trying to configure a single SASL_SSL listener with GSSAPI, SCRAM, and OAuth 
support, we encounter an error at startup:
{code:java}
2023-02-15 13:26:04,250 ERROR kafka.server.KafkaServer: [main]: [KafkaServer 
id=104] Fatal error during KafkaServer startup. Prepare to shutdown
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No 
serviceName defined in either JAAS or Kafka config
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.Processor.(SocketServer.scala:861) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:442) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) 
~[scala-library-2.13.10.jar:?]
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) 
~[scala-library-2.13.10.jar:?]
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) 
~[scala-library-2.13.10.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933) 
~[scala-library-2.13.10.jar:?]
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.SocketServer.startup(SocketServer.scala:131) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.server.KafkaServer.startup(KafkaServer.scala:310) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.Kafka$.main(Kafka.scala:109) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1(Kafka.scala:107) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1$adapted(Kafka.scala:107) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:118) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:110) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either 
JAAS or Kafka config
        at 
org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:309)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:61)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        ... 21 more{code}
Using the following configs in a Kafka broker:

jaas configuration file:
{code:java}
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true 
useKeyTab=true storeKey=true serviceName="kafka" 
keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE";
org.apache.kafka.common.security.scram.ScramLoginModule required;
};{code}
and the following properties:
{code:java}
listener.name.sasl_ssl.sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required;
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuth
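# (Not part of the original report - a hedged sketch of making the per-mechanism
# login explicit with per-listener, per-mechanism JAAS entries; the property names
# follow the documented listener.name.<listener>.<mechanism>.sasl.jaas.config
# pattern, the values below are illustrative.)
listener.name.sasl_ssl.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" \
  keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE";
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
sasl.kerberos.service.name=kafka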

[jira] [Commented] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-15 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688979#comment-17688979
 ] 

Daniel Urban commented on KAFKA-14716:
--

[~ChrisEgerton] Indeed, it is, thanks for pointing that out - will try to 
comment on the PR of KAFKA-12694

> Connect schema does not allow struct default values
> ---
>
> Key: KAFKA-14716
> URL: https://issues.apache.org/jira/browse/KAFKA-14716
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> The ConnectSchema API should allow specifying a composite (struct) default 
> value for a field, but with the current API, it is impossible to do so.
>  # There is a circular dependency between creating a struct as a default 
> value and creating the schema which holds it as the default value. The Struct 
> constructor expects a Schema object, and the default value setter of 
> SchemaBuilder checks schema conformity by using the 
> ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This 
> can only be bypassed if the struct references a SchemaBuilder instance, and 
> defaultValue is called on that builder instance, but this goes against the 
> Struct docs stating that "Struct objects must specify a complete \{@link 
> Schema} up front".
>  # ConnectSchema.equals is not prepared to be used with other Schema 
> implementations, so equals checks between ConnectSchema and SchemaBuilder 
> instances will always fail. This is only causing an issue if equals has to be 
> used for schema conformity checks.
> Code examples:
> Working code (mind that the schema referenced by the Struct is a 
> SchemaBuilder, and it is mutated after the Struct is constructed):
> {code:java}
> @Test
> public void testCompositeDefault() {
> SchemaBuilder nestedSchema = SchemaBuilder.struct()
> .field("bar", Schema.STRING_SCHEMA);
> Struct nestedDefault = new Struct(nestedSchema);
> nestedDefault.put("bar", "default_value");
> Schema schema = SchemaBuilder.struct()
> .field("foo",
> nestedSchema
> .defaultValue(nestedDefault)
> .build()
> )
> .build();
> } {code}
> Not working code (but better aligned with the current API and docs - 2 
> separate Schema instances used by the Struct and the field, only diff is the 
> default value between the 2):
> {code:java}
>  @Test
> public void testCompositeDefault() {
> Schema nestedSchema = SchemaBuilder.struct()
> .field("bar", Schema.STRING_SCHEMA)
> .build();
> Struct nestedDefault = new Struct(nestedSchema);
> nestedDefault.put("bar", "default_value");
> Schema schema = SchemaBuilder.struct()
> .field("foo",
> SchemaBuilder
> .struct()
> .field("bar", Schema.STRING_SCHEMA)
> .defaultValue(nestedDefault)
> .build()
> )
> .build();
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-15 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-14716.
--
Resolution: Duplicate

> Connect schema does not allow struct default values
> ---
>
> Key: KAFKA-14716
> URL: https://issues.apache.org/jira/browse/KAFKA-14716
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> The ConnectSchema API should allow specifying a composite (struct) default 
> value for a field, but with the current API, it is impossible to do so.
>  # There is a circular dependency between creating a struct as a default 
> value and creating the schema which holds it as the default value. The Struct 
> constructor expects a Schema object, and the default value setter of 
> SchemaBuilder checks schema conformity by using the 
> ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This 
> can only be bypassed if the struct references a SchemaBuilder instance, and 
> defaultValue is called on that builder instance, but this goes against the 
> Struct docs stating that "Struct objects must specify a complete \{@link 
> Schema} up front".
>  # ConnectSchema.equals is not prepared to be used with other Schema 
> implementations, so equals checks between ConnectSchema and SchemaBuilder 
> instances will always fail. This is only causing an issue if equals has to be 
> used for schema conformity checks.
> Code examples:
> Working code (mind that the schema referenced by the Struct is a 
> SchemaBuilder, and it is mutated after the Struct is constructed):
> {code:java}
> @Test
> public void testCompositeDefault() {
> SchemaBuilder nestedSchema = SchemaBuilder.struct()
> .field("bar", Schema.STRING_SCHEMA);
> Struct nestedDefault = new Struct(nestedSchema);
> nestedDefault.put("bar", "default_value");
> Schema schema = SchemaBuilder.struct()
> .field("foo",
> nestedSchema
> .defaultValue(nestedDefault)
> .build()
> )
> .build();
> } {code}
> Not working code (but better aligned with the current API and docs - 2 
> separate Schema instances used by the Struct and the field, only diff is the 
> default value between the 2):
> {code:java}
>  @Test
> public void testCompositeDefault() {
> Schema nestedSchema = SchemaBuilder.struct()
> .field("bar", Schema.STRING_SCHEMA)
> .build();
> Struct nestedDefault = new Struct(nestedSchema);
> nestedDefault.put("bar", "default_value");
> Schema schema = SchemaBuilder.struct()
> .field("foo",
> SchemaBuilder
> .struct()
> .field("bar", Schema.STRING_SCHEMA)
> .defaultValue(nestedDefault)
> .build()
> )
> .build();
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-14 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14716:
-
Description: 
The ConnectSchema API should allow specifying a composite (struct) default 
value for a field, but with the current API, it is impossible to do so.
 # There is a circular dependency between creating a struct as a default value 
and creating the schema which holds it as the default value. The Struct 
constructor expects a Schema object, and the default value setter of 
SchemaBuilder checks schema conformity by using the 
ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can 
only be bypassed if the struct references a SchemaBuilder instance, and 
defaultValue is called on that builder instance, but this goes against the 
Struct docs stating that "Struct objects must specify a complete \{@link 
Schema} up front".
 # ConnectSchema.equals is not prepared to be used with other Schema 
implementations, so equals checks between ConnectSchema and SchemaBuilder 
instances will always fail. This is only causing an issue if equals has to be 
used for schema conformity checks.

Code examples:

Working code (mind that the schema referenced by the Struct is a SchemaBuilder, 
and it is mutated after the Struct is constructed):
{code:java}
@Test
public void testCompositeDefault() {
SchemaBuilder nestedSchema = SchemaBuilder.struct()
.field("bar", Schema.STRING_SCHEMA);
Struct nestedDefault = new Struct(nestedSchema);
nestedDefault.put("bar", "default_value");

Schema schema = SchemaBuilder.struct()
.field("foo",
nestedSchema
.defaultValue(nestedDefault)
.build()
)
.build();
} {code}
Not working code (but better aligned with the current API and docs - 2 separate 
Schema instances used by the Struct and the field, only diff is the default 
value between the 2):
{code:java}
 @Test
public void testCompositeDefault() {
Schema nestedSchema = SchemaBuilder.struct()
.field("bar", Schema.STRING_SCHEMA)
.build();
Struct nestedDefault = new Struct(nestedSchema);
nestedDefault.put("bar", "default_value");

Schema schema = SchemaBuilder.struct()
.field("foo",
SchemaBuilder
.struct()
.field("bar", Schema.STRING_SCHEMA)
.defaultValue(nestedDefault)
.build()
)
.build();
}{code}

  was:
The ConnectSchema API should allow specifying a composite (struct) default 
value for a field, but with the current API, it is impossible to do so.
 # There is a circular dependency between creating a struct as a default value 
and creating the schema which holds it as the default value. The Struct 
constructor expects a Schema object, and the default value setter of 
SchemaBuilder checks schema conformity by using the 
ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can 
only be bypassed if the struct references a SchemaBuilder instance, and 
defaultValue is called on that builder instance, but this goes against the 
Struct docs stating that "Struct objects must specify a complete \{@link 
Schema} up front".
 # ConnectSchema.equals is not prepared to be used with other Schema 
implementations, so equals checks between ConnectSchema and SchemaBuilder 
instances will always fail. This is only causing an issue if equals has to be 
used for schema conformity checks.


> Connect schema does not allow struct default values
> ---
>
> Key: KAFKA-14716
> URL: https://issues.apache.org/jira/browse/KAFKA-14716
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> The ConnectSchema API should allow specifying a composite (struct) default 
> value for a field, but with the current API, it is impossible to do so.
>  # There is a circular dependency between creating a struct as a default 
> value and creating the schema which holds it as the default value. The Struct 
> constructor expects a Schema object, and the default value setter of 
> SchemaBuilder checks schema conformity by using the 
> ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This 
> can only be bypassed if the struct references a SchemaBuilder instance, and 
> defaultValue is called on that builder instance, but this goes against the 
> Struct docs stating that "Struct objects must specify a complete \{@link 
> Schema} up front".
>  # ConnectSchema.equals is not prepared to be used with other Schema 
> implementations, so equals checks between ConnectSchema and SchemaBuilder 
> instances will a

[jira] [Created] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-14 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14716:


 Summary: Connect schema does not allow struct default values
 Key: KAFKA-14716
 URL: https://issues.apache.org/jira/browse/KAFKA-14716
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban


The ConnectSchema API should allow specifying a composite (struct) default 
value for a field, but with the current API, it is impossible to do so.
 # There is a circular dependency between creating a struct as a default value 
and creating the schema which holds it as the default value. The Struct 
constructor expects a Schema object, and the default value setter of 
SchemaBuilder checks schema conformity by using the 
ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can 
only be bypassed if the struct references a SchemaBuilder instance, and 
defaultValue is called on that builder instance, but this goes against the 
Struct docs stating that "Struct objects must specify a complete \{@link 
Schema} up front".
 # ConnectSchema.equals is not prepared to be used with other Schema 
implementations, so equals checks between ConnectSchema and SchemaBuilder 
instances will always fail. This is only causing an issue if equals has to be 
used for schema conformity checks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory

2023-02-01 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14667:
-
Description: 
This was observed with Kafka 3.1.1, but I believe that the latest versions are also 
affected.

In the Cruise Control project, there is an integration test: 
com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails every ~20th run with a timeout - the 
triggered preferred leadership election is never completed. After some 
investigation, it turns out that:
 # The admin client never gets a response from the broker.
 # The leadership change is executed successfully.
 # The ElectLeader purgatory never gets an update for the relevant topic 
partition.

A few relevant lines from a failed run (this test uses an embedded cluster, 
logs are mixed):

CC successfully sends a preferred election request to the controller (broker 
0), topic1-0 needs a leadership change from broker 0 to broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG 
kafka.controller.KafkaController - [Controller id=0] Waiting for any successful 
result for election type (PREFERRED) by AdminClientTriggered for partitions: 
Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, 
message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1) {code}
The delayed operation for the leader election is triggered 2 times in quick 
succession (yes, same ms in both logs):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1){code}
Shortly after (few ms later based on the logs), broker 0 receives an 
UpdateMetadataRequest from the controller (itself) and processes it:
{code:java}
2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Sending UPDATE_METADATA request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19) and timeout 3 to node 0: 
UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, 
ungroupedPartitionStates=[], 
topicStates=[UpdateMetadataTopicState(topicName='topic1', 
topicId=gkFP8VnkSGyEf_LBBZSowQ, 
partitionStates=[UpdateMetadataPartitionState(topicName='topic1', 
partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], 
zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], 
liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null), 
UpdateMetadataBroker(id=0, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null)])
2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Received UPDATE_METADATA response from node 0 for request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19): UpdateMetadataResponseData(errorCode=0)
2023-02-01 01:20:26.035 
[data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG 
kafka.request.logger - Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
 {code}
The update metadata request should trigger an update on the ElectLeader 
purgatory, and we should see a log line like this: "Request key X unblocked Y 
ElectLeader."

In the failin

[jira] [Created] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory

2023-02-01 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14667:


 Summary: Delayed leader election operation gets stuck in purgatory
 Key: KAFKA-14667
 URL: https://issues.apache.org/jira/browse/KAFKA-14667
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.1
Reporter: Daniel Urban


This was observed with Kafka 3.1.1, but I believe that the latest versions are also 
affected.

In the Cruise Control project, there is an integration test: 
com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails every ~20th run with a timeout - the 
triggered preferred leadership election is never completed. After some 
investigation, it turns out that:
 # The admin client never gets a response from the broker.
 # The leadership change is executed successfully.
 # The ElectLeader purgatory never gets an update for the relevant topic 
partition.

A few relevant lines from a failed run (this test uses an embedded cluster, 
logs are mixed):

CC successfully sends a preferred election request to the controller (broker 
0), topic1-0 needs a leadership change from broker 0 to broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG 
kafka.controller.KafkaController - [Controller id=0] Waiting for any successful 
result for election type (PREFERRED) by AdminClientTriggered for partitions: 
Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, 
message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1) {code}
The delayed operation for the leader election is triggered 2 times in quick 
succession (yes, same ms in both logs):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1){code}
Shortly after (few ms later based on the logs), broker 0 receives an 
UpdateMetadataRequest from the controller (itself) and processes it:
{code:java}
2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Sending UPDATE_METADATA request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19) and timeout 3 to node 0: 
UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, 
ungroupedPartitionStates=[], 
topicStates=[UpdateMetadataTopicState(topicName='topic1', 
topicId=gkFP8VnkSGyEf_LBBZSowQ, 
partitionStates=[UpdateMetadataPartitionState(topicName='topic1', 
partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], 
zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], 
liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null), 
UpdateMetadataBroker(id=0, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null)])
2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Received UPDATE_METADATA response from node 0 for request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19): UpdateMetadataResponseData(errorCode=0)
2023-02-01 01:20:26.035 
[data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG 
kafka.request.logger - Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
 {code}
The update metadata

[jira] [Created] (KAFKA-14653) MM2 should delay resolving config provider references

2023-01-25 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14653:


 Summary: MM2 should delay resolving config provider references
 Key: KAFKA-14653
 URL: https://issues.apache.org/jira/browse/KAFKA-14653
 Project: Kafka
  Issue Type: Sub-task
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 eagerly resolves config provider references, meaning that the generated 
Connector configurations contain inlined values.

This is a problem for sensitive and host-specific configurations, so MM2 should 
delay the resolution of such configs.
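
As a hedged illustration of the difference (hypothetical provider, path and key; 
FileConfigProvider is Kafka's built-in provider):
{code:java}
# Illustration only - not from the ticket.
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
source.cluster.ssl.keystore.password=${file:/etc/mm2/secrets.properties:keystore.password}
{code}
With eager resolution, the Connector configuration generated by MM2 contains the 
plain secret value; with delayed resolution it would keep the ${file:...} reference, 
and the Connect worker would resolve it only when the Connector/Task is configured.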



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters

2023-01-25 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-10586:
-
Issue Type: Task  (was: Improvement)

> Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
> ---
>
> Key: KAFKA-10586
> URL: https://issues.apache.org/jira/browse/KAFKA-10586
> Project: Kafka
>  Issue Type: Task
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Chris Egerton
>Priority: Major
>  Labels: cloudera
>
> KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
> MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means 
> that with specific workloads, the dedicated MM2 cluster can become unable to 
> react to dynamic topic and group filter changes.
> (This occurs when after a rebalance operation, the leader node has no 
> MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is 
> stopped on the leader, meaning it cannot detect config changes by itself. 
> Followers still running the connector can detect config changes, but they 
> cannot query the leader for config updates.)
> Besides the REST support, config provider references should be evaluated 
> lazily in connector configurations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context

2023-01-25 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14652:


 Summary: Improve MM2 logging by adding the flow information to the 
context
 Key: KAFKA-14652
 URL: https://issues.apache.org/jira/browse/KAFKA-14652
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Urban


MirrorMaker2 runs multiple Connect worker instances in a single process. In 
Connect, the logging is based on the assumption that Connector names are 
unique. But in MM2, the same Connector names are being used in each flow 
(Connect group). This means that there is no way to differentiate between the 
logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context.

Additionally, the client.id of the Kafka clients used by the MM2 Connectors 
should also be set explicitly, with the flow information added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-14 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566808#comment-17566808
 ] 

Daniel Urban commented on KAFKA-14053:
--

I understand that increasing the epoch on the client side is probably violating 
the contract in the protocol.

Refactored my change so that client-side timeouts (both delivery and request 
timeout) become fatal errors in transactional producers, resulting in a last, 
best-effort epoch bump.

> Transactional producer should bump the epoch when a batch encounters delivery 
> timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is 
> still in-flight. Due to underlying infra issues, it is possible that an 
> EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
> batch is processed on the leader. This can cause transactional batches to be 
> appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even 
> violate processing guarantees, as the out-of-order batch can become part of 
> the next transaction.
> Because of this, the producer should skip aborting the partition, and bump 
> the epoch to fence the in-flight requests.
>  
> More detail can be found here: 
> [https://lists.apache.org/thread/8d2oblsjtdv7740glc37v79f0r7p99dp]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-08 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564222#comment-17564222
 ] 

Daniel Urban commented on KAFKA-14053:
--

About to submit a PR, but wanted to bring up a few more points:
 * When we bump in the middle of an ongoing transaction, the coordinator 
handles that as a fencing operation, and does not return a valid epoch - the 
coordinator assumes that a new producer was started with the same ID, and 
forces the InitProducerIDRequest to be retried. This will result in a 2nd bump, 
in which the epoch is returned.
 * If we go with the simplest solution, and bump once, the producer must 
transition into a fatal state. The producer cannot safely get a new epoch from 
the coordinator, as sending the original epoch multiple times will first result 
in CONCURRENT_TRANSACTIONS (while the ongoing transaction is aborted) then in 
PRODUCER_FENCED (when the coordinator realizes that the epoch was bumped 
already). It is not safe to send an empty InitProducerIDRequest, as we might 
fence a newer producer instance with that.
 * In my PR, I try to bump twice. First, with using the original epoch (which 
will eventually get a PRODUCER_FENCED), then I increase the epoch by 1 on the 
client side, and try again. Conceptually, this might sound wrong (the producer 
is using an epoch which wasn't returned by the coordinator), but I think that 
theoretically it is safe to do. The first bump during an ongoing transaction 
results in an epoch which is never returned to any producers. If we retry with 
the increased epoch, that won't collide with any other producers. If the bump 
with the increased epoch succeeds, then there was no concurrent bump from other 
instances, and the producer can continue using the new epoch. If the increased 
epoch receives a PRODUCER_FENCED, then the producer was actually fenced by 
another instance.
 * Due to the previous point, the producer needs to make sure that it can 
safely increase the epoch by 1 at any point. This means that if the producer 
got Short.MAX as the epoch, it won't be able to do the +1 increase locally. To 
avoid this, I made a change in the producer to force an epoch reset from the 
coordinator when the epoch is Short.MAX.
 * I understand that the local epoch increase and the intentional epoch reset 
might not be ideal, but without those, the producer cannot be safely used after 
a delivery timeout. So in my current understanding, a delivery timeout either 
means a fatal error, or we need to accept the local epoch increase and the 
epoch reset on Short.MAX.
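
Roughly, the double-bump idea above could be sketched as follows (class and method 
names are invented for illustration; this is not the actual producer/TransactionManager 
code):
{code:java}
import org.apache.kafka.common.errors.ProducerFencedException;

// Hypothetical sketch of the epoch handling after a delivery timeout.
abstract class EpochBumpSketch {
    // Stand-ins for the real coordinator round-trips (InitProducerId etc.).
    abstract short initProducerId(short epoch);
    abstract short resetEpochViaCoordinator();

    short bumpEpochAfterDeliveryTimeout(short currentEpoch) {
        if (currentEpoch == Short.MAX_VALUE) {
            // A local +1 is not possible at Short.MAX, so force a full epoch reset instead.
            return resetEpochViaCoordinator();
        }
        try {
            // First bump with the original epoch: during an ongoing transaction the
            // coordinator treats this as fencing and does not return a usable epoch.
            return initProducerId(currentEpoch);
        } catch (ProducerFencedException fenced) {
            // Retry once with a locally incremented epoch; if this is fenced too,
            // another producer instance with the same transactional ID fenced us.
            return initProducerId((short) (currentEpoch + 1));
        }
    }
}
{code}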

> Transactional producer should bump the epoch when a batch encounters delivery 
> timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is 
> still in-flight. Due to underlying infra issues, it is possible that an 
> EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
> batch is processed on the leader. This can cause transactional batches to be 
> appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even 
> violate processing guarantees, as the out-of-order batch can become part of 
> the next transaction.
> Because of this, the producer should skip aborting the partition, and bump 
> the epoch to fence the in-flight requests.
>  
> More detail can be found here: 
> [https://lists.apache.org/thread/8d2oblsjtdv7740glc37v79f0r7p99dp]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-07 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14053:
-
Description: 
When a batch fails due to delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.

Because of this, the producer should skip aborting the partition, and bump the 
epoch to fence the in-flight requests.

  was:
When a batch fails due to delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.


> Transactional producer should bump the epoch when a batch encounters delivery 
> timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is 
> still in-flight. Due to underlying infra issues, it is possible that an 
> EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
> batch is processed on the leader. This can cause transactional batches to be 
> appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even 
> violate processing guarantees, as the out-of-order batch can become part of 
> the next transaction.
> Because of this, the producer should skip aborting the partition, and bump 
> the epoch to fence the in-flight requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-07 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14053:


 Summary: Transactional producer should bump the epoch when a batch 
encounters delivery timeout
 Key: KAFKA-14053
 URL: https://issues.apache.org/jira/browse/KAFKA-14053
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban


When a batch fails due to delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException

2022-06-09 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552025#comment-17552025
 ] 

Daniel Urban commented on KAFKA-13970:
--

Sorry for the confusion, the Issue I reported depends on an unmerged PR which I 
thought was already merged. This can be closed.

> TopicAdmin topic creation should be retried on TimeoutException
> ---
>
> Key: KAFKA-13970
> URL: https://issues.apache.org/jira/browse/KAFKA-13970
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Assignee: Sagar Rao
>Priority: Major
>
> org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the 
> case when there aren't enough brokers in the cluster to create a topic with 
> the expected replication factor. This logic should also handle the case when 
> there are 0 brokers in the cluster, and should retry in that case.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException

2022-06-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13970:


 Summary: TopicAdmin topic creation should be retried on 
TimeoutException
 Key: KAFKA-13970
 URL: https://issues.apache.org/jira/browse/KAFKA-13970
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Daniel Urban


org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the case 
when there aren't enough brokers in the cluster to create a topic with the 
expected replication factor. This logic should also handle the case when there 
are 0 brokers in the cluster, and should retry in that case.
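
Not the TopicAdmin internals, but a minimal sketch of the retry behaviour being 
asked for, using the plain Admin API (topic, backoff and deadline are illustrative):
{code:java}
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TimeoutException;

// Illustrative retry loop: keep retrying topic creation while the request times out,
// which is what happens when the cluster has 0 (or too few) brokers available.
final class TopicCreationRetry {
    static void createWithRetry(Admin admin, NewTopic topic, long deadlineMs) throws Exception {
        while (true) {
            try {
                admin.createTopics(Collections.singleton(topic)).all().get();
                return;
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TimeoutException && System.currentTimeMillis() < deadlineMs) {
                    Thread.sleep(1_000); // back off and retry until the deadline
                } else {
                    throw e;
                }
            }
        }
    }
}
{code}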



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException

2022-06-08 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-13970:
-
Issue Type: Bug  (was: Improvement)

> TopicAdmin topic creation should be retried on TimeoutException
> ---
>
> Key: KAFKA-13970
> URL: https://issues.apache.org/jira/browse/KAFKA-13970
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Priority: Major
>
> org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the 
> case when there aren't enough brokers in the cluster to create a topic with 
> the expected replication factor. This logic should also handle the case when 
> there are 0 brokers in the cluster, and should retry in that case.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2022-05-26 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542435#comment-17542435
 ] 

Daniel Urban commented on KAFKA-12671:
--

I think I encountered a different path than the one you describe:
 # InitProducerID with transactional ID
 # AddPartitionsToTxn
 # Produce, but with failure, retry is triggered
 # During a retry, delivery timeout is triggered
 # Abort sent to TransactionCoordinator
 # Abort marker sent by TransactionCoordinator
 # Last retry of the produce request is processed
 # Transactional ID is dropped and never used again

I'm unsure how steps 4-5 can happen - the client was a Java client, and not too old 
(2.6). It is possible that there were some network errors in the environment 
when this occurred.

The result was that there were X batches produced in a specific transaction, 
but the last 2 batches were appended to the log after the abort marker. Also, 
the transaction coordinator wasn't aware of this, and handled it as a 
successful abort, and not having any open transaction for the specific 
transactional ID. Due to this, LSO was stuck in the partition, and transaction 
timeout was never triggered.

Also, if the transactional ID is reused, and somehow unblocks the partition by 
starting a new transaction (and using the same partition), it is still an 
issue, as it violates the processing guarantees. If the transactional ID is 
reused in the scenario which I described, and the first transaction after the 
out-of-order messages is committed, that transaction will contain 2 extra 
batches from the previous (aborted) transaction.

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Major
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 

[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2022-04-29 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530039#comment-17530039
 ] 

Daniel Urban commented on KAFKA-12671:
--

I believe I encountered this issue with the Java producer in a situation where 
the produce requests timed out (due to the delivery timeout), and the abort 
control message got appended to the partition before the last few batches sent 
in the transaction. [~twmb] Does that make sense to you? A client-side timeout 
would mean that the transaction manager inside the Java producer has no 
definitive answer about the fate of the produce request, and just assumes that 
it failed.

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Major
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [D

[jira] [Resolved] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-04-08 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-13809.
--
Resolution: Won't Fix

Connector configs are used for consumer/producer overrides, converter classes, 
etc.

It is not necessary to propagate all of these configs to the tasks.
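
For illustration, a hypothetical connector config (the key names are real Connect framework keys, the values are made up): the converter and producer-override entries are applied by the worker when it builds the task's converters and producer, so the task itself never needs to see them.
{code:java}
import java.util.Map;

public class ConnectorConfigSketch {
    public static void main(String[] args) {
        Map<String, String> connectorConfig = Map.of(
                "name", "my-file-source",
                "connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector",
                "file", "/tmp/input.txt",   // used by the connector and its tasks
                "topic", "file-topic",      // used by the connector and its tasks
                // applied by the worker, not by the task:
                "key.converter", "org.apache.kafka.connect.json.JsonConverter",
                // applied by the worker to the task's producer; needs
                // connector.client.config.override.policy enabled on the worker:
                "producer.override.linger.ms", "50");
        connectorConfig.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
{code}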

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full 
> configuration to tasks
> --
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to 
> the tasks. This makes it impossible to override built-in configs, such as 
> producer/consumer overrides.
> This causes an issue even when used for testing purposes.





[jira] [Updated] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-04-08 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-13809:
-
Description: 
The 2 example connectors do not propagate the full connector configuration to 
the tasks. This makes it impossible to override built-in configs, such as 
producer/consumer overrides.

This causes an issue even when used for testing purposes.

  was:
The 2 example connectors do not propagate the full connector configuration to 
the tasks. This makes it impossible to override built-in configs, such as 
key.converter.

This causes an issue even when used for testing purposes.


> FileStreamSinkConnector and FileStreamSourceConnector should propagate full 
> configuration to tasks
> --
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to 
> the tasks. This makes it impossible to override built-in configs, such as 
> producer/consumer overrides.
> This causes an issue even when used for testing purposes.





[jira] [Created] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-04-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13809:


 Summary: FileStreamSinkConnector and FileStreamSourceConnector 
should propagate full configuration to tasks
 Key: KAFKA-13809
 URL: https://issues.apache.org/jira/browse/KAFKA-13809
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Daniel Urban


The 2 example connectors do not propagate the full connector configuration to 
the tasks. This makes it impossible to override built-in configs, such as 
key.converter.

This causes an issue even when used for testing purposes.





[jira] [Updated] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class

2022-03-21 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-13756:
-
Description: 
Currently, if there is an issue with the connector class, the validate 
endpoint returns a 400 or a 500 response.

Instead, it should return a well formatted response containing a proper 
validation error message.

  was:
Currently, if there is an issue with the connector name or the connector class, 
the validate endpoint returns a 500 response.

Instead, it should return a well formatted response containing proper 
validation error messages.


> Connect validate endpoint should return proper response for invalid connector 
> class
> ---
>
> Key: KAFKA-13756
> URL: https://issues.apache.org/jira/browse/KAFKA-13756
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, if there is an issue with the connector class, the validate 
> endpoint returns a 400 or a 500 response.
> Instead, it should return a well formatted response containing a proper 
> validation error message.





[jira] [Updated] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class

2022-03-21 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-13756:
-
Summary: Connect validate endpoint should return proper response for 
invalid connector class  (was: Connect validate endpoint should return proper 
response on name and connector class error)

> Connect validate endpoint should return proper response for invalid connector 
> class
> ---
>
> Key: KAFKA-13756
> URL: https://issues.apache.org/jira/browse/KAFKA-13756
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, if there is an issue with the connector name or the connector 
> class, the validate endpoint returns a 500 response.
> Instead, it should return a well formatted response containing proper 
> validation error messages.





[jira] [Created] (KAFKA-13756) Connect validate endpoint should return proper response on name and connector class error

2022-03-21 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13756:


 Summary: Connect validate endpoint should return proper response 
on name and connector class error
 Key: KAFKA-13756
 URL: https://issues.apache.org/jira/browse/KAFKA-13756
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Urban


Currently, if there is an issue with the connector name or the connector class, 
the validate endpoint returns a 500 response.

Instead, it should return a well formatted response containing proper 
validation error messages.





[jira] [Created] (KAFKA-13459) MM2 should be able to add the source offset to the record header

2021-11-17 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13459:


 Summary: MM2 should be able to add the source offset to the record 
header
 Key: KAFKA-13459
 URL: https://issues.apache.org/jira/browse/KAFKA-13459
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


MM2 could add the source offset to the record header to help with diagnostics 
in some use-cases.
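
As a hypothetical sketch of the diagnostics this would enable (the header name "mm2.source.offset" and its encoding are made up for illustration; the ticket does not define them), a consumer on the target cluster could compare the replica offset with the original source offset:
{code:java}
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class SourceOffsetHeaderSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        props.put("group.id", "mm2-diagnostics");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("source.mytopic"));  // replica topic on the target cluster
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                Header header = record.headers().lastHeader("mm2.source.offset"); // hypothetical header
                if (header != null) {
                    long sourceOffset = ByteBuffer.wrap(header.value()).getLong();
                    System.out.printf("replica offset %d <- source offset %d%n",
                            record.offset(), sourceOffset);
                }
            }
        }
    }
}
{code}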





[jira] [Created] (KAFKA-13452) MM2 creates invalid checkpoint when offset mapping is not available

2021-11-12 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13452:


 Summary: MM2 creates invalid checkpoint when offset mapping is not 
available
 Key: KAFKA-13452
 URL: https://issues.apache.org/jira/browse/KAFKA-13452
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


MM2 checkpointing reads the offset-syncs topic to create offset mappings for 
committed consumer group offsets. In some corner cases, it is possible that a 
mapping is not available in offset-syncs - in that case, MM2 simply copies the 
source offset, which might not be a valid offset in the replica topic at all.

One possible situation is if there is an empty topic in the source cluster with 
a non-zero end offset (e.g. retention already removed the records), and a 
consumer group which has a committed offset set to the end offset. If 
replication is configured to start replicating this topic, it will not have an 
offset mapping available in offset-syncs (as the topic is empty), causing MM2 
to copy the source offset.

This can cause issues when auto offset sync is enabled, as the consumer group 
offset can be potentially set to a high number. MM2 never rewinds these 
offsets, so even when there is a correct offset mapping available, the offset 
will not be updated correctly.
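
To make the failure mode concrete, a simplified sketch of the translation step (this is not the actual MirrorCheckpointTask/OffsetSyncStore code, just an illustration of copying vs. skipping when no offset-sync mapping exists):
{code:java}
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Simplified illustration only.
class OffsetTranslationSketch {
    // upstream offset -> downstream offset, as learned from the offset-syncs topic
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Behaviour described above: with no sync at or below the committed offset
    // (e.g. the topic was empty when replication started), the source offset is
    // copied as-is, which may not be a valid offset in the replica topic.
    long translateOrCopy(long upstreamGroupOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamGroupOffset);
        return sync == null ? upstreamGroupOffset : sync.getValue();
    }

    // Safer alternative: emit no checkpoint at all when there is no usable mapping.
    OptionalLong translateOrSkip(long upstreamGroupOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamGroupOffset);
        return sync == null ? OptionalLong.empty() : OptionalLong.of(sync.getValue());
    }
}
{code}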





[jira] [Updated] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connect worker and to the Connector config

2021-10-11 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-13311:
-
Description: 
Currently, the configuration propagation logic in MM2 only allows a handful of 
configurations to be applied to all Connector configs managed by MM2.

In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
would be useful to be able to define configurations "globally", without 
prefixing the config with each replication.

E.g. the "connectors." prefix could be used to declare global Connector configs.

Similarly, only a handful of Connect worker configurations can be configured on 
the "global" level - there should be a prefix for global worker configs as well 
(e.g. "workers.")

  was:
Currently, the configuration propagation logic in MM2 only allows a handful of 
configurations to be applied to all Connector configs managed by MM2.

In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
would be useful to be able to define configurations "globally", without 
prefixing the config with each replication.

E.g. the "connectors." prefix could be used to declare global Connector configs.


> MM2 should allow propagating arbitrary global configurations to the Connect 
> worker and to the Connector config
> --
>
> Key: KAFKA-13311
> URL: https://issues.apache.org/jira/browse/KAFKA-13311
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.8.1
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, the configuration propagation logic in MM2 only allows a handful 
> of configurations to be applied to all Connector configs managed by MM2.
> In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
> would be useful to be able to define configurations "globally", without 
> prefixing the config with each replication.
> E.g. the "connectors." prefix could be used to declare global Connector 
> configs.
> Similarly, only a handful of Connect worker configurations can be configured 
> on the "global" level - there should be a prefix for global worker configs as 
> well (e.g. "workers.")





[jira] [Updated] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connect worker and to the Connector config

2021-10-11 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-13311:
-
Summary: MM2 should allow propagating arbitrary global configurations to 
the Connect worker and to the Connector config  (was: MM2 should allow 
propagating arbitrary global configurations to the Connector config)

> MM2 should allow propagating arbitrary global configurations to the Connect 
> worker and to the Connector config
> --
>
> Key: KAFKA-13311
> URL: https://issues.apache.org/jira/browse/KAFKA-13311
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.8.1
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, the configuration propagation logic in MM2 only allows a handful 
> of configurations to be applied to all Connector configs managed by MM2.
> In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
> would be useful to be able to define configurations "globally", without 
> prefixing the config with each replication.
> E.g. the "connectors." prefix could be used to declare global Connector 
> configs.





[jira] [Created] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connector config

2021-09-20 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13311:


 Summary: MM2 should allow propagating arbitrary global 
configurations to the Connector config
 Key: KAFKA-13311
 URL: https://issues.apache.org/jira/browse/KAFKA-13311
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.8.1
Reporter: Daniel Urban
Assignee: Daniel Urban


Currently, the configuration propagation logic in MM2 only allows a handful of 
configurations to be applied to all Connector configs managed by MM2.

In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
would be useful to be able to define configurations "globally", without 
prefixing the config with each replication.

E.g. the "connectors." prefix could be used to declare global Connector configs.





[jira] [Assigned] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connector config

2021-09-20 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-13311:


Assignee: (was: Daniel Urban)

> MM2 should allow propagating arbitrary global configurations to the Connector 
> config
> 
>
> Key: KAFKA-13311
> URL: https://issues.apache.org/jira/browse/KAFKA-13311
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.8.1
>Reporter: Daniel Urban
>Priority: Major
>
> Currently, the configuration propagation logic in MM2 only allows a handful 
> of configurations to be applied to all Connector configs managed by MM2.
> In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
> would be useful to be able to define configurations "globally", without 
> prefixing the config with each replication.
> E.g. the "connectors." prefix could be used to declare global Connector 
> configs.





[jira] [Resolved] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-13253.
--
Resolution: Duplicate

Same issue as KAFKA-9747 - that one already has a fix under review

> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When not leader, DistributedHerder.reconfigureConnector() forwards the task 
> configuration to the leader as follow:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name contains some special characters, 
> such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an 
> uncaught exception is raised in RestClient and the forward is lost.
> Here is the kind of exception we can catch by adding the necessary code in 
> RestClient:
> {quote}java.lang.IllegalArgumentException: Illegal character in path at index 
> 51: 
> [http://10.224.0.15:8083/connectors/mirror1-cluster-]>mirror2-cluster.MirrorHeartbeatConnector/tasks
> {quote}
> An additional catch() should be added in RestClient.httpRequest(), here:
> {quote}{{catch (IOException | InterruptedException | TimeoutException | 
> ExecutionException e) {}}
>     log.error("IO error forwarding REST request: ", e);
>     {{throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, 
> "IO Error trying to forward REST request: " + e.getMessage(), e);}}
> {{} finally {}}
> {quote}
> to catch all other exceptions, because without it this kind of problem is 
> completely silent.
> To reproduce:
>  * start 2 kafka clusters
>  * start a kafka connect (distributed) with at least 2 nodes
>  * start an HeartbeatConnector with name "cluster1->cluster2"
> If the node which generated the task is not the leader (not systematic), it 
> will forward the creation to the leader and it will be lost. As a result, the 
> connector will stay in RUNNING state but without any task.
> Problem not easy to reproduce, it is important to start with empty connect 
> topics to reproduce more easily





[jira] [Commented] (KAFKA-9747) No tasks created for a connector

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393655#comment-17393655
 ] 

Daniel Urban commented on KAFKA-9747:
-

To add more details to the issue:
 * When running multiple workers
 * AND the Connector name contains a non-URL compatible character
 * AND a follower worker has the Connector in its assignment
 * Then the follower->leader request sent over Connect REST fails in RestClient 
(without any error logging, and without the corresponding future ever 
completing); a rough sketch of the encoding fix is below
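
A rough sketch of the kind of fix this needs (illustrative only, not the actual RestClient/DistributedHerder patch): the connector name has to be encoded before it is spliced into the forwarding URL.
{code:java}
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ForwardUrlSketch {
    public static void main(String[] args) {
        String leaderUrl = "http://10.224.0.15:8083/";   // illustrative
        String connName = "mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector";

        // Naive concatenation, as described above -- '>' is illegal in a URL path:
        String broken = leaderUrl + "connectors/" + connName + "/tasks";

        // Encode the name as a single path segment before building the URL.
        // (URLEncoder does form encoding: it handles '>' but turns spaces into '+',
        // so a stricter path-segment encoder would be preferable in real code.)
        String encoded = URLEncoder.encode(connName, StandardCharsets.UTF_8);
        String fixed = leaderUrl + "connectors/" + encoded + "/tasks";

        System.out.println(broken);
        System.out.println(fixed);
    }
}
{code}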

> No tasks created for a connector
> 
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
>Reporter: Vit Koma
>Assignee: Andras Katona
>Priority: Major
> Attachments: connect-distributed.properties, connect.log
>
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
> (MongoDB) and Confluent S3 connectors. When adding a new connector via the 
> REST API the connector is created in RUNNING state, but no tasks are created 
> for the connector.
> Pausing and resuming the connector does not help. When we stop all workers 
> and then start them again, the tasks are created and everything runs as it 
> should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same 
> behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
> that Debezium is correctly returning a task configuration from the 
> Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {code}
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": 
> "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
> "mongodb.name": "qa-debezium-comp",
> "mongodb.ssl.enabled": true,
> "collection.whitelist": "converter[.]task",
> "tombstones.on.delete": true
>   }
> }
> {code}
> S3 Connector:
> {code}
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
> "connector.class": "io.confluent.connect.s3.S3SinkConnector",
> "topics": "qa-debezium-comp.converter.task",
> "topics.dir": "data/env/qa",
> "s3.region": "eu-west-1",
> "s3.bucket.name": "",
> "flush.size": "15000",
> "rotate.interval.ms": "360",
> "storage.class": "io.confluent.connect.s3.storage.S3Storage",
> "format.class": 
> "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
> "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.DefaultPartitioner",
> "schema.compatibility": "NONE",
> "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> "key.converter.schemas.enable": false,
> "value.converter.schemas.enable": false,
> "transforms": "ExtractDocument",
> 
> "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> {code}
> The connectors are created using curl: {{curl -X POST -H "Content-Type: 
> application/json" --data @ http:/:10083/connectors}}





[jira] [Commented] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393104#comment-17393104
 ] 

Daniel Urban commented on KAFKA-10719:
--

I believe that the core issue here is that the connectors will start up with 
the old config first, and then the new config gets applied by MM2.

If the old listener is not available, that will cause timeouts in the 
rebalance. Connectors have a default timeout which exceeds the default 
rebalance timeout. When the rebalance timeout happens, none of the MM2 nodes 
will be able to update the config after the startup.

Besides deleting the config topic, you can also try to increase the rebalance 
timeout of the Connect workers running inside MM2 (to something around 3 
minutes), which allows the cluster to avoid hitting the rebalance timeout and 
then successfully apply the new configs.
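
Illustrative sketch only (the aliases come from the configuration quoted below, the value is made up, and this assumes that alias-prefixed worker settings are picked up by the embedded workers the same way the alias-prefixed group.id is in the KAFKA-12664 discussion further down):
{code:java}
# mm2.properties -- sketch, not a verified fix
clusters = main, backup
# raise the Connect worker rebalance timeout (default 60s) so that slow connector
# startup with the old config does not make the group hit the rebalance timeout
main.rebalance.timeout.ms = 180000
backup.rebalance.timeout.ms = 180000
{code}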

> MirrorMaker2 fails to update its runtime configuration
> --
>
> Key: KAFKA-10719
> URL: https://issues.apache.org/jira/browse/KAFKA-10719
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Peter Sinoros-Szabo
>Priority: Major
>
> I was running the MM2 cluster successfully with the following configuration, 
> I simplified it a little:
> {code:java}
> clusters = main, backup
> main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092
> backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092
> main->backup.enabled = true
> main->backup.topics = .*
> {code}
> I wanted to change the bootstrap.address list of the destination cluster to a 
> different list that is pointing to the *same* cluster, just a different 
> listener with a different routing. So I changed it to:
> {code:java}
> backup.bootstrap.servers = backupSecA:1234,backupSecB:1234,backupSecC:1234
> {code}
> I did a rolling restart on the MM2 nodes and saw that some tasks were still 
> using the old bootstrap addresses, some of them were using the new one. I 
> don't have the logs, so unfortunately I don't know which one picked up the 
> good values and which didn't. I even stopped the cluster completely, but it 
> didn't help. Ryanne advised to delete the mm2-config and mm2-status topics, 
> so I did delete those on the destination cluster, and that solved this issue.





[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393101#comment-17393101
 ] 

Daniel Urban commented on KAFKA-9981:
-

This KIP aims to fix the issue by adding the REST API to MM2, and also 
improving the config provider reference handling in the MM2 configs: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters]

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector induction config update as follows:
> {code:java}
> if (changed) {
> List> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> KafkaConfigBackingStore task checks for configuration updates,such as topic 
> whitelist update.If KafkaConfigBackingStore task is not running on leader 
> node,an HTTP request will be send to notify the leader of the configuration 
> update.However,dedicated mm2 cluster does not have the HTTP server turned 
> on,so the request will fail to be sent,causing the update operation to be 
> lost.





[jira] [Commented] (KAFKA-9805) Running MirrorMaker in a Connect cluster,but the task not running

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393080#comment-17393080
 ] 

Daniel Urban commented on KAFKA-9805:
-

I believe this is the same issue as KAFKA-9747; see the answer from [~akatona] 
on that ticket.

> Running MirrorMaker in a Connect cluster,but the task not running
> -
>
> Key: KAFKA-9805
> URL: https://issues.apache.org/jira/browse/KAFKA-9805
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1
> Environment: linux
>Reporter: ZHAO GH
>Priority: Major
>   Original Estimate: 5h
>  Remaining Estimate: 5h
>
> When I am running MirrorMaker in a Connect cluster, sometimes the task is 
> running, but sometimes the task cannot be assigned.
> I post the connector config to the connect cluster, here is my config
> http://99.12.98.33:8083/connectors
> {    "name": "kafka->kafka241-3",   
> "config": {       
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",     
>     "topics": "MM2-3",     
>     "key.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",                     
>   "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",     
> "tasks.max": 8,     
> "sasl.mechanism": "PLAIN",   
>  "security.protocol": "SASL_PLAINTEXT",   
>  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule 
> required username=\"admin\" password=\"admin\";",          
> "source.cluster.alias": "kafka",     "source.cluster.bootstrap.servers": 
> "55.13.104.70:9092,55.13.104.74:9092,55.13.104.126:9092",     
> "source.admin.sasl.mechanism": "PLAIN",     
> "source.admin.security.protocol": "SASL_PLAINTEXT",     
> "source.admin.sasl.jaas.config": 
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username=\"admin\" password=\"admin\";",         
> "target.cluster.alias": "kafka241",   
>  "target.cluster.bootstrap.servers": 
> "55.14.111.22:9092,55.14.111.23:9092,55.14.111.25:9092",     
> "target.admin.sasl.mechanism": "PLAIN",     
> "target.admin.security.protocol": "SASL_PLAINTEXT",   
> "target.admin.sasl.jaas.config": 
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username=\"admin\" password=\"admin\";",       
>  "producer.sasl.mechanism": "PLAIN",   
>  "producer.security.protocol": "SASL_PLAINTEXT",     
> "producer.sasl.jaas.config": 
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username=\"admin\" password=\"admin\";",          
> "consumer.sasl.mechanism": "PLAIN",   
>  "consumer.security.protocol": "SASL_PLAINTEXT",   
>  "consumer.sasl.jaas.config": 
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username=\"admin\" password=\"admin\";",     
> "consumer.group.id": "mm2-1"       
> }
> }
>  
> but I get the connector status,found not tasks running
> http://99.12.98.33:8083/connectors/kafka->kafka241-3/status
> {
>  "name": "kafka->kafka241-3",
>  "connector": {
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  "tasks": [],
>  "type": "source"
> }
>  
> but sometime,the task run success
> http://99.12.98.33:8083/connectors/kafka->kafka241-1/status
> {
>  "name": "kafka->kafka241-1",
>  "connector": {
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  "tasks": [
>  {
>  "id": 0,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  {
>  "id": 1,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.33:8083"
>  },
>  {
>  "id": 2,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  }
>  ],
>  "type": "source"
> }
> is somebody met this problem? how to fix it,is it a bug?





[jira] [Commented] (KAFKA-12966) MM2 doesn't trigger replicate data on new topics

2021-06-17 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364996#comment-17364996
 ] 

Daniel Urban commented on KAFKA-12966:
--

I believe you are hitting the issue described in 
[KIP-710|https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters]:
MM2 doesn't include a Connect REST server, making the follower->leader 
communication impossible.

> MM2 doesn't trigger replicate data on new topics
> 
>
> Key: KAFKA-12966
> URL: https://issues.apache.org/jira/browse/KAFKA-12966
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy
>Priority: Major
>
> After starting MM2, all topics was replicated, but when I create new topics, 
> it seems MM2 doesn't trigger to replicate data to these topics, although it 
> still create replica topic on target cluster. I have to restart all mm2 
> instances. This is not expected thing in an active cluster, when the new 
> topics are created every day. Is it a bug or a feature?
>  





[jira] [Commented] (KAFKA-12893) MM2 fails to replicate if starting two+ nodes same time

2021-06-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357372#comment-17357372
 ] 

Daniel Urban commented on KAFKA-12893:
--

You might be hitting the issue of MM2 not running the Connect REST Server, thus 
not supporting follower->leader communication in the cluster.

See KIP-710 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)]
 for details.

The way to identify the situation is to check whether the MirrorSourceConnector 
is hosted on the leader or a follower node. If the leader is hosting it, 
replication should be fine; if a follower is, you probably never get to the 
point where the topics are actually picked up.

> MM2 fails to replicate if starting two+ nodes same time
> ---
>
> Key: KAFKA-12893
> URL: https://issues.apache.org/jira/browse/KAFKA-12893
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Tommi Vainikainen
>Priority: Major
>
> I've observed a situation where starting more than one MM2 node in parallel, 
> MM2 fails to start replication ie. replication flow seems to be stuck without 
> action. I used exactly same mm2.properties file to start only one at a time, 
> and the replication flow was proceeding smoothly.
> In my setup dc1 has topic "mytopic1" and there is a producer with approx 1 
> msg/sec, and I'm trying to repilcate this to dc2. What I observed is that 
> dc1.mytopic1 is created when initially launching two paraller MM2 instances, 
> but no messages gets written into the topic as I would expect. If I kill MM2 
> instances, and only start one MM2 node, then MM2 starts replicating the 
> messages in mytopic1.
> My mm2.properties:
> clusters=dc2, dc1
> dc1->dc2.emit.heartbeats.enabled=true
> dc1->dc2.enabled=true
> dc1->dc2.sync.group.offsets.enabled=false
> dc1->dc2.sync.group.offsets.interval.seconds=45
> dc1->dc2.topics=mytopic1
> dc1->dc2.topics.exclude=
> dc1.bootstrap.servers=tvainika-dc1-dev-sandbox.aivencloud.com:12693
> dc1.security.protocol=SSL
> dc1.ssl.keystore.type=PKCS12
> dc1.ssl.keystore.location=dc1/client.keystore.p12
> dc1.ssl.keystore.password=secret
> dc1.ssl.key.password=secret
> dc1.ssl.truststore.location=dc1/client.truststore.jks
> dc1.ssl.truststore.password=secret
> dc2.bootstrap.servers=tvainika-dc2-dev-sandbox.aivencloud.com:12693
> dc2.security.protocol=SSL
> dc2.ssl.keystore.type=PKCS12
> dc2.ssl.keystore.location=dc2/client.keystore.p12
> dc2.ssl.keystore.password=secret
> dc2.ssl.key.password=secret
> dc2.ssl.truststore.location=dc2/client.truststore.jks
> dc2.ssl.truststore.password=secret
> tasks.max=3





[jira] [Resolved] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-14 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-12664.
--
Resolution: Not A Bug

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Assignee: Daniel Urban
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
>  [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
> Successfully synced group in generation Generation{generationId=347, 
> memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> protocol='sessioned'} 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
>  17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
> at generation 347 with protocol version 2 and got assignment: 
> Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
> [^mm2.properties]
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> This [^docker-compose-multi.yml] file to create local kafka clusters 
> (dc1,dc2,dc3)
>  (I set docker to use 6 cpus, 8gb mem, swap 2gb)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried to remove one of the datacenters to confirm if it was a 
> configuration problem, however mirror maker ran successfully with the below 
> configuration
> mm2.properties
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> {code

[jira] [Assigned] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-14 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-12664:


Assignee: Daniel Urban

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Assignee: Daniel Urban
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
>  [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
> Successfully synced group in generation Generation{generationId=347, 
> memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> protocol='sessioned'} 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
>  17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
> at generation 347 with protocol version 2 and got assignment: 
> Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
> [^mm2.properties]
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> This [^docker-compose-multi.yml] file to create local kafka clusters 
> (dc1,dc2,dc3)
>  (I set docker to use 6 cpus, 8gb mem, swap 2gb)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried to remove one of the datacenters to confirm if it was a 
> configuration problem, however mirror maker ran successfully with the below 
> configuration
> mm2.properties
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
>

[jira] [Commented] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-14 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321057#comment-17321057
 ] 

Daniel Urban commented on KAFKA-12664:
--

The issue is that in your setup, there are 2 connect groups coordinating through 
dc3, and they need separate group ids. When you specify a *dc3.*-prefixed 
group.id, it affects all groups coordinating through dc3.

I'd say it might be a documentation issue.

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
>  [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
> Successfully synced group in generation Generation{generationId=347, 
> memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> protocol='sessioned'} 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
>  17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
> at generation 347 with protocol version 2 and got assignment: 
> Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
> [^mm2.properties]
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> This [^docker-compose-multi.yml] file to create local kafka clusters 
> (dc1,dc2,dc3)
>  (I set docker to use 6 cpus, 8gb mem, swap 2gb)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried to remove one of the datacenters to confirm if it was a 
> configuration problem, however mirror maker ran successfully with the below 
> configuration
> mm2.properties
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.fact

[jira] [Commented] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-14 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320917#comment-17320917
 ] 

Daniel Urban commented on KAFKA-12664:
--

In short, just remove those group.id configs and let the workers join the 
groups generated by MM2; that should work.
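
Based on the mm2.properties quoted below, that means dropping the three dcX.group.id lines and keeping everything else, roughly as follows (the dc3->dc2/dc3->dc1 lines are omitted here since replication flows are disabled by default):
{code:java}
# sketch of the corrected mm2.properties from this ticket; the dcX.group.id entries
# are simply removed so that each replication flow keeps the distinct group id MM2 generates
clusters = dc1, dc2, dc3
dc1.bootstrap.servers = kafka-dc1:19092
dc2.bootstrap.servers = kafka-dc2:19093
dc3.bootstrap.servers = kafka-dc3:19094
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
dc1->dc3.enabled = true
dc1->dc3.topics = test_topic_dc1
dc2->dc3.enabled = true
dc2->dc3.topics = test_topic_dc2
{code}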

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
>  [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
> Successfully synced group in generation Generation{generationId=347, 
> memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> protocol='sessioned'} 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
>  17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
> at generation 347 with protocol version 2 and got assignment: 
> Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
> [^mm2.properties]
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> I used this [^docker-compose-multi.yml] file to create local Kafka clusters 
> (dc1, dc2, dc3).
>  (I set docker to use 6 cpus, 8gb mem, swap 2gb)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried to remove one of the datacenters to confirm whether it was a 
> configuration problem; however, MirrorMaker ran successfully with the 
> configuration below.
> mm2.properties
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1

[jira] [Commented] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-14 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320763#comment-17320763
 ] 

Daniel Urban commented on KAFKA-12664:
--

MM2 spins up a Connect worker *per replication flow*. (The number of Connect 
workers is determined by the target clusters you provided for MM2 - if you did 
not specify them explicitly, all clusters are targeted.)

This setting:

dc3.group.id=mm2-dc3

Causes the Connect worker responsible for the dc1->dc3 flow and the worker 
responsible for the dc2->dc3 flow to join the same Connect group. This 
collision likely leads to those endless rebalances.
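
To illustrate (my own reading of the attached configuration, not something 
taken from the logs): both enabled flows target dc3, so a dc3-prefixed worker 
setting such as group.id ends up applied to the worker of each of those flows:
{code:java}
# Both of these flows target dc3:
dc1->dc3.enabled = true
dc2->dc3.enabled = true

# A cluster-prefixed worker setting like this one is picked up by the workers
# of *both* flows above, so they end up sharing a single Connect group:
dc3.group.id=mm2-dc3
{code}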

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
>  [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] 
> Successfully synced group in generation Generation{generationId=347, 
> memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> protocol='sessioned'} 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13
>  17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance 
> started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13
>  17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining 
> group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13
>  17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group 
> at generation 347 with protocol version 2 and got assignment: 
> Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', 
> leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
> [^mm2.properties]
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> I used this [^docker-compose-multi.yml] file to create local Kafka clusters 
> (dc1, dc2, dc3).
>  (I set docker to use 6 cpus, 8gb mem, swap 2gb)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried to remove one of the datacenters to confirm whether it was a 
> configuration problem; however, MirrorMaker ran successfully with the 
> configuration below.
> mm2.properties
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka

[jira] [Updated] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters

2021-01-25 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-10586:
-
Description: 
KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that 
with specific workloads, the dedicated MM2 cluster can become unable to react 
to dynamic topic and group filter changes.

(This occurs when, after a rebalance operation, the leader node has no 
MirrorSourceConnector tasks. Because of this, the MirrorSourceConnector is 
stopped on the leader, meaning it cannot detect config changes by itself. 
Followers still running the connector can detect config changes, but they 
cannot query the leader for config updates.)

Besides the REST support, config provider references should be evaluated lazily 
in connector configurations.

  was:
KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that 
with specific workloads, the dedicated MM2 cluster can become unable to react 
to dynamic topic and group filter changes.

(This occurs when, after a rebalance operation, the leader node has no 
MirrorSourceConnector tasks. Because of this, the MirrorSourceConnector is 
stopped on the leader, meaning it cannot detect config changes by itself. 
Followers still running the connector can detect config changes, but they 
cannot query the leader for config updates.)


> Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
> ---
>
> Key: KAFKA-10586
> URL: https://issues.apache.org/jira/browse/KAFKA-10586
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
> MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means 
> that with specific workloads, the dedicated MM2 cluster can become unable to 
> react to dynamic topic and group filter changes.
> (This occurs when, after a rebalance operation, the leader node has no 
> MirrorSourceConnector tasks. Because of this, the MirrorSourceConnector is 
> stopped on the leader, meaning it cannot detect config changes by itself. 
> Followers still running the connector can detect config changes, but they 
> cannot query the leader for config updates.)
> Besides the REST support, config provider references should be evaluated 
> lazily in connector configurations.
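
As an illustration of the config provider point above (a hedged sketch with 
made-up file paths, not taken from the ticket): with lazy evaluation, a 
reference like the one below would be resolved when the connector configuration 
is actually used, rather than being frozen at creation time.
{code:java}
# Worker-level setup enabling the built-in FileConfigProvider
config.providers = file
config.providers.file.class = org.apache.kafka.common.config.provider.FileConfigProvider

# A config provider reference in a cluster-prefixed client setting; the
# placeholder points at an external secrets file instead of an inline value.
dc1.sasl.jaas.config = ${file:/etc/mm2/secrets.properties:dc1.jaas}
{code}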



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters

2021-01-25 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-10586:
-
Summary: Full support for distributed mode in dedicated MirrorMaker 2.0 
clusters  (was: MirrorMaker 2.0 REST support)

> Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
> ---
>
> Key: KAFKA-10586
> URL: https://issues.apache.org/jira/browse/KAFKA-10586
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
> MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means 
> that with specific workloads, the dedicated MM2 cluster can become unable to 
> react to dynamic topic and group filter changes.
> (This occurs when, after a rebalance operation, the leader node has no 
> MirrorSourceConnector tasks. Because of this, the MirrorSourceConnector is 
> stopped on the leader, meaning it cannot detect config changes by itself. 
> Followers still running the connector can detect config changes, but they 
> cannot query the leader for config updates.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10586) MirrorMaker 2.0 REST support

2020-10-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-10586:


 Summary: MirrorMaker 2.0 REST support
 Key: KAFKA-10586
 URL: https://issues.apache.org/jira/browse/KAFKA-10586
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban


KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that 
with specific workloads, the dedicated MM2 cluster can become unable to react 
to dynamic topic and group filter changes.

(This occurs when, after a rebalance operation, the leader node has no 
MirrorSourceConnector tasks. Because of this, the MirrorSourceConnector is 
stopped on the leader, meaning it cannot detect config changes by itself. 
Followers still running the connector can detect config changes, but they 
cannot query the leader for config updates.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-24 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-10414.
--
Resolution: Not A Problem

api-util is only a test dependency, not an issue.

> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs.
> -Can be fixed by upgrading to at least version 2.0.0.AM25-
> Since api-all is also a dependency and there is a class collision between 
> api-all and newer versions of api-util, it is better to just upgrade api-util 
> to 1.0.2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-19 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-10414:
-
Description: 
There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.

This is a transitive dependency through the apacheds libs.

-Can be fixed by upgrading to at least version 2.0.0.AM25-

Since api-all is also a dependency and there is a class collision between 
api-all and newer versions of api-util, it is better to just upgrade api-util 
to 1.0.2.

  was:
There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.

This is a transitive dependency through the apacheds libs. Can be fixed by 
upgrading to at least version 2.0.0.AM25


> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs.
> -Can be fixed by upgrading to at least version 2.0.0.AM25-
> Since api-all is also a dependency and there is a class collision between 
> api-all and newer versions of api-util, it is better to just upgrade api-util 
> to 1.0.2.
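
For reference, a generic Gradle sketch (my own illustration, not how the Kafka 
build actually declares its dependencies) of forcing the transitive api-util 
version to 1.0.2:
{code:java}
// build.gradle (Groovy DSL): pin the transitive api-util dependency so the
// vulnerable 1.0.0 pulled in via the apacheds libraries is replaced by 1.0.2.
configurations.all {
    resolutionStrategy {
        force 'org.apache.directory.api:api-util:1.0.2'
    }
}
{code}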



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-18 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-10414:


Assignee: Daniel Urban

> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs. Can be fixed by 
> upgrading to at least version 2.0.0.AM25



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-18 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-10414:


 Summary: Upgrade api-util dependency - CVE-2018-1337
 Key: KAFKA-10414
 URL: https://issues.apache.org/jira/browse/KAFKA-10414
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban


There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.

This is a transitive dependency through the apacheds libs. Can be fixed by 
upgrading to at least version 2.0.0.AM25



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-5235) GetOffsetShell: retrieve offsets for multiple topics and partitions

2020-06-30 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-5235:

Summary: GetOffsetShell: retrieve offsets for multiple topics and 
partitions  (was: GetOffsetShell: retrieve offsets for all given topics and 
partitions with single request to the broker)

> GetOffsetShell: retrieve offsets for multiple topics and partitions
> ---
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Assignee: Daniel Urban
>Priority: Major
>  Labels: kip, tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, GetOffsetShell only allows fetching the offsets of a single topic 
> with an optional list of which partitions to describe. Besides that, it does 
> not allow consumer properties to be overridden. The tool does not have a 
> dedicated script under bin either.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-5235) GetOffsetShell: support for multiple topics and consumer configuration override

2020-06-30 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-5235:

Summary: GetOffsetShell: support for multiple topics and consumer 
configuration override  (was: GetOffsetShell: retrieve offsets for multiple 
topics and partitions)

> GetOffsetShell: support for multiple topics and consumer configuration 
> override
> ---
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Assignee: Daniel Urban
>Priority: Major
>  Labels: kip, tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, GetOffsetShell only allows fetching the offsets of a single topic 
> with an optional list of which partitions to describe. Besides that, it does 
> not allow consumer properties to be overridden. The tool does not have a 
> dedicated script under bin either.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2020-06-29 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-5235:

Description: Currently, GetOffsetShell only allows fetching the offsets of 
a single topic with an optional list of which partitions to describe. Besides 
that, it does not allow consumer properties to be overridden. The tool does not 
have a dedicated script under bin either.  (was: GetOffsetShell is implemented 
on old SimpleConsumer. It needs Zookeeper to retrieve metadata about topics and 
partitions. At present, GetOffsetShell does the following:
- get metadata from Zookeeper
- iterate over partitions
- for each partition, connect to its leader broker and request offsets
Instead, GetOffsetShell can use the new KafkaConsumer and retrieve offsets by 
means of the endOffsets(), beginningOffsets() and offsetsForTimes() methods. 
One request is sufficient for all topics and partitions.
Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it will 
no longer depend on the obsolete APIs: SimpleConsumer and the old producer API.)
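
For context, here is a minimal sketch of the KafkaConsumer-based approach 
described above (broker address and topic/partition names are made up; this is 
an illustration, not the actual GetOffsetShell implementation):
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EndOffsetsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // A single call covers every topic-partition of interest, instead of a
            // per-partition request to each leader broker.
            List<TopicPartition> partitions = Arrays.asList(
                    new TopicPartition("topic-a", 0),
                    new TopicPartition("topic-b", 0));
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
            partitions.forEach(tp ->
                    System.out.println(tp + ": " + earliest.get(tp) + " .. " + latest.get(tp)));
        }
    }
}
{code}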

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Assignee: Daniel Urban
>Priority: Major
>  Labels: kip, tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, GetOffsetShell only allows fetching the offsets of a single topic 
> with an optional list of which partitions to describe. Besides that, it does 
> not allow consumer properties to be overridden. The tool does not have a 
> dedicated script under bin either.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2020-06-29 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-5235:
---

Assignee: Daniel Urban

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Assignee: Daniel Urban
>Priority: Major
>  Labels: kip, tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to 
> retrieve metadata about topics and partitions. At present, GetOffsetShell 
> does the following:
> - get metadata from Zookeeper
> - iterate over partitions
> - for each partition, connect to its leader broker and request offsets
> Instead, GetOffsetShell can use the new KafkaConsumer and retrieve offsets by 
> means of the endOffsets(), beginningOffsets() and offsetsForTimes() methods. 
> One request is sufficient for all topics and partitions.
> Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it will 
> no longer depend on the obsolete APIs: SimpleConsumer and the old producer API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)