[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic
[ https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893838#comment-17893838 ] Daniel Urban commented on KAFKA-17719: -- I'm still thinking about a possible solution, but have a few points to consider: # In the current state, there is no way to differentiate between the "task resurrection" scenario described in KAFKA-16838 and the connect config compaction described in this ticket. When we find task configs + task commit records without a prior Connector configuration, we cannot know which scenario is present. So for existing clusters, we need to make a choice - do we want to avoid the resurrection, or do we want to avoid the NPE? # For future versions, I think some kind of version/ID should be generated and added to the Connector, the corresponding tasks and task commits. This could be generated when the Connector is created, and then kept on each update. When a Connector is deleted and then re-created, a new ID would be generated, helping in catching the zombie tasks. > Connect may fail to start tasks when reading from a compacted config topic > -- > > Key: KAFKA-17719 > URL: https://issues.apache.org/jira/browse/KAFKA-17719 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chaitanya Mukka >Assignee: Daniel Urban >Priority: Major > > The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) > alters the logic for materializing a view of the config topic to ignore task > configs when there is no configuration for that connector present earlier in > the config topic. However, the logic fails to consider topics that might get > compacted over time. > In particular, when we have a connector {{C1}} running fine, the records in > the config topic for the connector will look something like {{{}C1, T1, T2, > Task-commit-record{}}}. > If the connector gets a config update that doesn't produce any new task > configs (note that this is a valid case when there are no task config > changes[1]) we only produce a Connector config record [2]. The config topic > now looks like {{{}C1, T1, T2, Task-commit-record, C1{}}}. However, if the > topic gets compacted we will end up with {{{}T1, T2, Task-commit-record, > C1{}}}. This can be a common scenario in large and old connect clusters. > Based on the changes for KAFKA-16838, when the connect worker reads this > config state it ignores the task configs [3] for this while the connector is > still active and we might have active assignments for the same. The symptom > of this issue is an NPE which shows up when trying to start the tasks: > {noformat} > java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because > "inputMap" is null > at org.apache.kafka.common.utils.Utils.castToStringObjectMap > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.connect.runtime.TaskConfig. 
> at org.apache.kafka.connect.runtime.Worker.startTask > at org.apache.kafka.connect.runtime.Worker.startSourceTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41 > at java.base/java.util.concurrent.FutureTask.run > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run > at java.base/java.lang.Thread.run(Thread.java:1583){noformat} > > [1] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047] > > [2] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524] > [3] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074] -- This message was sent by Atlassian Jira (v8.20.10#820010)
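To make the second point of the comment above concrete, here is a rough sketch of how a per-connector generation ID could be checked when materializing the config topic. This is not actual Connect code - the class and method names below are made up for illustration, and a real change would have to live in KafkaConfigBackingStore and the config record schemas.
{code:java}
// Hypothetical sketch of the "generation ID" idea, for illustration only.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GenerationAwareConfigState {
    // connector name -> generation ID carried by the latest connector config record
    private final Map<String, String> connectorGeneration = new HashMap<>();
    // connector name -> committed task configs, tagged with the generation they were written under
    private final Map<String, String> committedTaskGeneration = new HashMap<>();
    private final Map<String, List<Map<String, String>>> committedTaskConfigs = new HashMap<>();

    // Connector config record: a new ID is generated on creation, the same ID is kept on updates.
    public void onConnectorConfig(String connector, String generationId) {
        connectorGeneration.put(connector, generationId);
    }

    // Task commit record: remember the task configs together with their generation.
    public void onTaskCommit(String connector, String generationId, List<Map<String, String>> taskConfigs) {
        committedTaskGeneration.put(connector, generationId);
        committedTaskConfigs.put(connector, taskConfigs);
    }

    // Called after reading to the end of the config topic: task configs are only exposed
    // if their generation matches the connector's current generation, so tasks left over
    // from a deleted connector with the same name are dropped, while compaction of older
    // connector records does not hide valid tasks.
    public List<Map<String, String>> taskConfigs(String connector) {
        String connectorGen = connectorGeneration.get(connector);
        String taskGen = committedTaskGeneration.get(connector);
        if (connectorGen == null || !connectorGen.equals(taskGen)) {
            return new ArrayList<>(); // zombie or orphaned tasks
        }
        return committedTaskConfigs.get(connector);
    }
}
{code}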
[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic
[ https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893768#comment-17893768 ] Daniel Urban commented on KAFKA-17719: -- [~cmukka20] I was thinking that allowing "deferred" task commits should help - if we encounter a task commit without a related connector, we just wait for the connector config to appear. To avoid introducing a regression (KAFKA-16838), connector creation (putConnectorConfig) could be updated to explicitly clear zombie tasks with a delete marker. > Connect may fail to start tasks when reading from a compacted config topic > -- > > Key: KAFKA-17719 > URL: https://issues.apache.org/jira/browse/KAFKA-17719 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chaitanya Mukka >Assignee: Daniel Urban >Priority: Major > > The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) > alters the logic for materializing a view of the config topic to ignore task > configs when there is no configuration for that connector present earlier in > the config topic. However, the logic fails to consider topics that might get > compacted over time. > In particular, when we have a connector {{C1}} running fine, the records in > the config topic for the connector will look something like {{{}C1, T1, T2, > Task-commit-record{}}}. > If the connector gets a config update that doesn't produce any new task > configs (note that this is a valid case when there are no task config > changes[1]) we only produce a Connector config record [2]. The config topic > now looks like {{{}C1, T1, T2, Task-commit-record, C1{}}}. However, if the > topic gets compacted we will end up with {{{}T1, T2, Task-commit-record, > C1{}}}. This can be a common scenario in large and old connect clusters. > Based on the changes for KAFKA-16838, when the connect worker reads this > config state it ignores the task configs [3] for this while the connector is > still active and we might have active assignments for the same. The symptom > of this issue is an NPE which shows up when trying to start the tasks: > {noformat} > java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because > "inputMap" is null > at org.apache.kafka.common.utils.Utils.castToStringObjectMap > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.connect.runtime.TaskConfig. > at org.apache.kafka.connect.runtime.Worker.startTask) > at org.apache.kafka.connect.runtime.Worker.startSourceTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41 > at java.base/java.util.concurrent.FutureTask.run > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run > at java.base/java.lang.Thread.run(Thread.java:1583){noformat} > > [1] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047] > > [2] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524] > [3] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074] -- This message was sent by Atlassian Jira (v8.20.10#820010)
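A minimal sketch of the deferred-task-commit idea in the comment above, assuming a reader that parks commits until the corresponding connector config appears, and a delete marker that discards parked state. All names are hypothetical; this is not the actual KafkaConfigBackingStore logic.
{code:java}
// Illustrative only: park task commits seen before the connector config record,
// apply them when the connector record shows up later in the log (e.g. after
// compaction removed the earlier connector record).
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeferredTaskCommitReader {
    private final Set<String> knownConnectors = new HashSet<>();
    private final Map<String, Deque<Runnable>> deferredCommits = new HashMap<>();

    public void onConnectorConfig(String connector) {
        knownConnectors.add(connector);
        // Apply any commits that were parked while the connector record was missing.
        Deque<Runnable> parked = deferredCommits.remove(connector);
        if (parked != null) {
            parked.forEach(Runnable::run);
        }
    }

    public void onConnectorDeleteMarker(String connector) {
        // A delete marker (written e.g. by putConnectorConfig before re-creation)
        // discards anything parked for the old incarnation, avoiding resurrection.
        knownConnectors.remove(connector);
        deferredCommits.remove(connector);
    }

    public void onTaskCommit(String connector, Runnable applyCommit) {
        if (knownConnectors.contains(connector)) {
            applyCommit.run();
        } else {
            deferredCommits.computeIfAbsent(connector, c -> new ArrayDeque<>()).add(applyCommit);
        }
    }
}
{code}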
[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic
[ https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893765#comment-17893765 ] Daniel Urban commented on KAFKA-17719: -- I see that the fix for KAFKA-16838 was explicitly trying to solve the issue of tasks getting "resurrected" when a connector is created with the same name. Because of this, "removing" that edge case is not a proper solution, will dig deeper and try to figure out a solution. > Connect may fail to start tasks when reading from a compacted config topic > -- > > Key: KAFKA-17719 > URL: https://issues.apache.org/jira/browse/KAFKA-17719 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chaitanya Mukka >Assignee: Daniel Urban >Priority: Major > > The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) > alters the logic for materializing a view of the config topic to ignore task > configs when there is no configuration for that connector present earlier in > the config topic. However, the logic fails to consider topics that might get > compacted over time. > In particular, when we have a connector {{C1}} running fine, the records in > the config topic for the connector will look something like {{{}C1, T1, T2, > Task-commit-record{}}}. > If the connector gets a config update that doesn't produce any new task > configs (note that this is a valid case when there are no task config > changes[1]) we only produce a Connector config record [2]. The config topic > now looks like {{{}C1, T1, T2, Task-commit-record, C1{}}}. However, if the > topic gets compacted we will end up with {{{}T1, T2, Task-commit-record, > C1{}}}. This can be a common scenario in large and old connect clusters. > Based on the changes for KAFKA-16838, when the connect worker reads this > config state it ignores the task configs [3] for this while the connector is > still active and we might have active assignments for the same. The symptom > of this issue is an NPE which shows up when trying to start the tasks: > {noformat} > java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because > "inputMap" is null > at org.apache.kafka.common.utils.Utils.castToStringObjectMap > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.connect.runtime.TaskConfig. > at org.apache.kafka.connect.runtime.Worker.startTask) > at org.apache.kafka.connect.runtime.Worker.startSourceTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41 > at java.base/java.util.concurrent.FutureTask.run > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run > at java.base/java.lang.Thread.run(Thread.java:1583){noformat} > > [1] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047] > > [2] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524] > [3] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic
[ https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893727#comment-17893727 ] Daniel Urban commented on KAFKA-17719: -- [~cmukka20] I think your analysis is correct, will try to submit a fix. I don't see a clean way of removing these dangling task configs from memory, so I'll just remove the edge case from the logic. > Connect may fail to start tasks when reading from a compacted config topic > -- > > Key: KAFKA-17719 > URL: https://issues.apache.org/jira/browse/KAFKA-17719 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chaitanya Mukka >Priority: Major > > The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) > alters the logic for materializing a view of the config topic to ignore task > configs when there is no configuration for that connector present earlier in > the config topic. However, the logic fails to consider topics that might get > compacted over time. > In particular, when we have a connector {{C1}} running fine, the records in > the config topic for the connector will look something like {{{}C1, T1, T2, > Task-commit-record{}}}. > If the connector gets a config update that doesn't produce any new task > configs (note that this is a valid case when there are no task config > changes[1]) we only produce a Connector config record [2]. The config topic > now looks like {{{}C1, T1, T2, Task-commit-record, C1{}}}. However, if the > topic gets compacted we will end up with {{{}T1, T2, Task-commit-record, > C1{}}}. This can be a common scenario in large and old connect clusters. > Based on the changes for KAFKA-16838, when the connect worker reads this > config state it ignores the task configs [3] for this while the connector is > still active and we might have active assignments for the same. The symptom > of this issue is an NPE which shows up when trying to start the tasks: > {noformat} > java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because > "inputMap" is null > at org.apache.kafka.common.utils.Utils.castToStringObjectMap > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.connect.runtime.TaskConfig. > at org.apache.kafka.connect.runtime.Worker.startTask) > at org.apache.kafka.connect.runtime.Worker.startSourceTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41 > at java.base/java.util.concurrent.FutureTask.run > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run > at java.base/java.lang.Thread.run(Thread.java:1583){noformat} > > [1] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047] > > [2] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524] > [3] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17719) Connect may fail to start tasks when reading from a compacted config topic
[ https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-17719: Assignee: Daniel Urban > Connect may fail to start tasks when reading from a compacted config topic > -- > > Key: KAFKA-17719 > URL: https://issues.apache.org/jira/browse/KAFKA-17719 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chaitanya Mukka >Assignee: Daniel Urban >Priority: Major > > The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) > alters the logic for materializing a view of the config topic to ignore task > configs when there is no configuration for that connector present earlier in > the config topic. However, the logic fails to consider topics that might get > compacted over time. > In particular, when we have a connector {{C1}} running fine, the records in > the config topic for the connector will look something like {{{}C1, T1, T2, > Task-commit-record{}}}. > If the connector gets a config update that doesn't produce any new task > configs (note that this is a valid case when there are no task config > changes[1]) we only produce a Connector config record [2]. The config topic > now looks like {{{}C1, T1, T2, Task-commit-record, C1{}}}. However, if the > topic gets compacted we will end up with {{{}T1, T2, Task-commit-record, > C1{}}}. This can be a common scenario in large and old connect clusters. > Based on the changes for KAFKA-16838, when the connect worker reads this > config state it ignores the task configs [3] for this while the connector is > still active and we might have active assignments for the same. The symptom > of this issue is an NPE which shows up when trying to start the tasks: > {noformat} > java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because > "inputMap" is null > at org.apache.kafka.common.utils.Utils.castToStringObjectMap > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.common.config.AbstractConfig. > at org.apache.kafka.connect.runtime.TaskConfig. > at org.apache.kafka.connect.runtime.Worker.startTask) > at org.apache.kafka.connect.runtime.Worker.startSourceTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41 > at java.base/java.util.concurrent.FutureTask.run > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run > at java.base/java.lang.Thread.run(Thread.java:1583){noformat} > > [1] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047] > > [2] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524] > [3] - > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17632) Custom `partitioner.class` with an even number of partitions always writes to even partitions when using RoundRobinPartitioner
[ https://issues.apache.org/jira/browse/KAFKA-17632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-17632: Assignee: Daniel Urban > Custom `partitioner.class` with an even number of partitions always writes to > even partitions if use RoundRobinPartitioner > -- > > Key: KAFKA-17632 > URL: https://issues.apache.org/jira/browse/KAFKA-17632 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: thanhlv >Assignee: Daniel Urban >Priority: Minor > Attachments: image-2024-09-27-15-05-53-707.png > > > Our project has some special logic that requires custom partitions. > With an odd number of Partitions, everything works fine and well. > However, with an even number of partitions. Data will only be written to > even-numbered Partition IDs > Info: > Lib Java: `kafka-clients:3.8.0` > Code demo: > {code:java} > public class CustomLogicPartitionMain { > public static void main(String[] args) throws IOException { > System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "ALL"); > final var props = new Properties(); > props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, > "java-producer-producerRecordPartition-KeyNotNull"); > props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:29091,localhost:29092,localhost:29093,localhost:29094"); > props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5000"); > props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, > "org.apache.kafka.clients.producer.RoundRobinPartitioner"); > try (var producer = new KafkaProducer(props)) { > final var messageProducerRecord = new ProducerRecord<>( > "topic-rep-1-partition-10", //topic name > // 36 byte > UUID.randomUUID().toString()// value > ); > for (int i = 1; i <= 5000; i++) { > producer.send(messageProducerRecord); > } > } > } > } > {code} > !image-2024-09-27-15-05-53-707.png! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17632) Custom `partitioner.class` with an even number of partitions always writes to even partitions when using RoundRobinPartitioner
[ https://issues.apache.org/jira/browse/KAFKA-17632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893559#comment-17893559 ] Daniel Urban commented on KAFKA-17632: -- Thank you for the report and the repro code. I'm unsure if this ever worked correctly, but in the current code, KafkaProducer calls partitioner.partition twice on new batches, this is the location of the 2nd call: [https://github.com/apache/kafka/blob/14a9130f6fa31c10cb65cc500c101148d0410306/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1079] Because of this, when you have an even number of partitions, you are basically "locked out" of half of your partitions when using RR. RoundRobinPartitioner was implemented without taking the semantics of the onNewBatch method into account. I'll try to submit a fix for this - basically the onNewBatch can be handled as a "do not increment on the next call of partition" notification. I think the only way to fix your custom partitioner implementation is to introduce similar logic in the onNewBatch callback. Since onNewBatch is deprecated, it will be removed in the future, and that will probably fix the logic in KafkaProducer. Due to backward compatibility, and other partitioner implementations relying on this double-call, I think we cannot really change the producer logic. > Custom `partitioner.class` with an even number of partitions always writes to > even partitions if use RoundRobinPartitioner > -- > > Key: KAFKA-17632 > URL: https://issues.apache.org/jira/browse/KAFKA-17632 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: thanhlv >Assignee: Daniel Urban >Priority: Minor > Attachments: image-2024-09-27-15-05-53-707.png > > > Our project has some special logic that requires custom partitions. > With an odd number of Partitions, everything works fine and well. > However, with an even number of partitions. Data will only be written to > even-numbered Partition IDs > Info: > Lib Java: `kafka-clients:3.8.0` > Code demo: > {code:java} > public class CustomLogicPartitionMain { > public static void main(String[] args) throws IOException { > System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "ALL"); > final var props = new Properties(); > props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, > "java-producer-producerRecordPartition-KeyNotNull"); > props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:29091,localhost:29092,localhost:29093,localhost:29094"); > props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5000"); > props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, > "org.apache.kafka.clients.producer.RoundRobinPartitioner"); > try (var producer = new KafkaProducer(props)) { > final var messageProducerRecord = new ProducerRecord<>( > "topic-rep-1-partition-10", //topic name > // 36 byte > UUID.randomUUID().toString()// value > ); > for (int i = 1; i <= 5000; i++) { > producer.send(messageProducerRecord); > } > } > } > } > {code} > !image-2024-09-27-15-05-53-707.png! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
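For the custom partitioner mentioned in the report, the workaround suggested above could look roughly like the sketch below: onNewBatch only marks that the next partition() call for that topic should reuse the current counter value instead of advancing it. This is an illustrative implementation, not the upstream RoundRobinPartitioner fix, and it ignores corner cases such as topics with no partition metadata.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class BatchAwareRoundRobinPartitioner implements Partitioner {
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private final Map<String, AtomicBoolean> skipNextIncrement = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        AtomicBoolean skip = skipNextIncrement.computeIfAbsent(topic, t -> new AtomicBoolean(false));
        // If onNewBatch was just called, this is the second partition() call for the
        // same record: reuse the current counter value instead of advancing again.
        int current = skip.getAndSet(false) ? counter.get() : counter.incrementAndGet();
        return (current & Integer.MAX_VALUE) % numPartitions;
    }

    @SuppressWarnings("deprecation")
    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        skipNextIncrement.computeIfAbsent(topic, t -> new AtomicBoolean(false)).set(true);
    }

    @Override
    public void close() { }
}
{code}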
[jira] [Created] (KAFKA-17828) Reverse Checkpointing in MirrorMaker2
Daniel Urban created KAFKA-17828: Summary: Reverse Checkpointing in MirrorMaker2 Key: KAFKA-17828 URL: https://issues.apache.org/jira/browse/KAFKA-17828 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban Assignee: Daniel Urban MM2 supports checkpointing - replicating the committed offsets of consumer groups across Kafka clusters. This replication is one-way, meaning that consumers can rely on it when they fail over from the upstream cluster to the downstream cluster. Checkpointing would be desirable in failbacks, too: if consumers processed messages from the downstream topic, they should not consume the same messages again in the upstream topic after failing back. To support this, MM2 should also offer reverse checkpointing in bidirectional replications: creating checkpoints from downstream topics back to their upstream counterparts. This ticket implements KIP-1098. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17534) Allow disabling heartbeats topic replication in MirrorSourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-17534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-17534: - Description: Currently, MirrorSourceConnector always replicates the heartbeats topics. In some use-cases (e.g. multiple parallel, mutually exclusive MirrorSourceConnector instances in the same replication), users need to disable this behavior. Add a new property to allow disabling the heartbeats replication. This ticket implements KIP-1089. was: Currently, MirrorSourceConnector always replicates the heartbeats topics. In some use-cases (e.g. multiple parallel, mutually exclusive MirrorSourceConnector instances in the same replication), users need to disable this behavior. Add a new property to allow disabling the heartbeats replication. > Allow disabling hearbeats topic replication in MirrorSourceConnector > > > Key: KAFKA-17534 > URL: https://issues.apache.org/jira/browse/KAFKA-17534 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > Currently, MirrorSourceConnector always replicates the heartbeats topics. In > some use-cases (e.g. multiple parallel, mutually exclusive > MirrorSourceConnector instances in the same replication), users need to > disable this behavior. > Add a new property to allow disabling the heartbeats replication. > This ticket implements KIP-1089. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17534) Allow disabling heartbeats topic replication in MirrorSourceConnector
Daniel Urban created KAFKA-17534: Summary: Allow disabling heartbeats topic replication in MirrorSourceConnector Key: KAFKA-17534 URL: https://issues.apache.org/jira/browse/KAFKA-17534 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban Assignee: Daniel Urban Currently, MirrorSourceConnector always replicates the heartbeats topics. In some use-cases (e.g. multiple parallel, mutually exclusive MirrorSourceConnector instances in the same replication), users need to disable this behavior. Add a new property to allow disabling the heartbeats replication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13459) MM2 should be able to add the source offset to the record header
[ https://issues.apache.org/jira/browse/KAFKA-13459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839981#comment-17839981 ] Daniel Urban commented on KAFKA-13459: -- [~viktorsomogyi] No plans, and it does require a KIP. > MM2 should be able to add the source offset to the record header > > > Key: KAFKA-13459 > URL: https://issues.apache.org/jira/browse/KAFKA-13459 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Viktor Somogyi-Vass >Priority: Major > > MM2 could add the source offset to the record header to help with diagnostics > in some use-cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755997#comment-17755997 ] Daniel Urban edited comment on KAFKA-15372 at 8/18/23 2:33 PM: --- [~gharris1727] I have a bit more information: we managed to deterministically reproduce the issue if we use a 2 instance cluster, and make a change in the config, then wait a long time between the instance restarts. In our specific test we wait 3 minutes after stopping the first MM2 instance, then wait 3 minutes after starting the first MM2 instance, then wait 3 minutes after stopping the second MM2 instance, and so on. At the end of this process, the config change does not get applied. I think this all depends on the rebalance - if the rebalance finishes before the MM2 instance bounces back, the leadership will always move to the other node. Again, all of this is tested on a 3.4.1 build which has the MM2 internal REST enabled, but I'm pretty sure that trunk is affected. was (Author: durban): [~gharris1727] I have a bit more information: we managed to deterministically reproduce the issue if we use a 2 instance cluster, and make a change in the config, then wait a long time between the instance restarts. In our specific test we wait 3 minutes after stopping the first MM2 instance, then wait 3 minutes after starting the first MM2 instance, then wait 3 minutes after stopping the second MM2 instance, and so on. At the end of this process, the config change does not get applied. I think this all depends on the rebalance - if the rebalance finishes before the MM2 instance bounces back, the leadership will always move to the other node. > MM2 rolling restart can drop configuration changes silently > --- > > Key: KAFKA-15372 > URL: https://issues.apache.org/jira/browse/KAFKA-15372 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Priority: Major > > When MM2 is restarted, it tries to update the Connector configuration in all > flows. This is a one-time trial, and fails if the Connect worker is not the > leader of the group. > In a distributed setup and with a rolling restart, it is possible that for a > specific flow, the Connect worker of the just restarted MM2 instance is not > the leader, meaning that Connector configurations can get dropped. > For example, assuming 2 MM2 instances, and one flow A->B: > # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the > leader of A->B Connect group. > # MM2 instance 1 tries to update the Connector configurations, but fails > (instance 2 has the leader, not instance 1) > # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 > # MM2 instance 2 tries to update the Connector configurations, but fails > At this point, the configuration changes before the restart are never > applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755997#comment-17755997 ] Daniel Urban commented on KAFKA-15372: -- [~gharris1727] I have a bit more information: we managed to deterministically reproduce the issue if we use a 2 instance cluster, and make a change in the config, then wait a long time between the instance restarts. In our specific test we wait 3 minutes after stopping the first MM2 instance, then wait 3 minutes after starting the first MM2 instance, then wait 3 minutes after stopping the second MM2 instance, and so on. At the end of this process, the config change does not get applied. I think this all depends on the rebalance - if the rebalance finishes before the MM2 instance bounces back, the leadership will always move to the other node. > MM2 rolling restart can drop configuration changes silently > --- > > Key: KAFKA-15372 > URL: https://issues.apache.org/jira/browse/KAFKA-15372 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Priority: Major > > When MM2 is restarted, it tries to update the Connector configuration in all > flows. This is a one-time trial, and fails if the Connect worker is not the > leader of the group. > In a distributed setup and with a rolling restart, it is possible that for a > specific flow, the Connect worker of the just restarted MM2 instance is not > the leader, meaning that Connector configurations can get dropped. > For example, assuming 2 MM2 instances, and one flow A->B: > # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the > leader of A->B Connect group. > # MM2 instance 1 tries to update the Connector configurations, but fails > (instance 2 has the leader, not instance 1) > # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 > # MM2 instance 2 tries to update the Connector configurations, but fails > At this point, the configuration changes before the restart are never > applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755833#comment-17755833 ] Daniel Urban commented on KAFKA-15372: -- [~gharris1727] Not sure if I follow this part: "should forward configurations to the leader via the internal REST API." I checked org.apache.kafka.connect.mirror.MirrorMaker#configureConnector which then calls org.apache.kafka.connect.runtime.distributed.DistributedHerder#putConnectorConfig, and I don't really see any sign of forwarding to the leader. The callback of the validation explicitly handles the non-leader state with a failure: {code:java} if (!isLeader()) { callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); return null; } {code} So I think that current trunk is also affected by this, there is no Connector configuration forwarding to the leader in MM2. Additionally, I'm not sure if a single forward attempt is enough to ensure correctness, but that is an implementation detail. Unfortunately, I really don't have an exact reproduction, but I saw this happening in an actual cluster, the leadership changes occurred as I detailed in the ticket description. > MM2 rolling restart can drop configuration changes silently > --- > > Key: KAFKA-15372 > URL: https://issues.apache.org/jira/browse/KAFKA-15372 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Priority: Major > > When MM2 is restarted, it tries to update the Connector configuration in all > flows. This is a one-time trial, and fails if the Connect worker is not the > leader of the group. > In a distributed setup and with a rolling restart, it is possible that for a > specific flow, the Connect worker of the just restarted MM2 instance is not > the leader, meaning that Connector configurations can get dropped. > For example, assuming 2 MM2 instances, and one flow A->B: > # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the > leader of A->B Connect group. > # MM2 instance 1 tries to update the Connector configurations, but fails > (instance 2 has the leader, not instance 1) > # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 > # MM2 instance 2 tries to update the Connector configurations, but fails > At this point, the configuration changes before the restart are never > applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755643#comment-17755643 ] Daniel Urban commented on KAFKA-15372: -- Hi [~gharris1727], I don't have a deterministic reproduction of the issue. The reproduction of the problem requires multiple MM2 instances, but the internal REST is not needed at all (the startup and Connector config update does not touch the REST). Encountered this on a 3.4 build which contains the MM2 internal REST feature. > MM2 rolling restart can drop configuration changes silently > --- > > Key: KAFKA-15372 > URL: https://issues.apache.org/jira/browse/KAFKA-15372 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Priority: Major > > When MM2 is restarted, it tries to update the Connector configuration in all > flows. This is a one-time trial, and fails if the Connect worker is not the > leader of the group. > In a distributed setup and with a rolling restart, it is possible that for a > specific flow, the Connect worker of the just restarted MM2 instance is not > the leader, meaning that Connector configurations can get dropped. > For example, assuming 2 MM2 instances, and one flow A->B: > # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the > leader of A->B Connect group. > # MM2 instance 1 tries to update the Connector configurations, but fails > (instance 2 has the leader, not instance 1) > # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 > # MM2 instance 2 tries to update the Connector configurations, but fails > At this point, the configuration changes before the restart are never > applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
Daniel Urban created KAFKA-15372: Summary: MM2 rolling restart can drop configuration changes silently Key: KAFKA-15372 URL: https://issues.apache.org/jira/browse/KAFKA-15372 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban When MM2 is restarted, it tries to update the Connector configuration in all flows. This is a one-time trial, and fails if the Connect worker is not the leader of the group. In a distributed setup and with a rolling restart, it is possible that for a specific flow, the Connect worker of the just restarted MM2 instance is not the leader, meaning that Connector configurations can get dropped. For example, assuming 2 MM2 instances, and one flow A->B: # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the leader of A->B Connect group. # MM2 instance 1 tries to update the Connector configurations, but fails (instance 2 has the leader, not instance 1) # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 # MM2 instance 2 tries to update the Connector configurations, but fails At this point, the configuration changes before the restart are never applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions
[ https://issues.apache.org/jira/browse/KAFKA-15075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17735562#comment-17735562 ] Daniel Urban commented on KAFKA-15075: -- [~elkkhan] Not in the near future, feel free to pick it up. > MM2 internal checkpoints topic should support multiple partitions > - > > Key: KAFKA-15075 > URL: https://issues.apache.org/jira/browse/KAFKA-15075 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Priority: Major > > Currently, the internal checkpoints topic of MM2 uses a single partition. > This is an unnecessary limitation, and instead, it should support more > partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions
Daniel Urban created KAFKA-15075: Summary: MM2 internal checkpoints topic should support multiple partitions Key: KAFKA-15075 URL: https://issues.apache.org/jira/browse/KAFKA-15075 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban Currently, the internal checkpoints topic of MM2 uses a single partition. This is an unnecessary limitation, and instead, it should support more partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class
[ https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-13756: Assignee: Daniel Urban > Connect validate endpoint should return proper response for invalid connector > class > --- > > Key: KAFKA-13756 > URL: https://issues.apache.org/jira/browse/KAFKA-13756 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > Currently, if there is an issue with the connector class, the validate > endpoint returns a 400 or a 500 response. > Instead, it should return a well formatted response containing a proper > validation error message. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14497) LastStableOffset is advanced prematurely when a log is reopened.
[ https://issues.apache.org/jira/browse/KAFKA-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728723#comment-17728723 ] Daniel Urban commented on KAFKA-14497: -- AFAIU, the information about the replicated state of a transaction is not stored in the snapshot at all. I think the data stored in the snapshot file needs to be extended with the extra information whether the completed transaction is replicated. By the time ProducerStateManager#completeTxn is called (which puts the transaction into ProducerStateManager.unreplicatedTxns), the producer entry is already cleared (ProducerAppendInfo#appendEndTxnMarker - currentTxnFirstOffset is empty, indicating that there is no pending transaction). If a snapshot is created at this point, and then the snapshot is loaded, there is no way to differentiate between replicated and unreplicated transactions. Instead, ProducerAppendInfo#appendEndTxnMarker should also set a flag showing that while the transaction is complete, it might still be unreplicated. Then, when ProducerStateManager#removeUnreplicatedTransactions is called, the flag in the producer entry can be cleared. This way the snapshot would contain the full data, and we could also recover the state of unreplicatedTxns. [~hachikuji] wdyt about this approach? If it seems okay, I can take a look into this and submit a PR. > LastStableOffset is advanced prematurely when a log is reopened. > > > Key: KAFKA-14497 > URL: https://issues.apache.org/jira/browse/KAFKA-14497 > Project: Kafka > Issue Type: Bug >Reporter: Vincent Jiang >Priority: Major > > In below test case, last stable offset of log is advanced prematurely after > reopen: > # producer #1 appends transaction records to leader. offsets = [0, 1, 2, 3] > # producer #2 appends transactional records to leader. offsets = [4, 5, 6, > 7] > # all records are replicated to followers and high watermark advanced to 8. > # at this point, lastStableOffset = 0. (first offset of an open transaction) > # producer #1 aborts the transaction by writing an abort marker at offset 8. > ProducerStateManager.unreplicatedTxns contains the aborted transaction > (firstOffset=0, lastOffset=8) > # then the log is closed and reopened. > # after reopen, log.lastStableOffset is initialized to 4. This is because > ProducerStateManager.unreplicatedTxns is empty after reopening log. > > We should rebuild ProducerStateManager.unreplicatedTxns when reloading a log, > so that lastStableOffset remains unchanged before and after reopen. -- This message was sent by Atlassian Jira (v8.20.10#820010)
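The proposal above, reduced to a schematic. This is not the actual ProducerStateManager/ProducerAppendInfo code; field and method names are simplified for illustration. The idea: the per-producer entry keeps a "completed but possibly unreplicated" marker that is written into the snapshot and cleared once the high watermark passes the marker offset, so unreplicatedTxns can be rebuilt after a reopen.
{code:java}
import java.util.OptionalLong;

public class ProducerEntrySketch {
    private OptionalLong pendingTxnFirstOffset = OptionalLong.empty();
    // Persisted into the snapshot in this proposal:
    private OptionalLong unreplicatedTxnFirstOffset = OptionalLong.empty();
    private OptionalLong unreplicatedTxnMarkerOffset = OptionalLong.empty();

    public void beginTxn(long firstOffset) {
        if (pendingTxnFirstOffset.isEmpty()) {
            pendingTxnFirstOffset = OptionalLong.of(firstOffset);
        }
    }

    // Counterpart of appendEndTxnMarker in the proposal: clear the open transaction,
    // but keep a flag that the completed transaction may still be unreplicated.
    public void appendEndTxnMarker(long markerOffset) {
        unreplicatedTxnFirstOffset = pendingTxnFirstOffset.isPresent() ? pendingTxnFirstOffset : OptionalLong.of(markerOffset);
        unreplicatedTxnMarkerOffset = OptionalLong.of(markerOffset);
        pendingTxnFirstOffset = OptionalLong.empty();
    }

    // Counterpart of removeUnreplicatedTransactions: clear the flag once the marker
    // offset is below the high watermark.
    public void highWatermarkAdvanced(long highWatermark) {
        if (unreplicatedTxnMarkerOffset.isPresent() && unreplicatedTxnMarkerOffset.getAsLong() < highWatermark) {
            unreplicatedTxnFirstOffset = OptionalLong.empty();
            unreplicatedTxnMarkerOffset = OptionalLong.empty();
        }
    }

    // After reloading a snapshot, entries still carrying the flag can repopulate
    // unreplicatedTxns so the last stable offset is not advanced prematurely.
    public OptionalLong unreplicatedTxnFirstOffset() {
        return unreplicatedTxnFirstOffset;
    }
}
{code}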
[jira] [Assigned] (KAFKA-14034) Consistency violation: enabled idempotency doesn't prevent duplicates when a client runs into UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-14034: Assignee: Daniel Urban > Consistency violation: enabled idempotency doesn't prevent duplicates when a > client runs into UNKNOWN_SERVER_ERROR > -- > > Key: KAFKA-14034 > URL: https://issues.apache.org/jira/browse/KAFKA-14034 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0, 3.1.1 >Reporter: Denis Rystsov >Assignee: Daniel Urban >Priority: Major > > Hey folks, I've observed duplicated records in the log while idempotency was > enabled and it looks like the kafka client is the culprit. I've tested on > 3.0.0 but the tip of the kafka repo is also affected > Let a user sends two produce requests without async so there is two inflight > requests > {code:java} > producer.send(A) > producer.send(B){code} > Let the first request results with a retry-able error after it was written to > disk and let the second request results with UNKNOWN_SERVER_ERROR. Any > unhandled exception on the broker side results in UNKNOWN_SERVER_ERROR so it > may happen. > Since request A is retry-able it is put into the outbound queue there - > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L623] > Let B's UNKNOWN_SERVER_ERROR is received before A is retried. It is being > processed in the following methods: > * > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642] > * > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L742] > * > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L761] > * > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L624] > * maybeTransitionToErrorState doesn't consider UNKNOWN_SERVER_ERROR fatal so > it doesn't mark the request as such: > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L611] > * as result handleFailedBatch requests epoch bump > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L652] > When epoch is bumped it rewrites sequence numbers of the inflight requests: > [https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L481] > In our case it rewrites A's sequence numbers and when the request is retried > the broker can't dedupe it and writes it to the log thus violating the > idempotency guarantees. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics
Daniel Urban created KAFKA-14978: Summary: ExactlyOnceWorkerSourceTask does not remove parent metrics Key: KAFKA-14978 URL: https://issues.apache.org/jira/browse/KAFKA-14978 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Daniel Urban Assignee: Daniel Urban ExactlyOnceWorkerSourceTask removeMetrics does not invoke super.removeMetrics, meaning that only the transactional metrics are removed, and common source task metrics are not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
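The shape of the fix, shown on a simplified class hierarchy rather than the real Connect classes:
{code:java}
abstract class AbstractWorkerSourceTaskSketch {
    protected void removeMetrics() {
        // removes the common source task metrics group
    }
}

class ExactlyOnceWorkerSourceTaskSketch extends AbstractWorkerSourceTaskSketch {
    @Override
    protected void removeMetrics() {
        super.removeMetrics();          // without this call, only the transactional
        removeTransactionalMetrics();   // metrics group is cleaned up
    }

    private void removeTransactionalMetrics() {
        // removes the transaction-specific metrics group
    }
}
{code}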
[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context (KIP-916)
[ https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14652: - Summary: Improve MM2 logging by adding the flow information to the context (KIP-916) (was: Improve MM2 logging by adding the flow information to the context) > Improve MM2 logging by adding the flow information to the context (KIP-916) > --- > > Key: KAFKA-14652 > URL: https://issues.apache.org/jira/browse/KAFKA-14652 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MirrorMaker2 runs multiple Connect worker instances in a single process. In > Connect, the logging is based on the assumption that Connector names are > unique. But in MM2, the same Connector names are being used in each flow > (Connect group). This means that there is no way to differentiate between the > logs of MirrorSourceConnector in A->B and in B->A. > This can be improved by adding the flow to the logging context and the names > of the Connect framework threads. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14807) MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the pause of replication of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714097#comment-17714097 ] Daniel Urban commented on KAFKA-14807: -- [~fisher91] do you use MM2 dedicated mode, or use the Connectors directly in a Connect cluster? If the latter, you can fix this by only passing source.consumer.auto.offset.reset=latest to MirrorSourceConnector, but not to the MirrorCheckpointConnector. For MM2 dedicated mode, I'm not aware of any workarounds. > MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the > pause of replication of consumer groups > --- > > Key: KAFKA-14807 > URL: https://issues.apache.org/jira/browse/KAFKA-14807 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0, 3.3.1, 3.3.2 > Environment: centos7 >Reporter: Zhaoli >Priority: Major > > We use MirrorMaker2 to replicate messages and consumer group offsets from the > Kafka cluster `source` to cluster `target`. > To reduce the load on the source cluster, we add this configuration to mm2 to > avoid replicating the whole history messages: > {code:java} > source.consumer.auto.offset.reset=latest {code} > After that, we found part of the consumer group offsets had stopped > replicating. > The common characteristic of these consumer groups is their EMPTY status, > which means they have no active members at that moment. All the active > consumer groups‘ offset replication work as normal. > After researching the source code, we found this is because the configuration > above also affects the consumption of topic `mm2-offset-syncs`, therefore the > map `offsetSyncs` doesn't hold the whole topic partitions: > {code:java} > private final Map offsetSyncs = new HashMap<>(); > {code} > And the lost topicPartitions lead to the pause of replication of the EMPTY > consumer groups, which is not expected. > {code:java} > OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long > upstreamOffset) { > Optional offsetSync = latestOffsetSync(sourceTopicPartition); > if (offsetSync.isPresent()) { > if (offsetSync.get().upstreamOffset() > upstreamOffset) { > // Offset is too far in the past to translate accurately > return OptionalLong.of(-1L); > } > long upstreamStep = upstreamOffset - > offsetSync.get().upstreamOffset(); > return OptionalLong.of(offsetSync.get().downstreamOffset() + > upstreamStep); > } else { > return OptionalLong.empty(); > } > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
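For the Connect-cluster deployment mentioned in the comment above, the workaround can be sketched as two separate connector configurations, with the consumer override applied only to the source connector. Only the properties relevant to the workaround are shown; bootstrap servers and the other required MM2 connector settings are omitted.
{code}
# MirrorSourceConnector: limit history replication only here
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source
target.cluster.alias=target
source.consumer.auto.offset.reset=latest

# MirrorCheckpointConnector: no auto.offset.reset override, so the consumer
# reading the offset-syncs topic still starts from the earliest offset
connector.class=org.apache.kafka.connect.mirror.MirrorCheckpointConnector
source.cluster.alias=source
target.cluster.alias=target
{code}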
[jira] [Created] (KAFKA-14903) MM2 Topic And Group Listener (KIP-918)
Daniel Urban created KAFKA-14903: Summary: MM2 Topic And Group Listener (KIP-918) Key: KAFKA-14903 URL: https://issues.apache.org/jira/browse/KAFKA-14903 Project: Kafka Issue Type: New Feature Reporter: Daniel Urban Assignee: Daniel Urban MM2 has a dynamic topic and group filter mechanism, in which the replicated topics/groups can dynamically change, either due to changes in the available topics/groups, or changes in the filter settings. In order to monitor the currently replicated topics/groups, MM2 should support a TopicListener and GroupListener plugin, which is triggered when MM2 changes the set of replicated topics/groups. -- This message was sent by Atlassian Jira (v8.20.10#820010)
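A hypothetical shape for such a plugin; the actual interfaces are defined by KIP-918, so the names and signatures below are illustrative only.
{code:java}
import java.util.Map;
import java.util.Set;

public interface ReplicationSetListener {
    // Called once with the plugin's configuration when MM2 starts the connector.
    void configure(Map<String, ?> configs);

    // Called whenever the set of replicated topics changes for a flow.
    void onTopicsChanged(String sourceClusterAlias, String targetClusterAlias, Set<String> replicatedTopics);

    // Called whenever the set of replicated consumer groups changes for a flow.
    void onGroupsChanged(String sourceClusterAlias, String targetClusterAlias, Set<String> replicatedGroups);
}
{code}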
[jira] [Created] (KAFKA-14902) KafkaBasedLog infinite retries can lead to StackOverflowError
Daniel Urban created KAFKA-14902: Summary: KafkaBasedLog infinite retries can lead to StackOverflowError Key: KAFKA-14902 URL: https://issues.apache.org/jira/browse/KAFKA-14902 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Daniel Urban Assignee: Daniel Urban KafkaBasedLog subclasses use an infinite retry on producer sends, using a callback. Sometimes, when specific errors are encountered, the callback is invoked in the send call, on the calling thread. If this happens enough times, a stack overflow happens. Example stacktrace from 2.5 (but the newest code can also encounter the same): {code:java} 2023-01-14 12:48:23,487 ERROR org.apache.kafka.connect.runtime.WorkerTask: WorkerSourceTask{id=MirrorSourceConnector-1} Task threw an uncaught and unrecoverable exception java.lang.StackOverflowError: null at org.apache.kafka.common.metrics.stats.SampledStat.record(SampledStat.java:50) at org.apache.kafka.common.metrics.stats.Rate.record(Rate.java:60) at org.apache.kafka.common.metrics.stats.Meter.record(Meter.java:80) at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:188) at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:178) at org.apache.kafka.clients.producer.internals.BufferPool.recordWaitTime(BufferPool.java:202) at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:147) at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:221) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:941) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862) at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238) at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298) ... at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:959) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862) at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238) at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298){code} Note the repeated KafkaProducer.send -> KafkaProducer.doSend -> KafkaStatusBackingStore$4.onCompletion calls, causing the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
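The failure mode can be reproduced outside of Kafka in a few lines; the sketch below is a standalone illustration of the pattern (synchronous callback plus retry-from-callback), not the Connect code itself.
{code:java}
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

public class RecursiveRetryDemo {
    interface FakeProducer {
        void send(String record, BiConsumer<String, Exception> callback);
    }

    public static void main(String[] args) {
        // A producer that always fails *synchronously*, i.e. the callback runs on the
        // calling thread inside send(), which KafkaProducer can do for some errors.
        FakeProducer producer = (record, callback) -> callback.accept(record, new RuntimeException("retriable"));

        // The "retry forever from the completion callback" pattern around KafkaBasedLog.send:
        // each retry re-enters send() on the same thread and adds stack frames, so enough
        // consecutive synchronous failures overflow the stack.
        AtomicReference<BiConsumer<String, Exception>> retrying = new AtomicReference<>();
        retrying.set((record, error) -> {
            if (error != null) {
                producer.send(record, retrying.get()); // recurses on the calling thread
            }
        });

        producer.send("status-update", retrying.get()); // eventually throws StackOverflowError
    }
}
{code}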
[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context
[ https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14652: - Description: MirrorMaker2 runs multiple Connect worker instances in a single process. In Connect, the logging is based on the assumption that Connector names are unique. But in MM2, the same Connector names are being used in each flow (Connect group). This means that there is no way to differentiate between the logs of MirrorSourceConnector in A->B and in B->A. This can be improved by adding the flow to the logging context and the names of the Connect framework threads. was: MirrorMaker2 runs multiple Connect worker instances in a single process. In Connect, the logging is based on the assumption that Connector names are unique. But in MM2, the same Connector names are being used in each flow (Connect group). This means that there is no way to differentiate between the logs of MirrorSourceConnector in A->B and in B->A. This can be improved by adding the flow to the logging context. > Improve MM2 logging by adding the flow information to the context > - > > Key: KAFKA-14652 > URL: https://issues.apache.org/jira/browse/KAFKA-14652 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MirrorMaker2 runs multiple Connect worker instances in a single process. In > Connect, the logging is based on the assumption that Connector names are > unique. But in MM2, the same Connector names are being used in each flow > (Connect group). This means that there is no way to differentiate between the > logs of MirrorSourceConnector in A->B and in B->A. > This can be improved by adding the flow to the logging context and the names > of the Connect framework threads. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role
[ https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14838: - Description: MM2 code creates a lot of Kafka clients internally. These clients generate a lot of logs, but since the client.id is not properly specified, connecting the dots between a specific Connector/Task and its internal client is close to impossible. This is even more complex when MM2 is running in distributed mode, in which multiple Connect workers are running inside the same process. For Connector/Task created clients, the client.id should specify the flow, the Connector name/Task ID and the role of the client. E.g. MirrorSourceConnector uses multiple admin clients, and their client.id should reflect the difference between them. For Worker created clients, the client.id should refer to the flow. This will help log analysis significantly, especially in MM2 mode. was: MM2 code creates a lot of Kafka clients internally. These clients generate a lot of logs, but since the client.id is not properly specified, connecting the dots between a specific Connector/Task and its internal client is close to impossible. The client.id of such clients should specify the Connector name/Task ID, and should also specify the role of the client. E.g. MirrorSourceConnector uses multiple admin clients, and their client.id should reflect the difference between them. This will help log analysis significantly, especially in MM2 mode. > MM2 Worker/Connector/Task clients should specify client ID based on flow and > role > - > > Key: KAFKA-14838 > URL: https://issues.apache.org/jira/browse/KAFKA-14838 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MM2 code creates a lot of Kafka clients internally. These clients generate a > lot of logs, but since the client.id is not properly specified, connecting > the dots between a specific Connector/Task and its internal client is close > to impossible. This is even more complex when MM2 is running in distributed > mode, in which multiple Connect workers are running inside the same process. > For Connector/Task created clients, the client.id should specify the > flow, the Connector name/Task ID and the role of the client. E.g. > MirrorSourceConnector uses multiple admin clients, and their client.id should > reflect the difference between them. > For Worker created clients, the client.id should refer to the flow. > This will help log analysis significantly, especially in MM2 mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
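As a concrete, hypothetical example of such a naming scheme, the client.id could be assembled from the flow, the owning Connector/Task and the client's role; the separator and the role name below are assumptions, not a committed convention:
{code:java}
class MirrorClientIdSketch {
    // Encode flow, owner and role so broker-side logs and client metrics can be
    // traced back to the client that produced them.
    static String clientId(String flow, String owner, String role) {
        return flow + "|" + owner + "|" + role;
    }

    public static void main(String[] args) {
        // Prints "A->B|MirrorSourceConnector-0|offset-syncs-admin"
        System.out.println(clientId("A->B", "MirrorSourceConnector-0", "offset-syncs-admin"));
    }
}
{code}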
[jira] [Updated] (KAFKA-14838) MM2 Worker/Connector/Task clients should specify client ID based on flow and role
[ https://issues.apache.org/jira/browse/KAFKA-14838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14838: - Summary: MM2 Worker/Connector/Task clients should specify client ID based on flow and role (was: MM2 Connector/Task clients should specify client ID based on ID and role) > MM2 Worker/Connector/Task clients should specify client ID based on flow and > role > - > > Key: KAFKA-14838 > URL: https://issues.apache.org/jira/browse/KAFKA-14838 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MM2 code creates a lot of Kafka clients internally. These clients generate a > lot of logs, but since the client.id is not properly specified, connecting > the dots between a specific Connector/Task and its internal client is close > to impossible. > The client.id of such clients should specify the Connector name/Task ID, and > should also specify the role of the client. E.g. MirrorSourceConnector uses > multiple admin clients, and their client.id should reflect the difference > between them. This will help log analysis significantly, especially in MM2 > mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context
[ https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14652: - Description: MirrorMaker2 runs multiple Connect worker instances in a single process. In Connect, the logging is based on the assumption that Connector names are unique. But in MM2, the same Connector names are being used in each flow (Connect group). This means that there is no way to differentiate between the logs of MirrorSourceConnector in A->B and in B->A. This can be improved by adding the flow to the logging context. was: MirrorMaker2 runs multiple Connect worker instances in a single process. In Connect, the logging is based on the assumption that Connector names are unique. But in MM2, the same Connector names are being used in each flow (Connect group). This means that there is no way to differentiate between the logs of MirrorSourceConnector in A->B and in B->A. This can be improved by adding the flow to the logging context. Additionally, the client.id of the Kafka clients used by the MM2 Connectors should also be set explicitly, with the flow information added. > Improve MM2 logging by adding the flow information to the context > - > > Key: KAFKA-14652 > URL: https://issues.apache.org/jira/browse/KAFKA-14652 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MirrorMaker2 runs multiple Connect worker instances in a single process. In > Connect, the logging is based on the assumption that Connector names are > unique. But in MM2, the same Connector names are being used in each flow > (Connect group). This means that there is no way to differentiate between the > logs of MirrorSourceConnector in A->B and in B->A. > This can be improved by adding the flow to the logging context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14838) MM2 Connector/Task clients should specify client ID based on ID and role
Daniel Urban created KAFKA-14838: Summary: MM2 Connector/Task clients should specify client ID based on ID and role Key: KAFKA-14838 URL: https://issues.apache.org/jira/browse/KAFKA-14838 Project: Kafka Issue Type: Improvement Reporter: Daniel Urban Assignee: Daniel Urban MM2 code creates a lot of Kafka clients internally. These clients generate a lot of logs, but since the client.id is not properly specified, connecting the dots between a specific Connector/Task and its internal client is close to impossible. The client.id of such clients should specify the Connector name/Task ID, and should also specify the role of the client. E.g. MirrorSourceConnector uses multiple admin clients, and their client.id should reflect the difference between them. This will help log analysis significantly, especially in MM2 mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context
[ https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-14652: Assignee: Daniel Urban > Improve MM2 logging by adding the flow information to the context > - > > Key: KAFKA-14652 > URL: https://issues.apache.org/jira/browse/KAFKA-14652 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MirrorMaker2 runs multiple Connect worker instances in a single process. In > Connect, the logging is based on the assumption that Connector names are > unique. But in MM2, the same Connector names are being used in each flow > (Connect group). This means that there is no way to differentiate between the > logs of MirrorSourceConnector in A->B and in B->A. > This can be improved by adding the flow to the logging context. > Additionally, the client.id of the Kafka clients used by the MM2 Connectors > should also be set explicitly, with the flow information added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14721) Kafka listener uses wrong login class
Daniel Urban created KAFKA-14721: Summary: Kafka listener uses wrong login class Key: KAFKA-14721 URL: https://issues.apache.org/jira/browse/KAFKA-14721 Project: Kafka Issue Type: Bug Affects Versions: 3.1.2 Reporter: Daniel Urban When trying to configure a single SASL_SSL listener with GSSAPI, Scram and OAuth support, we encounter an error at startup: {code:java} 2023-02-15 13:26:04,250 ERROR kafka.server.KafkaServer: [main]: [KafkaServer id=104] Fatal error during KafkaServer startup. Prepare to shutdown org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at kafka.network.Processor.(SocketServer.scala:861) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.newProcessor(SocketServer.scala:442) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) ~[scala-library-2.13.10.jar:?] at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) ~[scala-library-2.13.10.jar:?] at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) ~[scala-library-2.13.10.jar:?] at scala.collection.AbstractIterable.foreach(Iterable.scala:933) ~[scala-library-2.13.10.jar:?] at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.startup(SocketServer.scala:131) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.server.KafkaServer.startup(KafkaServer.scala:310) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.Kafka$.main(Kafka.scala:109) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1(Kafka.scala:107) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1$adapted(Kafka.scala:107) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:118) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:110) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:309) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:61) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] 
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] ... 21 more{code} Using the following configs in a Kafka broker: jaas configuration file: {code:java} KafkaServer { com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE"; org.apache.kafka.common.security.scram.ScramLoginModule required; };{code} and the following properties: {code:java} listener.name.sasl_ssl.sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuth
[jira] [Commented] (KAFKA-14716) Connect schema does not allow struct default values
[ https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688979#comment-17688979 ] Daniel Urban commented on KAFKA-14716: -- [~ChrisEgerton] Indeed, it is, thanks for pointing that out - will try to comment on the PR of KAFKA-12694 > Connect schema does not allow struct default values > --- > > Key: KAFKA-14716 > URL: https://issues.apache.org/jira/browse/KAFKA-14716 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > The ConnectSchema API should allow specifying a composite (struct) default > value for a field, but with the current API, it is impossible to do so. > # There is a circular dependency between creating a struct as a default > value and creating the schema which holds it as the default value. The Struct > constructor expects a Schema object, and the default value setter of > SchemaBuilder checks schema conformity by using the > ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This > can only be bypassed if the struct references a SchemaBuilder instance, and > defaultValue is called on that builder instance, but this goes against the > Struct docs stating that "Struct objects must specify a complete \{@link > Schema} up front". > # ConnectSchema.equals is not prepared to be used with other Schema > implementations, so equals checks between ConnectSchema and SchemaBuilder > instances will always fail. This is only causing an issue if equals has to be > used for schema conformity checks. > Code examples: > Working code (mind that the schema referenced by the Struct is a > SchemaBuilder, and it is mutated after the Struct is constructed): > {code:java} > @Test > public void testCompositeDefault() { > SchemaBuilder nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > nestedSchema > .defaultValue(nestedDefault) > .build() > ) > .build(); > } {code} > Not working code (but better aligned with the current API and docs - 2 > separate Schema instances used by the Struct and the field, only diff is the > default value between the 2): > {code:java} > @Test > public void testCompositeDefault() { > Schema nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA) > .build(); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > SchemaBuilder > .struct() > .field("bar", Schema.STRING_SCHEMA) > .defaultValue(nestedDefault) > .build() > ) > .build(); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14716) Connect schema does not allow struct default values
[ https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-14716. -- Resolution: Duplicate > Connect schema does not allow struct default values > --- > > Key: KAFKA-14716 > URL: https://issues.apache.org/jira/browse/KAFKA-14716 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > The ConnectSchema API should allow specifying a composite (struct) default > value for a field, but with the current API, it is impossible to do so. > # There is a circular dependency between creating a struct as a default > value and creating the schema which holds it as the default value. The Struct > constructor expects a Schema object, and the default value setter of > SchemaBuilder checks schema conformity by using the > ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This > can only be bypassed if the struct references a SchemaBuilder instance, and > defaultValue is called on that builder instance, but this goes against the > Struct docs stating that "Struct objects must specify a complete \{@link > Schema} up front". > # ConnectSchema.equals is not prepared to be used with other Schema > implementations, so equals checks between ConnectSchema and SchemaBuilder > instances will always fail. This is only causing an issue if equals has to be > used for schema conformity checks. > Code examples: > Working code (mind that the schema referenced by the Struct is a > SchemaBuilder, and it is mutated after the Struct is constructed): > {code:java} > @Test > public void testCompositeDefault() { > SchemaBuilder nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > nestedSchema > .defaultValue(nestedDefault) > .build() > ) > .build(); > } {code} > Not working code (but better aligned with the current API and docs - 2 > separate Schema instances used by the Struct and the field, only diff is the > default value between the 2): > {code:java} > @Test > public void testCompositeDefault() { > Schema nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA) > .build(); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > SchemaBuilder > .struct() > .field("bar", Schema.STRING_SCHEMA) > .defaultValue(nestedDefault) > .build() > ) > .build(); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14716) Connect schema does not allow struct default values
[ https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14716: - Description: The ConnectSchema API should allow specifying a composite (struct) default value for a field, but with the current API, it is impossible to do so. # There is a circular dependency between creating a struct as a default value and creating the schema which holds it as the default value. The Struct constructor expects a Schema object, and the default value setter of SchemaBuilder checks schema conformity by using the ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can only be bypassed if the struct references a SchemaBuilder instance, and defaultValue is called on that builder instance, but this goes against the Struct docs stating that "Struct objects must specify a complete \{@link Schema} up front". # ConnectSchema.equals is not prepared to be used with other Schema implementations, so equals checks between ConnectSchema and SchemaBuilder instances will always fail. This is only causing an issue if equals has to be used for schema conformity checks. Code examples: Working code (mind that the schema referenced by the Struct is a SchemaBuilder, and it is mutated after the Struct is constructed): {code:java} @Test public void testCompositeDefault() { SchemaBuilder nestedSchema = SchemaBuilder.struct() .field("bar", Schema.STRING_SCHEMA); Struct nestedDefault = new Struct(nestedSchema); nestedDefault.put("bar", "default_value"); Schema schema = SchemaBuilder.struct() .field("foo", nestedSchema .defaultValue(nestedDefault) .build() ) .build(); } {code} Not working code (but better aligned with the current API and docs - 2 separate Schema instances used by the Struct and the field, only diff is the default value between the 2): {code:java} @Test public void testCompositeDefault() { Schema nestedSchema = SchemaBuilder.struct() .field("bar", Schema.STRING_SCHEMA) .build(); Struct nestedDefault = new Struct(nestedSchema); nestedDefault.put("bar", "default_value"); Schema schema = SchemaBuilder.struct() .field("foo", SchemaBuilder .struct() .field("bar", Schema.STRING_SCHEMA) .defaultValue(nestedDefault) .build() ) .build(); }{code} was: The ConnectSchema API should allow specifying a composite (struct) default value for a field, but with the current API, it is impossible to do so. # There is a circular dependency between creating a struct as a default value and creating the schema which holds it as the default value. The Struct constructor expects a Schema object, and the default value setter of SchemaBuilder checks schema conformity by using the ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can only be bypassed if the struct references a SchemaBuilder instance, and defaultValue is called on that builder instance, but this goes against the Struct docs stating that "Struct objects must specify a complete \{@link Schema} up front". # ConnectSchema.equals is not prepared to be used with other Schema implementations, so equals checks between ConnectSchema and SchemaBuilder instances will always fail. This is only causing an issue if equals has to be used for schema conformity checks. 
> Connect schema does not allow struct default values > --- > > Key: KAFKA-14716 > URL: https://issues.apache.org/jira/browse/KAFKA-14716 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > The ConnectSchema API should allow specifying a composite (struct) default > value for a field, but with the current API, it is impossible to do so. > # There is a circular dependency between creating a struct as a default > value and creating the schema which holds it as the default value. The Struct > constructor expects a Schema object, and the default value setter of > SchemaBuilder checks schema conformity by using the > ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This > can only be bypassed if the struct references a SchemaBuilder instance, and > defaultValue is called on that builder instance, but this goes against the > Struct docs stating that "Struct objects must specify a complete \{@link > Schema} up front". > # ConnectSchema.equals is not prepared to be used with other Schema > implementations, so equals checks between ConnectSchema and SchemaBuilder > instances will a
[jira] [Created] (KAFKA-14716) Connect schema does not allow struct default values
Daniel Urban created KAFKA-14716: Summary: Connect schema does not allow struct default values Key: KAFKA-14716 URL: https://issues.apache.org/jira/browse/KAFKA-14716 Project: Kafka Issue Type: Bug Reporter: Daniel Urban Assignee: Daniel Urban The ConnectSchema API should allow specifying a composite (struct) default value for a field, but with the current API, it is impossible to do so. # There is a circular dependency between creating a struct as a default value and creating the schema which holds it as the default value. The Struct constructor expects a Schema object, and the default value setter of SchemaBuilder checks schema conformity by using the ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can only be bypassed if the struct references a SchemaBuilder instance, and defaultValue is called on that builder instance, but this goes against the Struct docs stating that "Struct objects must specify a complete \{@link Schema} up front". # ConnectSchema.equals is not prepared to be used with other Schema implementations, so equals checks between ConnectSchema and SchemaBuilder instances will always fail. This is only causing an issue if equals has to be used for schema conformity checks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory
[ https://issues.apache.org/jira/browse/KAFKA-14667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14667: - Description: This was observed with Kafka 3.1.1, but I believe that latest versions are also affected. In the Cruise Control project, there is an integration test: com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle On our infrastructure, this test fails every ~20th run with a timeout - the triggered preferred leadership election is never completed. After some investigation, it turns out that: # The admin client never gets a response from the broker. # The leadership change is executed successfully. # The ElectLeader purgatory never gets an update for the relevant topic partition. A few relevant lines from a failed run (this test uses an embedded cluster, logs are mixed): CC successfully sends a preferred election request to the controller (broker 0), topic1-0 needs a leadership change from broker 0 to broker 1: {code:java} 2023-02-01 01:20:26.028 [controller-event-thread] DEBUG kafka.controller.KafkaController - [Controller id=0] Waiting for any successful result for election type (PREFERRED) by AdminClientTriggered for partitions: Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, message=Leader election not needed for topic partition.))) 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1) {code} The delayed operation for the leader election is triggered 2 times in quick succession (yes, same ms in both logs): {code:java} 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1) 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1){code} Shortly after (few ms later based on the logs), broker 0 receives an UpdateMetadataRequest from the controller (itself) and processes it: {code:java} 2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19) and timeout 3 to node 0: UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='topic1', topicId=gkFP8VnkSGyEf_LBBZSowQ, partitionStates=[UpdateMetadataPartitionState(topicName='topic1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null)]) 2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Received UPDATE_METADATA response from node 0 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19): UpdateMetadataResponseData(errorCode=0) 2023-02-01 01:20:26.035 
[data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} {code} The update metadata request should trigger an update on the ElectLeader purgatory, and we should see a log line like this: "Request key X unblocked Y ElectLeader." In the failin
[jira] [Created] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory
Daniel Urban created KAFKA-14667: Summary: Delayed leader election operation gets stuck in purgatory Key: KAFKA-14667 URL: https://issues.apache.org/jira/browse/KAFKA-14667 Project: Kafka Issue Type: Bug Affects Versions: 3.1.1 Reporter: Daniel Urban This was observed with Kafka 3.1.1, but I believe that latest versions are also affected. In the Cruise Control project, there is an integration test: com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle On our infrastructure, this test fails every ~20th run with a timeout - the triggered preferred leadership election is never completed. After some investigation, it turns out that: # The admin client never gets a response from the broker. # The leadership change is executed successfully. # The ElectLeader purgatory never gets an update for the relevant topic partition. A few relevant lines from a failed run (this test uses an embedded cluster, logs are mixed): CC successfully sends a preferred election request to the controller (broker 0), topic1-0 needs a leadership change from broker 0 to broker 1: {code:java} 2023-02-01 01:20:26.028 [controller-event-thread] DEBUG kafka.controller.KafkaController - [Controller id=0] Waiting for any successful result for election type (PREFERRED) by AdminClientTriggered for partitions: Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, message=Leader election not needed for topic partition.))) 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1) {code} The delayed operation for the leader election is triggered 2 times in quick succession (yes, same ms in both logs): {code:java} 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1) 2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1){code} Shortly after (few ms later based on the logs), broker 0 receives an UpdateMetadataRequest from the controller (itself) and processes it: {code:java} 2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19) and timeout 3 to node 0: UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='topic1', topicId=gkFP8VnkSGyEf_LBBZSowQ, partitionStates=[UpdateMetadataPartitionState(topicName='topic1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null)]) 2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Received UPDATE_METADATA response from node 0 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19):
UpdateMetadataResponseData(errorCode=0) 2023-02-01 01:20:26.035 [data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} {code} The update metadata
[jira] [Created] (KAFKA-14653) MM2 should delay resolving config provider references
Daniel Urban created KAFKA-14653: Summary: MM2 should delay resolving config provider references Key: KAFKA-14653 URL: https://issues.apache.org/jira/browse/KAFKA-14653 Project: Kafka Issue Type: Sub-task Reporter: Daniel Urban Assignee: Daniel Urban MM2 eagerly resolves config providers, meaning that the generated Connector configurations contain inline values. This is a problem for sensitive and host-specific configurations, so MM2 should delay the resolution of such configs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
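To make the problem concrete, a hypothetical example (the config key, file path and values are made up): with eager resolution the secret is inlined into the generated Connector configuration, while delayed resolution would keep the raw reference and let the Connect worker resolve it at use time.
{code:java}
import java.util.Map;

class ConfigProviderReferenceSketch {
    // What should reach the Connector configuration: the unresolved reference.
    static final Map<String, String> RAW_CONNECTOR_CONFIG = Map.of(
            "database.password", "${file:/opt/secrets/db.properties:password}");

    // What eager resolution produces: the secret inlined into the config,
    // and persisted wherever that config is stored.
    static final Map<String, String> EAGERLY_RESOLVED_CONFIG = Map.of(
            "database.password", "s3cr3t");
}
{code}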
[jira] [Updated] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
[ https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-10586: - Issue Type: Task (was: Improvement) > Full support for distributed mode in dedicated MirrorMaker 2.0 clusters > --- > > Key: KAFKA-10586 > URL: https://issues.apache.org/jira/browse/KAFKA-10586 > Project: Kafka > Issue Type: Task > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Chris Egerton >Priority: Major > Labels: cloudera > > KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated > MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means > that with specific workloads, the dedicated MM2 cluster can become unable to > react to dynamic topic and group filter changes. > (This occurs when after a rebalance operation, the leader node has no > MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is > stopped on the leader, meaning it cannot detect config changes by itself. > Followers still running the connector can detect config changes, but they > cannot query the leader for config updates.) > Besides the REST support, config provider references should be evaluated > lazily in connector configurations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context
Daniel Urban created KAFKA-14652: Summary: Improve MM2 logging by adding the flow information to the context Key: KAFKA-14652 URL: https://issues.apache.org/jira/browse/KAFKA-14652 Project: Kafka Issue Type: Improvement Reporter: Daniel Urban MirrorMaker2 runs multiple Connect worker instances in a single process. In Connect, the logging is based on the assumption that Connector names are unique. But in MM2, the same Connector names are being used in each flow (Connect group). This means that there is no way to differentiate between the logs of MirrorSourceConnector in A->B and in B->A. This can be improved by adding the flow to the logging context. Additionally, the client.id of the Kafka clients used by the MM2 Connectors should also be set explicitly, with the flow information added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
[ https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566808#comment-17566808 ] Daniel Urban commented on KAFKA-14053: -- I understand that increasing the epoch on the client side is probably violating the contract in the protocol. Refactored my change so the client-side timeouts (both delivery and request timeout) will become fatal errors in transactional producers, resulting in a last, best-effort epoch bump. > Transactional producer should bump the epoch when a batch encounters delivery > timeout > - > > Key: KAFKA-14053 > URL: https://issues.apache.org/jira/browse/KAFKA-14053 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > When a batch fails due to delivery timeout, it is possible that the batch is > still in-flight. Due to underlying infra issues, it is possible that an > EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight > batch is processed on the leader. This can cause transactional batches to be > appended to the log after the corresponding abort marker. > This can cause the LSO to be infinitely blocked in the partition, or can even > violate processing guarantees, as the out-of-order batch can become part of > the next transaction. > Because of this, the producer should skip aborting the partition, and bump > the epoch to fence the in-flight requests. > > More detail can be found here: > [https://lists.apache.org/thread/8d2oblsjtdv7740glc37v79f0r7p99dp] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
[ https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564222#comment-17564222 ] Daniel Urban commented on KAFKA-14053: -- About to submit a PR, but wanted to bring up a few more points: * When we bump in the middle of an ongoing transaction, the coordinator handles that as a fencing operation, and does not return a valid epoch - the coordinator assumes that a new producer was started with the same ID, and forces the InitProducerIDRequest to be retried. This will result in a 2nd bump, in which the epoch is returned. * If we go with the simplest solution, and bump once, the producer must transition into a fatal state. The producer cannot safely get a new epoch from the coordinator, as sending the original epoch multiple times will first result in CONCURRENT_TRANSACTIONS (while the ongoing transaction is aborted) then in PRODUCER_FENCED (when the coordinator realizes that the epoch was bumped already). It is not safe to send an empty InitProducerIDRequest, as we might fence a newer producer instance with that. * In my PR, I try to bump twice. First, using the original epoch (which will eventually get a PRODUCER_FENCED), then I increase the epoch by 1 on the client side, and try again. Conceptually, this might sound wrong (the producer is using an epoch which wasn't returned by the coordinator), but I think that theoretically it is safe to do. The first bump during an ongoing transaction results in an epoch which is never returned to any producers. If we retry with the increased epoch, that won't collide with any other producers. If the bump with the increased epoch succeeds, then there was no concurrent bump from other instances, and the producer can continue using the new epoch. If the increased epoch receives a PRODUCER_FENCED, then the producer was actually fenced by another instance. * Due to the previous point, the producer needs to make sure that it can safely increase the epoch by 1 at any point. This means that if the producer got Short.MAX as the epoch, it won't be able to do the +1 increase locally. To avoid this, I made a change in the producer to force an epoch reset from the coordinator when the epoch is Short.MAX. * I understand that the local epoch increase and the intentional epoch reset might not be ideal, but without those, the producer cannot be safely used after a delivery timeout. So in my current understanding, a delivery timeout either means a fatal error, or we need to accept the local epoch increase and the epoch reset on Short.MAX. > Transactional producer should bump the epoch when a batch encounters delivery > timeout > - > > Key: KAFKA-14053 > URL: https://issues.apache.org/jira/browse/KAFKA-14053 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > When a batch fails due to delivery timeout, it is possible that the batch is > still in-flight. Due to underlying infra issues, it is possible that an > EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight > batch is processed on the leader. This can cause transactional batches to be > appended to the log after the corresponding abort marker. > This can cause the LSO to be infinitely blocked in the partition, or can even > violate processing guarantees, as the out-of-order batch can become part of > the next transaction. > Because of this, the producer should skip aborting the partition, and bump > the epoch to fence the in-flight requests.
> > More detail can be found here: > [https://lists.apache.org/thread/8d2oblsjtdv7740glc37v79f0r7p99dp] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
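For readability, a rough, hypothetical sketch of the double-bump approach described in the comment above; the real producer code paths and error handling are more involved, and sendInitProducerId is a made-up placeholder, not an actual client API:
{code:java}
abstract class EpochBumpSketch {
    // Placeholder for sending an InitProducerIdRequest with the given epoch and
    // waiting for the coordinator's response.
    abstract void sendInitProducerId(short epoch);

    short bumpAfterDeliveryTimeout(short currentEpoch) {
        if (currentEpoch == Short.MAX_VALUE) {
            // Cannot add 1 locally; the comment proposes forcing an epoch reset
            // from the coordinator before this point is ever reached.
            throw new IllegalStateException("epoch exhausted, reset required");
        }
        // First bump with the current epoch: the coordinator treats it as fencing,
        // aborts the ongoing transaction, and does not hand back a usable epoch.
        sendInitProducerId(currentEpoch);
        // The epoch produced by that first bump is never given to any producer, so
        // retrying with currentEpoch + 1 cannot collide; a PRODUCER_FENCED here
        // means another instance really did fence this producer.
        short bumped = (short) (currentEpoch + 1);
        sendInitProducerId(bumped);
        return bumped;
    }
}
{code}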
[jira] [Updated] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
[ https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14053: - Description: When a batch fails due to delivery timeout, it is possible that the batch is still in-flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker. This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction. Because of this, the producer should skip aborting the partition, and bump the epoch to fence the in-flight requests. was: When a batch fails due to delivery timeout, it is possible that the batch is still in-flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker. This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction. > Transactional producer should bump the epoch when a batch encounters delivery > timeout > - > > Key: KAFKA-14053 > URL: https://issues.apache.org/jira/browse/KAFKA-14053 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > When a batch fails due to delivery timeout, it is possible that the batch is > still in-flight. Due to underlying infra issues, it is possible that an > EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight > batch is processed on the leader. This can cause transactional batches to be > appended to the log after the corresponding abort marker. > This can cause the LSO to be infinitely blocked in the partition, or can even > violate processing guarantees, as the out-of-order batch can become part of > the next transaction. > Because of this, the producer should skip aborting the partition, and bump > the epoch to fence the in-flight requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
Daniel Urban created KAFKA-14053: Summary: Transactional producer should bump the epoch when a batch encounters delivery timeout Key: KAFKA-14053 URL: https://issues.apache.org/jira/browse/KAFKA-14053 Project: Kafka Issue Type: Bug Reporter: Daniel Urban Assignee: Daniel Urban When a batch fails due to delivery timeout, it is possible that the batch is still in-flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker. This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException
[ https://issues.apache.org/jira/browse/KAFKA-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552025#comment-17552025 ] Daniel Urban commented on KAFKA-13970: -- Sorry for the confusion, the Issue I reported depends on an unmerged PR which I thought was already merged. This can be closed. > TopicAdmin topic creation should be retried on TimeoutException > --- > > Key: KAFKA-13970 > URL: https://issues.apache.org/jira/browse/KAFKA-13970 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Daniel Urban >Assignee: Sagar Rao >Priority: Major > > org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the > case when there aren't enough brokers in the cluster to create a topic with > the expected replication factor. This logic should also handle the case when > there are 0 brokers in the cluster, and should retry in that case. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException
Daniel Urban created KAFKA-13970: Summary: TopicAdmin topic creation should be retried on TimeoutException Key: KAFKA-13970 URL: https://issues.apache.org/jira/browse/KAFKA-13970 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Daniel Urban org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the case when there aren't enough brokers in the cluster to create a topic with the expected replication factor. This logic should also handle the case when there are 0 brokers in the cluster, and should retry in that case. -- This message was sent by Atlassian Jira (v8.20.7#820007)
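A minimal sketch of the requested behaviour, assuming a hypothetical wrapper around the topic creation call (the real TopicAdmin retry logic and its backoff handling differ): a TimeoutException, e.g. because no brokers have registered yet, is treated as retriable instead of failing worker startup.
{code:java}
import java.time.Duration;
import org.apache.kafka.common.errors.TimeoutException;

class CreateTopicsRetrySketch {
    // Hypothetical stand-in for the actual topic creation call.
    interface TopicCreation { void create() throws Exception; }

    static void createWithRetry(TopicCreation creation, int maxAttempts, Duration backoff) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                creation.create();
                return;
            } catch (TimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoff.toMillis()); // wait for brokers to come up, then try again
            }
        }
    }
}
{code}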
[jira] [Updated] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException
[ https://issues.apache.org/jira/browse/KAFKA-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-13970: - Issue Type: Bug (was: Improvement) > TopicAdmin topic creation should be retried on TimeoutException > --- > > Key: KAFKA-13970 > URL: https://issues.apache.org/jira/browse/KAFKA-13970 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Daniel Urban >Priority: Major > > org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the > case when there aren't enough brokers in the cluster to create a topic with > the expected replication factor. This logic should also handle the case when > there are 0 brokers in the cluster, and should retry in that case. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542435#comment-17542435 ] Daniel Urban commented on KAFKA-12671: -- I think I encountered a different path than the one you describe: # InitProducerID with transactional ID # AddPartitionsToTxn # Produce, but with failure, retry is triggered # During a retry, delivery timeout is triggered # Abort sent to TransactionCoordinator # Abort marker sent by TransactionCoordinator # Last retry of the produce request is processed # Transactional ID is dropped and never used again I'm unsure how 4-5 can happen - client was a Java client, and not too old (2.6). It is possible that there were some network errors in the environment when this occurred. The result was that there were X batches produced in a specific transaction, but the last 2 batches were appended to the log after the abort marker. Also, the transaction coordinator wasn't aware of this, and handled it as a successful abort, and not having any open transaction for the specific transactional ID. Due to this, LSO was stuck in the partition, and transaction timeout was never triggered. Also, if the transactional ID is reused, and somehow unblocks the partition by starting a new transaction (and using the same partition), it is still an issue, as it violates the processing guarantees. If the transactional ID is reused in the scenario which I described, and the first transaction after the out-of-order messages is committed, that transaction will contain 2 extra batches from the previous (aborted) transaction. > Out of order processing with a transactional producer can lead to a stuck > LastStableOffset > -- > > Key: KAFKA-12671 > URL: https://issues.apache.org/jira/browse/KAFKA-12671 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0 >Reporter: Travis Bischel >Priority: Major > Labels: Transactions > > If there is pathological processing of incoming produce requests and EndTxn > requests, then the LastStableOffset can get stuck, which will block consuming > in READ_COMMITTED mode. > To transactionally produce, the standard flow is to InitProducerId, and then > loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is > responsible for fencing and adding partitions to a transaction, and the end > transaction is responsible for finishing the transaction. Producing itself is > mostly uninvolved with the proper fencing / ending flow, but produce requests > are required to be after AddPartitionsToTxn and before EndTxn. > When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager > to mildly manage transactions. The ProducerStateManager is completely > independent of the TxnCoordinator, and its guarantees are relatively weak. > The ProducerStateManager handles two types of "batches" being added: a data > batch and a transaction marker. When a data batch is added, a "transaction" > is begun and tied to the producer ID that is producing the batch. When a > transaction marker is handled, the ProducerStateManager removes the > transaction for the producer ID (roughly). > EndTxn is what triggers transaction markers to be sent to the > ProducerStateManager. In essence, EndTxn is the one part of the transactional > producer flow that talks across both the TxnCoordinator and the > ProducerStateManager. 
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka > after EndTxn, then the ProduceRequest will begin a new transaction in the > ProducerStateManager. If the client was disconnecting, and the EndTxn was the > final request issued, the new transaction created in ProducerStateManager is > orphaned and nothing can clean it up. The LastStableOffset then hangs based > off of this hung transaction. > This same problem can be triggered by a produce request that is issued with a > transactional ID outside of the context of a transaction at all (no > AddPartitionsToTxn). This problem cannot be triggered by producing for so > long that the transaction expires; the difference here is that the > transaction coordinator bumps the epoch for the producer ID, thus producing > again with the old epoch does not work. > Theoretically, we are supposed have unlimited retries on produce requests, > but in the context of wanting to abort everything and shut down, this is not > always feasible. As it currently stands, I'm not sure there's a truly safe > way to shut down _without_ flushing and receiving responses for every record > produced, even if I want to abort everything and quit. The safest approach I > can think of is to actually avoid issuing an EndTxn so that instead we rely
[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530039#comment-17530039 ] Daniel Urban commented on KAFKA-12671: -- I believe I encountered this issue with the Java producer in a situation where the produce requests got timed out (due to delivery timeout), and the abort control message got appended to the partition before the last few batches sent in the transaction. [~twmb] Do you think it makes sense? A client side timeout would mean that the transaction manager inside the Java producer has no definitive answer about the fate of the produce request, and just assumes that it failed. > Out of order processing with a transactional producer can lead to a stuck > LastStableOffset > -- > > Key: KAFKA-12671 > URL: https://issues.apache.org/jira/browse/KAFKA-12671 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0 >Reporter: Travis Bischel >Priority: Major > Labels: Transactions > > If there is pathological processing of incoming produce requests and EndTxn > requests, then the LastStableOffset can get stuck, which will block consuming > in READ_COMMITTED mode. > To transactionally produce, the standard flow is to InitProducerId, and then > loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is > responsible for fencing and adding partitions to a transaction, and the end > transaction is responsible for finishing the transaction. Producing itself is > mostly uninvolved with the proper fencing / ending flow, but produce requests > are required to be after AddPartitionsToTxn and before EndTxn. > When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager > to mildly manage transactions. The ProducerStateManager is completely > independent of the TxnCoordinator, and its guarantees are relatively weak. > The ProducerStateManager handles two types of "batches" being added: a data > batch and a transaction marker. When a data batch is added, a "transaction" > is begun and tied to the producer ID that is producing the batch. When a > transaction marker is handled, the ProducerStateManager removes the > transaction for the producer ID (roughly). > EndTxn is what triggers transaction markers to be sent to the > ProducerStateManager. In essence, EndTxn is the one part of the transactional > producer flow that talks across both the TxnCoordinator and the > ProducerStateManager. > If a ProduceRequest is issued before EndTxn, but handled internally in Kafka > after EndTxn, then the ProduceRequest will begin a new transaction in the > ProducerStateManager. If the client was disconnecting, and the EndTxn was the > final request issued, the new transaction created in ProducerStateManager is > orphaned and nothing can clean it up. The LastStableOffset then hangs based > off of this hung transaction. > This same problem can be triggered by a produce request that is issued with a > transactional ID outside of the context of a transaction at all (no > AddPartitionsToTxn). This problem cannot be triggered by producing for so > long that the transaction expires; the difference here is that the > transaction coordinator bumps the epoch for the producer ID, thus producing > again with the old epoch does not work. > Theoretically, we are supposed have unlimited retries on produce requests, > but in the context of wanting to abort everything and shut down, this is not > always feasible. 
As it currently stands, I'm not sure there's a truly safe > way to shut down _without_ flushing and receiving responses for every record > produced, even if I want to abort everything and quit. The safest approach I > can think of is to actually avoid issuing an EndTxn so that instead we rely > on Kafka internally to time things out after a period of time. > — > For some context, here's my request logs from the client. Note that I write > two ProduceRequests, read one, and then issue EndTxn (because I know I want > to quit). The second ProduceRequest is read successfully before shutdown, but > I ignore the results because I am shutting down. I've taken out logs related > to consuming, but the order of the logs is unchanged: > {noformat} > [INFO] done waiting for unknown topic, metadata was successful; topic: > 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765 > [INFO] initializing producer id > [DEBUG] wrote FindCoordinator v3; err: > [DEBUG] read FindCoordinator v3; err: > [DEBUG] wrote InitProducerID v4; err: > [DEBUG] read InitProducerID v4; err: > [INFO] producer id initialization success; id: 1463, epoch: 0 > [DEBUG] wrote AddPartitionsToTxn v2; err: > [DEBUG] read AddPartitionsToTxn v2; err: > [D
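For context, here is a minimal sketch of the client-side flow being discussed, using only the public Java producer API (the broker address, topic, transactional id and timeout value are illustrative, and this is not the producer's internal transaction manager logic). The point is that the last send() may still be in flight when the shutdown path decides to abort, and a delivery timeout only tells the client that it stopped waiting, not what the broker ultimately did with the batch.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalShutdownSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-tx-id");
        // The client-side timeout mentioned in the comment above: once it fires, the
        // producer gives up on the batch without knowing whether the broker appended it.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "30000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "v1"));
            producer.send(new ProducerRecord<>("example-topic", "key", "v2")); // may still be in flight
            // Shutting down: abort instead of waiting for every outstanding send to complete.
            producer.abortTransaction();
        }
    }
}
{code}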
[jira] [Resolved] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
[ https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-13809. -- Resolution: Won't Fix Connector configs are used for consumer/producer overrides, converter classes, etc.; it is not necessary to propagate all of these configs to the tasks. > FileStreamSinkConnector and FileStreamSourceConnector should propagate full > configuration to tasks > -- > > Key: KAFKA-13809 > URL: https://issues.apache.org/jira/browse/KAFKA-13809 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Daniel Urban >Priority: Major > > The 2 example connectors do not propagate the full connector configuration to > the tasks. This makes it impossible to override built-in configs, such as > producer/consumer overrides. > This causes an issue even when used for testing purposes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
[ https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-13809: - Description: The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as producer/consumer overrides. This causes an issue even when used for testing purposes. was: The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as key.converter. This causes an issue even when used for testing purposes. > FileStreamSinkConnector and FileStreamSourceConnector should propagate full > configuration to tasks > -- > > Key: KAFKA-13809 > URL: https://issues.apache.org/jira/browse/KAFKA-13809 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Daniel Urban >Priority: Major > > The 2 example connectors do not propagate the full connector configuration to > the tasks. This makes it impossible to override built-in configs, such as > producer/consumer overrides. > This causes an issue even when used for testing purposes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
Daniel Urban created KAFKA-13809: Summary: FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks Key: KAFKA-13809 URL: https://issues.apache.org/jira/browse/KAFKA-13809 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Daniel Urban The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as key.converter. This causes an issue even when used for testing purposes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
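For illustration, a minimal sketch of the change the ticket asks for (ultimately resolved as Won't Fix above): a connector whose taskConfigs() hands the complete connector configuration to every task. The class names below are made up; this is not the actual FileStream connector code.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class PassThroughConfigSourceConnector extends SourceConnector {
    private Map<String, String> connectorConfig;

    @Override
    public void start(Map<String, String> props) {
        // Keep the complete connector configuration instead of cherry-picking keys.
        this.connectorConfig = new HashMap<>(props);
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Hand the full configuration to every task, so nothing set on the connector is lost.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(connectorConfig));
        }
        return configs;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return ExampleSourceTask.class; // placeholder task, defined below
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "0.0.1";
    }

    /** No-op placeholder task, only here to keep the sketch self-contained. */
    public static class ExampleSourceTask extends SourceTask {
        @Override public String version() { return "0.0.1"; }
        @Override public void start(Map<String, String> props) { }
        @Override public List<SourceRecord> poll() { return null; }
        @Override public void stop() { }
    }
}
{code}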
[jira] [Updated] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class
[ https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-13756: - Description: Currently, if there is an issue with the connector class, the validate endpoint returns a 400 or a 500 response. Instead, it should return a well formatted response containing a proper validation error message. was: Currently, if there is an issue with the connector name or the connector class, the validate endpoint returns a 500 response. Instead, it should return a well formatted response containing proper validation error messages. > Connect validate endpoint should return proper response for invalid connector > class > --- > > Key: KAFKA-13756 > URL: https://issues.apache.org/jira/browse/KAFKA-13756 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Priority: Major > > Currently, if there is an issue with the connector class, the validate > endpoint returns a 400 or a 500 response. > Instead, it should return a well formatted response containing a proper > validation error message. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class
[ https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-13756: - Summary: Connect validate endpoint should return proper response for invalid connector class (was: Connect validate endpoint should return proper response on name and connector class error) > Connect validate endpoint should return proper response for invalid connector > class > --- > > Key: KAFKA-13756 > URL: https://issues.apache.org/jira/browse/KAFKA-13756 > Project: Kafka > Issue Type: Improvement >Reporter: Daniel Urban >Priority: Major > > Currently, if there is an issue with the connector name or the connector > class, the validate endpoint returns a 500 response. > Instead, it should return a well formatted response containing proper > validation error messages. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13756) Connect validate endpoint should return proper response on name and connector class error
Daniel Urban created KAFKA-13756: Summary: Connect validate endpoint should return proper response on name and connector class error Key: KAFKA-13756 URL: https://issues.apache.org/jira/browse/KAFKA-13756 Project: Kafka Issue Type: Improvement Reporter: Daniel Urban Currently, if there is an issue with the connector name or the connector class, the validate endpoint returns a 500 response. Instead, it should return a well formatted response containing proper validation error messages. -- This message was sent by Atlassian Jira (v8.20.1#820001)
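For reference, the endpoint in question is Connect's config validation endpoint. A hypothetical request with a nonexistent connector class (host, port and class name below are illustrative) is the kind of call that currently surfaces as a bare 400/500 instead of a structured validation result:
{code:java}
curl -X PUT -H "Content-Type: application/json" \
  --data '{"connector.class": "com.example.DoesNotExist", "name": "my-connector"}' \
  http://localhost:8083/connector-plugins/com.example.DoesNotExist/config/validate
{code}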
[jira] [Created] (KAFKA-13459) MM2 should be able to add the source offset to the record header
Daniel Urban created KAFKA-13459: Summary: MM2 should be able to add the source offset to the record header Key: KAFKA-13459 URL: https://issues.apache.org/jira/browse/KAFKA-13459 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban MM2 could add the source offset to the record header to help with diagnostics in some use-cases. -- This message was sent by Atlassian Jira (v8.20.1#820001)
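A minimal sketch of what this could look like when building the replicated record: Connect's Headers API already supports typed values, so a task could attach the upstream coordinates before returning the record from poll(). The helper and the header names below are purely illustrative, not existing MM2 code.
{code:java}
import org.apache.kafka.connect.source.SourceRecord;

public final class SourceOffsetHeaderSketch {

    /** Decorates a record with its upstream coordinates; the header names are made up. */
    public static SourceRecord withSourceOffsetHeaders(SourceRecord record,
                                                       String sourceTopic,
                                                       int sourcePartition,
                                                       long sourceOffset) {
        record.headers().addString("mm2.source.topic", sourceTopic);
        record.headers().addInt("mm2.source.partition", sourcePartition);
        record.headers().addLong("mm2.source.offset", sourceOffset);
        return record;
    }

    private SourceOffsetHeaderSketch() {
    }
}
{code}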
[jira] [Created] (KAFKA-13452) MM2 creates invalid checkpoint when offset mapping is not available
Daniel Urban created KAFKA-13452: Summary: MM2 creates invalid checkpoint when offset mapping is not available Key: KAFKA-13452 URL: https://issues.apache.org/jira/browse/KAFKA-13452 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban MM2 checkpointing reads the offset-syncs topic to create offset mappings for committed consumer group offsets. In some corner cases, it is possible that a mapping is not available in offset-syncs - in that case, MM2 simply copies the source offset, which might not be a valid offset in the replica topic at all. One possible situation is an empty topic in the source cluster with a non-zero end offset (e.g. retention already removed the records), and a consumer group which has a committed offset set to the end offset. If replication is configured to start replicating this topic, it will not have an offset mapping available in offset-syncs (as the topic is empty), causing MM2 to copy the source offset. This can cause issues when auto offset sync is enabled, as the consumer group offset can potentially be set to a high number. MM2 never rewinds these offsets, so even when there is a correct offset mapping available, the offset will not be updated correctly. -- This message was sent by Atlassian Jira (v8.20.1#820001)
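A sketch of the guard this ticket implies: only emit a checkpoint when an offset-sync mapping is actually available, instead of falling back to copying the upstream offset. All names below are hypothetical and the translation arithmetic is deliberately simplified; this is not the actual MirrorCheckpointTask code.
{code:java}
import java.util.Optional;
import java.util.OptionalLong;

public final class CheckpointTranslationSketch {

    /** Hypothetical pair of upstream/downstream offsets taken from the offset-syncs topic. */
    public static final class OffsetMapping {
        final long upstreamOffset;
        final long downstreamOffset;

        public OffsetMapping(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    /** Returns the offset to checkpoint, or empty when no mapping exists and no checkpoint should be emitted. */
    public static OptionalLong translateForCheckpoint(long upstreamCommittedOffset,
                                                      Optional<OffsetMapping> latestSync) {
        if (latestSync.isEmpty()) {
            // No offset-sync record for this partition (e.g. the source topic was empty):
            // skip the checkpoint rather than copying the upstream offset verbatim.
            return OptionalLong.empty();
        }
        OffsetMapping sync = latestSync.get();
        // Simplified translation for illustration only.
        long delta = Math.max(0, upstreamCommittedOffset - sync.upstreamOffset);
        return OptionalLong.of(sync.downstreamOffset + delta);
    }

    private CheckpointTranslationSketch() {
    }
}
{code}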
[jira] [Updated] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connect worker and to the Connector config
[ https://issues.apache.org/jira/browse/KAFKA-13311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-13311: - Description: Currently, the configuration propagation logic in MM2 only allows a handful of configurations to be applied to all Connector configs managed by MM2. In some cases (e.g. custom topic or group filters, metric reporters, etc.) it would be useful to be able to define configurations "globally", without prefixing the config with each replication. E.g. the "connectors." prefix could be used to declare global Connector configs. Similarly, only a handful of Connect worker configurations can be configured on the "global" level - there should be a prefix for global worker configs as well (e.g. "workers.") was: Currently, the configuration propagation logic in MM2 only allows a handful of configurations to be applied to all Connector configs managed by MM2. In some cases (e.g. custom topic or group filters, metric reporters, etc.) it would be useful to be able to define configurations "globally", without prefixing the config with each replication. E.g. the "connectors." prefix could be used to declare global Connector configs. > MM2 should allow propagating arbitrary global configurations to the Connect > worker and to the Connector config > -- > > Key: KAFKA-13311 > URL: https://issues.apache.org/jira/browse/KAFKA-13311 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.8.1 >Reporter: Daniel Urban >Priority: Major > > Currently, the configuration propagation logic in MM2 only allows a handful > of configurations to be applied to all Connector configs managed by MM2. > In some cases (e.g. custom topic or group filters, metric reporters, etc.) it > would be useful to be able to define configurations "globally", without > prefixing the config with each replication. > E.g. the "connectors." prefix could be used to declare global Connector > configs. > Similarly, only a handful of Connect worker configurations can be configured > on the "global" level - there should be a prefix for global worker configs as > well (e.g. "workers.") -- This message was sent by Atlassian Jira (v8.3.4#803005)
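To make the proposal concrete, here is a sketch of what such a configuration could look like. The "connectors." and "workers." prefixes are the ones proposed in the description and are not currently supported; the individual keys and class names are only examples.
{code:java}
clusters = primary, backup
primary->backup.enabled = true

# Proposed: applied to every Connector config managed by MM2
connectors.topic.filter.class = com.example.CustomTopicFilter
connectors.metric.reporters = com.example.CustomMetricsReporter

# Proposed: applied to every Connect worker spun up by MM2
workers.offset.flush.timeout.ms = 10000
{code}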
[jira] [Updated] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connect worker and to the Connector config
[ https://issues.apache.org/jira/browse/KAFKA-13311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-13311: - Summary: MM2 should allow propagating arbitrary global configurations to the Connect worker and to the Connector config (was: MM2 should allow propagating arbitrary global configurations to the Connector config) > MM2 should allow propagating arbitrary global configurations to the Connect > worker and to the Connector config > -- > > Key: KAFKA-13311 > URL: https://issues.apache.org/jira/browse/KAFKA-13311 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.8.1 >Reporter: Daniel Urban >Priority: Major > > Currently, the configuration propagation logic in MM2 only allows a handful > of configurations to be applied to all Connector configs managed by MM2. > In some cases (e.g. custom topic or group filters, metric reporters, etc.) it > would be useful to be able to define configurations "globally", without > prefixing the config with each replication. > E.g. the "connectors." prefix could be used to declare global Connector > configs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connector config
Daniel Urban created KAFKA-13311: Summary: MM2 should allow propagating arbitrary global configurations to the Connector config Key: KAFKA-13311 URL: https://issues.apache.org/jira/browse/KAFKA-13311 Project: Kafka Issue Type: Improvement Components: mirrormaker Affects Versions: 2.8.1 Reporter: Daniel Urban Assignee: Daniel Urban Currently, the configuration propagation logic in MM2 only allows a handful of configurations to be applied to all Connector configs managed by MM2. In some cases (e.g. custom topic or group filters, metric reporters, etc.) it would be useful to be able to define configurations "globally", without prefixing the config with each replication. E.g. the "connectors." prefix could be used to declare global Connector configs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connector config
[ https://issues.apache.org/jira/browse/KAFKA-13311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-13311: Assignee: (was: Daniel Urban) > MM2 should allow propagating arbitrary global configurations to the Connector > config > > > Key: KAFKA-13311 > URL: https://issues.apache.org/jira/browse/KAFKA-13311 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.8.1 >Reporter: Daniel Urban >Priority: Major > > Currently, the configuration propagation logic in MM2 only allows a handful > of configurations to be applied to all Connector configs managed by MM2. > In some cases (e.g. custom topic or group filters, metric reporters, etc.) it > would be useful to be able to define configurations "globally", without > prefixing the config with each replication. > E.g. the "connectors." prefix could be used to declare global Connector > configs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters
[ https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-13253. -- Resolution: Duplicate Same issue as KAFKA-9747 - that one already has a fix under review > Kafka Connect losing task (re)configuration when connector name has special > characters > -- > > Key: KAFKA-13253 > URL: https://issues.apache.org/jira/browse/KAFKA-13253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.7.1 >Reporter: David Dufour >Priority: Major > > When not leader, DistributedHerder.reconfigureConnector() forwards the task > configuration to the leader as follow: > {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + > connName + "/tasks"); > log.trace("Forwarding task configurations for connector {} to leader", > connName); > RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, > sessionKey, requestSignatureAlgorithm); > {quote} > The problem is that if the connector name contains some special characters, > such as '<', '>'... they need to be 'URLEncoded' appropriately, otherwise an > uncatched exception is raised in RestClient and the forward is lost. > Here is the kind of exception we can catch by adding the necessary code in > RestClient: > {quote}java.lang.IllegalArgumentException: Illegal character in path at index > 51: > [http://10.224.0.15:8083/connectors/mirror1-cluster-]>mirror2-cluster.MirrorHeartbeatConnector/tasks > {quote} > An additional catch() should be added in RestClient.httpRequest(), here: > {quote}{{catch (IOException | InterruptedException | TimeoutException | > ExecutionException e) {}} > log.error("IO error forwarding REST request: ", e); > {{throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, > "IO Error trying to forward REST request: " + e.getMessage(), e);}} > {{} finally {}} > {quote} > to catch all other exceptions because without, this kind of problem is > completly silent. > To reproduce: > * start 2 kafka clusters > * start a kafka connect (distributed) with at least 2 nodes > * start an HeartbeatConnector with name "cluster1->cluster2" > If the node which generated the task is not the leader (not systematic), it > will forward the creation to the leader and it will be lost. As a result, the > connector will stay in RUNNING state but without any task. > Problem not easy to reproduce, it is important to start with empty connect > topics to reproduce more easily -- This message was sent by Atlassian Jira (v8.3.4#803005)
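A sketch of the kind of change described above, applied to the quoted DistributedHerder/RestClient snippet (connName, leaderUrl, rawTaskProps, cb and the other identifiers come from that snippet; java.net.URLEncoder and java.nio.charset.StandardCharsets are the only additions). This is only an illustration, not the actual fix under review for KAFKA-9747.
{code:java}
// Encode the connector name so characters such as '<' and '>' survive URL building.
String encodedName = URLEncoder.encode(connName, StandardCharsets.UTF_8);
String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + encodedName + "/tasks");
log.trace("Forwarding task configurations for connector {} to leader", connName);
try {
    RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
    cb.onCompletion(null, null);
} catch (Throwable t) {
    // Without a broad catch here, a failed forward is completely silent and the connector
    // stays RUNNING with no tasks.
    log.error("Failed to forward task configurations for connector {} to leader", connName, t);
    cb.onCompletion(new ConnectException("Failed to forward task configurations to leader", t), null);
}
{code}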
[jira] [Commented] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393655#comment-17393655 ] Daniel Urban commented on KAFKA-9747: - To add more details to the issue: * When running multiple workers * AND the Connector name contains a non-URL compatible character * AND a follower worker has the Connector in its assignment * Then the follower->leader request sent over Connect REST fails in RestClient (without any error logging, or the corresponding future ever completed) > No tasks created for a connector > > > Key: KAFKA-9747 > URL: https://issues.apache.org/jira/browse/KAFKA-9747 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 > Environment: OS: Ubuntu 18.04 LTS > Platform: Confluent Platform 5.4 > HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge >Reporter: Vit Koma >Assignee: Andras Katona >Priority: Major > Attachments: connect-distributed.properties, connect.log > > > We are running Kafka Connect in a distributed mode on 3 nodes using Debezium > (MongoDB) and Confluent S3 connectors. When adding a new connector via the > REST API the connector is created in RUNNING state, but no tasks are created > for the connector. > Pausing and resuming the connector does not help. When we stop all workers > and then start them again, the tasks are created and everything runs as it > should. > The issue does not show up if we run only a single node. > The issue is not caused by the connector plugins, because we see the same > behaviour for both Debezium and S3 connectors. Also in debug logs I can see > that Debezium is correctly returning a task configuration from the > Connector.taskConfigs() method. > Connector configuration examples > Debezium: > {code} > { > "name": "qa-mongodb-comp-converter-task|1", > "config": { > "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", > "mongodb.hosts": > "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", > "mongodb.name": "qa-debezium-comp", > "mongodb.ssl.enabled": true, > "collection.whitelist": "converter[.]task", > "tombstones.on.delete": true > } > } > {code} > S3 Connector: > {code} > { > "name": "qa-s3-sink-task|1", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "topics": "qa-debezium-comp.converter.task", > "topics.dir": "data/env/qa", > "s3.region": "eu-west-1", > "s3.bucket.name": "", > "flush.size": "15000", > "rotate.interval.ms": "360", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": > "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "partitioner.class": > "io.confluent.connect.storage.partitioner.DefaultPartitioner", > "schema.compatibility": "NONE", > "key.converter": "org.apache.kafka.connect.json.JsonConverter", > "value.converter": "org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable": false, > "value.converter.schemas.enable": false, > "transforms": "ExtractDocument", > > "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" > } > } > {code} > The connectors are created using curl: {{curl -X POST -H "Content-Type: > application/json" --data @ http:/:10083/connectors}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration
[ https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393104#comment-17393104 ] Daniel Urban commented on KAFKA-10719: -- I believe that the core issue here is that the connectors will start up with the old config first, and then the new config gets applied by MM2. If the old listener is not available, that will cause timeouts in the rebalance. Connectors have a default timeout which exceeds the default rebalance timeout. When the rebalance timeout happens, none of the MM2 nodes will be able to update the config after the startup. Besides deleting the config topic, you can also try to increase the rebalance timeout of the Connect workers running inside MM2 (to something around 3 minutes), which will allow the cluster to not hit a rebalance timeout, and then successfully apply the new configs. > MirrorMaker2 fails to update its runtime configuration > -- > > Key: KAFKA-10719 > URL: https://issues.apache.org/jira/browse/KAFKA-10719 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Peter Sinoros-Szabo >Priority: Major > > I was running successfully the MM2 cluster with the following configuration, > I simplified it a little: {code:java} clusters = main, backup > main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 > backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 > main->backup.enabled = true main->backup.topics = .*{code} I wanted to change > the bootstrap.address list of the destination cluster to a different list > that is pointing to the *same* cluster, just a different listener with a > different routing. So I changed it to: {code:java} backup.bootstrap.servers = > backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart > on the MM2 nodes and say that some tasks were still using the old bootstrap > addresses, some of them were using the new one. I don't have the logs, so > unfortunately I don't know which one picked up the good values and which > didn't. I even stopped the cluster completely, but it didn't help. Ryanne > adviced to delete the mm2-config and mm2-status topics, so I did delete those > on the destination cluster, that solved this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
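A sketch of the workaround mentioned above. rebalance.timeout.ms is a standard Connect distributed worker setting; whether it can be set globally in the MM2 properties or needs a cluster prefix may depend on the MM2 version, so treat the placement below as illustrative.
{code:java}
# Give connector/task startup (which may block on the unreachable old listener)
# roughly 3 minutes before the Connect group gives up on the rebalance.
rebalance.timeout.ms = 180000

# or scoped to the workers targeting the "backup" cluster:
backup.rebalance.timeout.ms = 180000
{code}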
[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.
[ https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393101#comment-17393101 ] Daniel Urban commented on KAFKA-9981: - This KIP aims to fix the issue by adding the REST API to MM2, and also improving the config provider reference handling in the MM2 configs: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters] > Running a dedicated mm2 cluster with more than one nodes,When the > configuration is updated the task is not aware and will lose the update > operation. > > > Key: KAFKA-9981 > URL: https://issues.apache.org/jira/browse/KAFKA-9981 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: victor >Priority: Major > > DistributedHerder.reconfigureConnector induction config update as follows: > {code:java} > if (changed) { > List> rawTaskProps = reverseTransform(connName, > configState, taskProps); > if (isLeader()) { > configBackingStore.putTaskConfigs(connName, rawTaskProps); > cb.onCompletion(null, null); > } else { > // We cannot forward the request on the same thread because this > reconfiguration can happen as a result of connector > // addition or removal. If we blocked waiting for the response from > leader, we may be kicked out of the worker group. > forwardRequestExecutor.submit(new Runnable() { > @Override > public void run() { > try { > String leaderUrl = leaderUrl(); > if (leaderUrl == null || leaderUrl.trim().isEmpty()) { > cb.onCompletion(new ConnectException("Request to > leader to " + > "reconfigure connector tasks failed " + > "because the URL of the leader's REST > interface is empty!"), null); > return; > } > String reconfigUrl = RestServer.urlJoin(leaderUrl, > "/connectors/" + connName + "/tasks"); > log.trace("Forwarding task configurations for connector > {} to leader", connName); > RestClient.httpRequest(reconfigUrl, "POST", null, > rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); > cb.onCompletion(null, null); > } catch (ConnectException e) { > log.error("Request to leader to reconfigure connector > tasks failed", e); > cb.onCompletion(e, null); > } > } > }); > } > } > {code} > KafkaConfigBackingStore task checks for configuration updates,such as topic > whitelist update.If KafkaConfigBackingStore task is not running on leader > node,an HTTP request will be send to notify the leader of the configuration > update.However,dedicated mm2 cluster does not have the HTTP server turned > on,so the request will fail to be sent,causing the update operation to be > lost. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9805) Running MirrorMaker in a Connect cluster,but the task not running
[ https://issues.apache.org/jira/browse/KAFKA-9805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393080#comment-17393080 ] Daniel Urban commented on KAFKA-9805: - I believe this is the same issue as KAFKA-9747, see the answer from [~akatona] about the issue. > Running MirrorMaker in a Connect cluster,but the task not running > - > > Key: KAFKA-9805 > URL: https://issues.apache.org/jira/browse/KAFKA-9805 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.1 > Environment: linux >Reporter: ZHAO GH >Priority: Major > Original Estimate: 5h > Remaining Estimate: 5h > > when i am Running MirrorMaker in a Connect clusterwhen i am Running > MirrorMaker in a Connect cluster,sometime the task running,but sometime the > task cannot assignment。 > I post connector config to connect cluster,here is my config > http://99.12.98.33:8083/connectorshttp://99.12.98.33:8083/connectors > { "name": "kafka->kafka241-3", > "config": { > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > "topics": "MM2-3", > "key.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter", > "tasks.max": 8, > "sasl.mechanism": "PLAIN", > "security.protocol": "SASL_PLAINTEXT", > "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"admin\" password=\"admin\";", > "source.cluster.alias": "kafka", "source.cluster.bootstrap.servers": > "55.13.104.70:9092,55.13.104.74:9092,55.13.104.126:9092", > "source.admin.sasl.mechanism": "PLAIN", > "source.admin.security.protocol": "SASL_PLAINTEXT", > "source.admin.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "target.cluster.alias": "kafka241", > "target.cluster.bootstrap.servers": > "55.14.111.22:9092,55.14.111.23:9092,55.14.111.25:9092", > "target.admin.sasl.mechanism": "PLAIN", > "target.admin.security.protocol": "SASL_PLAINTEXT", > "target.admin.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "producer.sasl.mechanism": "PLAIN", > "producer.security.protocol": "SASL_PLAINTEXT", > "producer.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "consumer.sasl.mechanism": "PLAIN", > "consumer.security.protocol": "SASL_PLAINTEXT", > "consumer.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "consumer.group.id": "mm2-1" > } > } > > but I get the connector status,found not tasks running > http://99.12.98.33:8083/connectors/kafka->kafka241-3/status > { > "name": "kafka->kafka241-3", > "connector": { > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > "tasks": [], > "type": "source" > } > > but sometime,the task run success > http://99.12.98.33:8083/connectors/kafka->kafka241-1/status > { > "name": "kafka->kafka241-1", > "connector": { > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > { > "id": 1, > "state": "RUNNING", > "worker_id": "99.12.98.33:8083" > }, > { > "id": 2, > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > } > ], > "type": "source" > } > is somebody met this problem? how to fix it,is it a bug? 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12966) MM2 doesn't trigger replicate data on new topics
[ https://issues.apache.org/jira/browse/KAFKA-12966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364996#comment-17364996 ] Daniel Urban commented on KAFKA-12966: -- I believe you are hitting the issue described in [KIP-710|https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters]: MM2 doesn't include a Connect REST server, which makes follower->leader communication impossible. > MM2 doesn't trigger replicate data on new topics > > > Key: KAFKA-12966 > URL: https://issues.apache.org/jira/browse/KAFKA-12966 > Project: Kafka > Issue Type: Bug >Reporter: Tommy >Priority: Major > > After starting MM2, all topics were replicated, but when I create new topics, > MM2 doesn't seem to trigger replication of data to these topics, although it > still creates the replica topic on the target cluster. I have to restart all MM2 > instances. This is not expected in an active cluster, where new > topics are created every day. Is it a bug or a feature? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12893) MM2 fails to replicate if starting two+ nodes same time
[ https://issues.apache.org/jira/browse/KAFKA-12893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357372#comment-17357372 ] Daniel Urban commented on KAFKA-12893: -- You might be hitting the issue of MM2 not running the Connect REST Server, thus not supporting follower->leader communication in the cluster. See KIP-710 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)] for details. The way to identify the situation is to check whether the MirrorSourceConnector is hosted on the leader or the follower node. If the leader is hosting it, replication should be fine, if the follower, you probably never get to a point where topics are actually picked up. > MM2 fails to replicate if starting two+ nodes same time > --- > > Key: KAFKA-12893 > URL: https://issues.apache.org/jira/browse/KAFKA-12893 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.8.0 >Reporter: Tommi Vainikainen >Priority: Major > > I've observed a situation where starting more than one MM2 node in parallel, > MM2 fails to start replication ie. replication flow seems to be stuck without > action. I used exactly same mm2.properties file to start only one at a time, > and the replication flow was proceeding smoothly. > In my setup dc1 has topic "mytopic1" and there is a producer with approx 1 > msg/sec, and I'm trying to repilcate this to dc2. What I observed is that > dc1.mytopic1 is created when initially launching two paraller MM2 instances, > but no messages gets written into the topic as I would expect. If I kill MM2 > instances, and only start one MM2 node, then MM2 starts replicating the > messages in mytopic1. > My mm2.properties: > clusters=dc2, dc1 > dc1->dc2.emit.heartbeats.enabled=true > dc1->dc2.enabled=true > dc1->dc2.sync.group.offsets.enabled=false > dc1->dc2.sync.group.offsets.interval.seconds=45 > dc1->dc2.topics=mytopic1 > dc1->dc2.topics.exclude= > dc1.bootstrap.servers=tvainika-dc1-dev-sandbox.aivencloud.com:12693 > dc1.security.protocol=SSL > dc1.ssl.keystore.type=PKCS12 > dc1.ssl.keystore.location=dc1/client.keystore.p12 > dc1.ssl.keystore.password=secret > dc1.ssl.key.password=secret > dc1.ssl.truststore.location=dc1/client.truststore.jks > dc1.ssl.truststore.password=secret > dc2.bootstrap.servers=tvainika-dc2-dev-sandbox.aivencloud.com:12693 > dc2.security.protocol=SSL > dc2.ssl.keystore.type=PKCS12 > dc2.ssl.keystore.location=dc2/client.keystore.p12 > dc2.ssl.keystore.password=secret > dc2.ssl.key.password=secret > dc2.ssl.truststore.location=dc2/client.truststore.jks > dc2.ssl.truststore.password=secret > tasks.max=3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode
[ https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-12664. -- Resolution: Not A Bug > Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 > clusters in standalone mode > - > > Key: KAFKA-12664 > URL: https://issues.apache.org/jira/browse/KAFKA-12664 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0 >Reporter: Edward Vaisman >Assignee: Daniel Urban >Priority: Major > Attachments: connect.log.tar.gz, docker-compose-multi.yml, > mm2.properties > > > Hi Folks, I came across this issue when trying to aggregate data from two > separate data centres into one data centre. > In the configuration below, you can see I am trying to replicate a topic from > dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 > (test_topic_dc2) to dc3. > However, when I try to replicate both topics from those datacenters at the > same time I notice that connect gets stuck in a rebalance loop (see > attachment for logs) > [^connect.log.tar.gz] > excerpt of connect.log > {code:java} > 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] > Successfully synced group in generation Generation{generationId=347, > memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > protocol='sessioned'} > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13 > 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group > at generation 347 with protocol version 2 and got assignment: > Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688) > {code} > To replicate the issue here is what I used: > [^mm2.properties] > {code:java} > clusters = dc1, dc2, dc3 > dc1.bootstrap.servers = kafka-dc1:19092 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc1.group.id=mm2-dc1 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc1->dc3.enabled = true > dc1->dc3.topics = test_topic_dc1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > dc3->dc2 = falsedc3->dc1 = false > {code} > This [^docker-compose-multi.yml] file to create local kafka clusters > (dc1,dc2,dc3) > (I set docker to use 6 cpus, 8gb mem, swap 2gb) > I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours) > {code:java} > docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash > # Upload mm2 properties on server > /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code} > Kafkacat commands to produce to dc1, dc2 > {code:java} > kafkacat -b localhost:9092 -t test_topic_dc1 -P > Hello World from DC1!{code} > {code:java} > kafkacat -b localhost:9093 -t test_topic_dc2 -P > Hello World from DC2{code} > I then tried to remove one of the datacenters to confirm if it was a > configuration problem, however mirror maker ran successfully with the below > configuration > mm2.properties > {code:java} > clusters = dc2, dc3 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > {code
[jira] [Assigned] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode
[ https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-12664: Assignee: Daniel Urban > Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 > clusters in standalone mode > - > > Key: KAFKA-12664 > URL: https://issues.apache.org/jira/browse/KAFKA-12664 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0 >Reporter: Edward Vaisman >Assignee: Daniel Urban >Priority: Major > Attachments: connect.log.tar.gz, docker-compose-multi.yml, > mm2.properties > > > Hi Folks, I came across this issue when trying to aggregate data from two > separate data centres into one data centre. > In the configuration below, you can see I am trying to replicate a topic from > dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 > (test_topic_dc2) to dc3. > However, when I try to replicate both topics from those datacenters at the > same time I notice that connect gets stuck in a rebalance loop (see > attachment for logs) > [^connect.log.tar.gz] > excerpt of connect.log > {code:java} > 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] > Successfully synced group in generation Generation{generationId=347, > memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > protocol='sessioned'} > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13 > 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group > at generation 347 with protocol version 2 and got assignment: > Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688) > {code} > To replicate the issue here is what I used: > [^mm2.properties] > {code:java} > clusters = dc1, dc2, dc3 > dc1.bootstrap.servers = kafka-dc1:19092 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc1.group.id=mm2-dc1 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc1->dc3.enabled = true > dc1->dc3.topics = test_topic_dc1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > dc3->dc2 = falsedc3->dc1 = false > {code} > This [^docker-compose-multi.yml] file to create local kafka clusters > (dc1,dc2,dc3) > (I set docker to use 6 cpus, 8gb mem, swap 2gb) > I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours) > {code:java} > docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash > # Upload mm2 properties on server > /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code} > Kafkacat commands to produce to dc1, dc2 > {code:java} > kafkacat -b localhost:9092 -t test_topic_dc1 -P > Hello World from DC1!{code} > {code:java} > kafkacat -b localhost:9093 -t test_topic_dc2 -P > Hello World from DC2{code} > I then tried to remove one of the datacenters to confirm if it was a > configuration problem, however mirror maker ran successfully with the below > configuration > mm2.properties > {code:java} > clusters = dc2, dc3 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 >
[jira] [Commented] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode
[ https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321057#comment-17321057 ] Daniel Urban commented on KAFKA-12664: -- Issue is that in your setup, there are 2 connect groups coordinating through dc3, and they need separate group ids. When you specify *dc3.*, it affects all groups coordinating through dc3. I'd say it might be a documentation issue. > Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 > clusters in standalone mode > - > > Key: KAFKA-12664 > URL: https://issues.apache.org/jira/browse/KAFKA-12664 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0 >Reporter: Edward Vaisman >Priority: Major > Attachments: connect.log.tar.gz, docker-compose-multi.yml, > mm2.properties > > > Hi Folks, I came across this issue when trying to aggregate data from two > separate data centres into one data centre. > In the configuration below, you can see I am trying to replicate a topic from > dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 > (test_topic_dc2) to dc3. > However, when I try to replicate both topics from those datacenters at the > same time I notice that connect gets stuck in a rebalance loop (see > attachment for logs) > [^connect.log.tar.gz] > excerpt of connect.log > {code:java} > 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] > Successfully synced group in generation Generation{generationId=347, > memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > protocol='sessioned'} > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13 > 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group > at generation 347 with protocol version 2 and got assignment: > Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688) > {code} > To replicate the issue here is what I used: > [^mm2.properties] > {code:java} > clusters = dc1, dc2, dc3 > dc1.bootstrap.servers = kafka-dc1:19092 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc1.group.id=mm2-dc1 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc1->dc3.enabled = true > dc1->dc3.topics = test_topic_dc1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > dc3->dc2 = 
falsedc3->dc1 = false > {code} > This [^docker-compose-multi.yml] file to create local kafka clusters > (dc1,dc2,dc3) > (I set docker to use 6 cpus, 8gb mem, swap 2gb) > I then ran an interactive shell to run mirror maker within the same > docker-compose network (change network to match yours) > {code:java} > docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash > # Upload mm2 properties on server > /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code} > Kafkacat commands to produce to dc1, dc2 > {code:java} > kafkacat -b localhost:9092 -t test_topic_dc1 -P > Hello World from DC1!{code} > {code:java} > kafkacat -b localhost:9093 -t test_topic_dc2 -P > Hello World from DC2{code} > I then tried to remove one of the datacenters to confirm if it was a > configuration problem, however mirror maker ran successfully with the below > configuration > mm2.properties > {code:java} > clusters = dc2, dc3 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.fact
[jira] [Commented] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode
[ https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320917#comment-17320917 ] Daniel Urban commented on KAFKA-12664: -- In short, just remove those group.id configs, and let the workers join the group generated by MM2, should work. > Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 > clusters in standalone mode > - > > Key: KAFKA-12664 > URL: https://issues.apache.org/jira/browse/KAFKA-12664 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0 >Reporter: Edward Vaisman >Priority: Major > Attachments: connect.log.tar.gz, docker-compose-multi.yml, > mm2.properties > > > Hi Folks, I came across this issue when trying to aggregate data from two > separate data centres into one data centre. > In the configuration below, you can see I am trying to replicate a topic from > dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 > (test_topic_dc2) to dc3. > However, when I try to replicate both topics from those datacenters at the > same time I notice that connect gets stuck in a rebalance loop (see > attachment for logs) > [^connect.log.tar.gz] > excerpt of connect.log > {code:java} > 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] > Successfully synced group in generation Generation{generationId=347, > memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > protocol='sessioned'} > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13 > 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group > at generation 347 with protocol version 2 and got assignment: > Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688) > {code} > To replicate the issue here is what I used: > [^mm2.properties] > {code:java} > clusters = dc1, dc2, dc3 > dc1.bootstrap.servers = kafka-dc1:19092 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc1.group.id=mm2-dc1 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc1->dc3.enabled = true > dc1->dc3.topics = test_topic_dc1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > dc3->dc2 = falsedc3->dc1 = false > {code} > This [^docker-compose-multi.yml] file to create local kafka clusters > (dc1,dc2,dc3) > 
(I set docker to use 6 cpus, 8gb mem, swap 2gb) > I then ran an interactive shell to run mirror maker within the same > docker-compose network (change network to match yours) > {code:java} > docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash > # Upload mm2 properties on server > /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code} > Kafkacat commands to produce to dc1, dc2 > {code:java} > kafkacat -b localhost:9092 -t test_topic_dc1 -P > Hello World from DC1!{code} > {code:java} > kafkacat -b localhost:9093 -t test_topic_dc2 -P > Hello World from DC2{code} > I then tried to remove one of the datacenters to confirm if it was a > configuration problem, however mirror maker ran successfully with the below > configuration > mm2.properties > {code:java} > clusters = dc2, dc3 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1
[jira] [Commented] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode
[ https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320763#comment-17320763 ] Daniel Urban commented on KAFKA-12664: -- MM2 spins up a Connect worker *per replication flow*. (The number of Connect workers are affected by the target clusters you provided for MM2 - if you did not do it explicitly, all clusters are targeted). This setting: dc3.group.id=mm2-dc3 Causes the Connect worker responsible for the dc1->dc3, and the worker responsible for dc2->dc3 join the same connect group. This causes a collision, likely to lead to those rebalances. > Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 > clusters in standalone mode > - > > Key: KAFKA-12664 > URL: https://issues.apache.org/jira/browse/KAFKA-12664 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0 >Reporter: Edward Vaisman >Priority: Major > Attachments: connect.log.tar.gz, docker-compose-multi.yml, > mm2.properties > > > Hi Folks, I came across this issue when trying to aggregate data from two > separate data centres into one data centre. > In the configuration below, you can see I am trying to replicate a topic from > dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 > (test_topic_dc2) to dc3. > However, when I try to replicate both topics from those datacenters at the > same time I notice that connect gets stuck in a rebalance loop (see > attachment for logs) > [^connect.log.tar.gz] > excerpt of connect.log > {code:java} > 2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] > Successfully synced group in generation Generation{generationId=347, > memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > protocol='sessioned'} > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)[2021-04-13 > 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)[2021-04-13 > 17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)[2021-04-13 > 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group > at generation 347 with protocol version 2 and got assignment: > Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688) > {code} > To replicate the issue here is what I used: > [^mm2.properties] > {code:java} > clusters = dc1, dc2, dc3 > dc1.bootstrap.servers = kafka-dc1:19092 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc1.group.id=mm2-dc1 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > 
status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc1->dc3.enabled = true > dc1->dc3.topics = test_topic_dc1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > dc3->dc2 = false > dc3->dc1 = false > {code} > This [^docker-compose-multi.yml] file to create local kafka clusters > (dc1,dc2,dc3) > (I set docker to use 6 cpus, 8gb mem, swap 2gb) > I then ran an interactive shell to run mirror maker within the same > docker-compose network (change network to match yours) > {code:java} > docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash > # Upload mm2 properties on server > /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code} > Kafkacat commands to produce to dc1, dc2 > {code:java} > kafkacat -b localhost:9092 -t test_topic_dc1 -P > Hello World from DC1!{code} > {code:java} > kafkacat -b localhost:9093 -t test_topic_dc2 -P > Hello World from DC2{code} > I then tried to remove one of the datacenters to confirm if it was a > configuration problem, however mirror maker ran successfully with the below > configuration > mm2.properties > {code:java} > clusters = dc2, dc3 > dc2.bootstrap.servers = kafka
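A minimal mm2.properties sketch of the workaround implied by the comment above, assuming the only requirement is that no two Connect workers targeting the same cluster share a group.id, and that MM2's default group name derivation keeps the dc1->dc3 and dc2->dc3 flows in separate groups. Only the relevant parts of the reporter's config are shown; the rest stays as in the attachment.
{code:java}
# Workaround sketch (assumption: distinct Connect groups per flow stop the rebalance loop)
clusters = dc1, dc2, dc3
dc1.bootstrap.servers = kafka-dc1:19092
dc2.bootstrap.servers = kafka-dc2:19093
dc3.bootstrap.servers = kafka-dc3:19094

# No dcX.group.id overrides: with dc3.group.id=mm2-dc3 both the dc1->dc3 and the
# dc2->dc3 workers join the same Connect group on dc3, which causes the collision.
# Without the override, MM2 derives a default group name per flow
# (assumption: the defaults for dc1->dc3 and dc2->dc3 differ, avoiding the collision).

dc1->dc3.enabled = true
dc1->dc3.topics = test_topic_dc1
dc2->dc3.enabled = true
dc2->dc3.topics = test_topic_dc2
{code}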
[jira] [Updated] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
[ https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-10586: - Description: KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that with specific workloads, the dedicated MM2 cluster can become unable to react to dynamic topic and group filter changes. (This occurs when after a rebalance operation, the leader node has no MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is stopped on the leader, meaning it cannot detect config changes by itself. Followers still running the connector can detect config changes, but they cannot query the leader for config updates.) Besides the REST support, config provider references should be evaluated lazily in connector configurations. was: KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that with specific workloads, the dedicated MM2 cluster can become unable to react to dynamic topic and group filter changes. (This occurs when after a rebalance operation, the leader node has no MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is stopped on the leader, meaning it cannot detect config changes by itself. Followers still running the connector can detect config changes, but they cannot query the leader for config updates.) > Full support for distributed mode in dedicated MirrorMaker 2.0 clusters > --- > > Key: KAFKA-10586 > URL: https://issues.apache.org/jira/browse/KAFKA-10586 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated > MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means > that with specific workloads, the dedicated MM2 cluster can become unable to > react to dynamic topic and group filter changes. > (This occurs when after a rebalance operation, the leader node has no > MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is > stopped on the leader, meaning it cannot detect config changes by itself. > Followers still running the connector can detect config changes, but they > cannot query the leader for config updates.) > Besides the REST support, config provider references should be evaluated > lazily in connector configurations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
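To make the last point of the description concrete, this is the kind of config provider reference it refers to. A hedged, illustrative sketch only: the provider alias, file path, and key below are made up, and whether the reference is resolved eagerly or lazily is exactly what the ticket wants to change.
{code:java}
# Illustrative mm2.properties fragment (hypothetical paths/keys)
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# A connector-level value referencing the provider. With eager evaluation the
# placeholder is resolved once when the config is first parsed; with lazy evaluation
# the literal ${file:...} reference is kept and only resolved where it is used.
dc1->dc2.topics=${file:/etc/mm2/filters.properties:topics}
{code}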
[jira] [Updated] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
[ https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-10586: - Summary: Full support for distributed mode in dedicated MirrorMaker 2.0 clusters (was: MirrorMaker 2.0 REST support) > Full support for distributed mode in dedicated MirrorMaker 2.0 clusters > --- > > Key: KAFKA-10586 > URL: https://issues.apache.org/jira/browse/KAFKA-10586 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated > MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means > that with specific workloads, the dedicated MM2 cluster can become unable to > react to dynamic topic and group filter changes. > (This occurs when after a rebalance operation, the leader node has no > MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is > stopped on the leader, meaning it cannot detect config changes by itself. > Followers still running the connector can detect config changes, but they > cannot query the leader for config updates.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10586) MirrorMaker 2.0 REST support
Daniel Urban created KAFKA-10586: Summary: MirrorMaker 2.0 REST support Key: KAFKA-10586 URL: https://issues.apache.org/jira/browse/KAFKA-10586 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban Assignee: Daniel Urban KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that with specific workloads, the dedicated MM2 cluster can become unable to react to dynamic topic and group filter changes. (This occurs when after a rebalance operation, the leader node has no MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is stopped on the leader, meaning it cannot detect config changes by itself. Followers still running the connector can detect config changes, but they cannot query the leader for config updates.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
[ https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-10414. -- Resolution: Not A Problem api-util is only a test dependency, not an issue. > Upgrade api-util dependency - CVE-2018-1337 > --- > > Key: KAFKA-10414 > URL: https://issues.apache.org/jira/browse/KAFKA-10414 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > There is a dependency on org.apache.directory.api:api-util:1.0.0, which is > involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= > This is a transitive dependency through the apacheds libs. > -Can be fixed by upgrading to at least version 2.0.0.AM25- > Since api-all is also a dependency, and there is a class collision between > api-all and newer version of api-util, it is better to just upgrade api-util > to 1.0.2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
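A quick way to check the scope of the dependency discussed here, sketched under the assumption that the apacheds libraries (and api-util with them) are only pulled into the core module's test classpath, as the resolution above suggests:
{code:java}
# Print the resolved dependency tree for the test runtime classpath and look for api-util.
# If it only shows up under test configurations, it is not part of the shipped artifacts.
./gradlew core:dependencies --configuration testRuntimeClasspath | grep -B2 -A2 api-util
{code}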
[jira] [Updated] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
[ https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-10414: - Description: There is a dependency on org.apache.directory.api:api-util:1.0.0, which is involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= This is a transitive dependency through the apacheds libs. -Can be fixed by upgrading to at least version 2.0.0.AM25- Since api-all is also a dependency, and there is a class collision between api-all and newer version of api-util, it is better to just upgrade api-util to 1.0.2 was: There is a dependency on org.apache.directory.api:api-util:1.0.0, which is involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= This is a transitive dependency through the apacheds libs. Can be fixed by upgrading to at least version 2.0.0.AM25 > Upgrade api-util dependency - CVE-2018-1337 > --- > > Key: KAFKA-10414 > URL: https://issues.apache.org/jira/browse/KAFKA-10414 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > There is a dependency on org.apache.directory.api:api-util:1.0.0, which is > involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= > This is a transitive dependency through the apacheds libs. > -Can be fixed by upgrading to at least version 2.0.0.AM25- > Since api-all is also a dependency, and there is a class collision between > api-all and newer version of api-util, it is better to just upgrade api-util > to 1.0.2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
[ https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-10414: Assignee: Daniel Urban > Upgrade api-util dependency - CVE-2018-1337 > --- > > Key: KAFKA-10414 > URL: https://issues.apache.org/jira/browse/KAFKA-10414 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > There is a dependency on org.apache.directory.api:api-util:1.0.0, which is > involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= > This is a transitive dependency through the apacheds libs. Can be fixed by > upgrading to at least version 2.0.0.AM25 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
Daniel Urban created KAFKA-10414: Summary: Upgrade api-util dependency - CVE-2018-1337 Key: KAFKA-10414 URL: https://issues.apache.org/jira/browse/KAFKA-10414 Project: Kafka Issue Type: Bug Reporter: Daniel Urban There is a dependency on org.apache.directory.api:api-util:1.0.0, which is involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= This is a transitive dependency through the apacheds libs. Can be fixed by upgrading to at least version 2.0.0.AM25 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-5235) GetOffsetShell: retrieve offsets for multiple topics and partitions
[ https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-5235: Summary: GetOffsetShell: retrieve offsets for multiple topics and partitions (was: GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker) > GetOffsetShell: retrieve offsets for multiple topics and partitions > --- > > Key: KAFKA-5235 > URL: https://issues.apache.org/jira/browse/KAFKA-5235 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Arseniy Tashoyan >Assignee: Daniel Urban >Priority: Major > Labels: kip, tool > Original Estimate: 168h > Remaining Estimate: 168h > > Currently, GetOffsetShell only allows fetching the offsets of a single topic > with an optional list of which partitions to describe. Besides that, it does > not allow consumer properties to be overridden. The tool does not have a > dedicated script under bin either. -- This message was sent by Atlassian Jira (v8.3.4#803005)
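For context on the limitations listed above, this is roughly how the tool is invoked before the improvement; a sketch with a made-up topic name, and note that since there is no dedicated script under bin, it goes through kafka-run-class.sh:
{code:java}
# One topic per invocation, optional partition list, no way to override consumer properties.
# --time -1 asks for the latest offsets, -2 for the earliest.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic my-topic \
  --partitions 0,1 \
  --time -1
{code}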
[jira] [Updated] (KAFKA-5235) GetOffsetShell: support for multiple topics and consumer configuration override
[ https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-5235: Summary: GetOffsetShell: support for multiple topics and consumer configuration override (was: GetOffsetShell: retrieve offsets for multiple topics and partitions) > GetOffsetShell: support for multiple topics and consumer configuration > override > --- > > Key: KAFKA-5235 > URL: https://issues.apache.org/jira/browse/KAFKA-5235 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Arseniy Tashoyan >Assignee: Daniel Urban >Priority: Major > Labels: kip, tool > Original Estimate: 168h > Remaining Estimate: 168h > > Currently, GetOffsetShell only allows fetching the offsets of a single topic > with an optional list of which partitions to describe. Besides that, it does > not allow consumer properties to be overridden. The tool does not have a > dedicated script under bin either. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker
[ https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-5235: Description: Currently, GetOffsetShell only allows fetching the offsets of a single topic with an optional list of which partitions to describe. Besides that, it does not allow consumer properties to be overridden. The tool does not have a dedicated script under bin either. (was: GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to retrieve metadata about topics and partitions. At present, GetOffsetShell does the following: - get metadata from Zookeeper - iterate over partitions - for each partition, connect to its leader broker and request offsets Instead, GetOffsetShell can use new KafkaConsumer and retrieve offsets by means of endOffsets(), beginningOffsets() and offsetsForTimes() methods. One request is sufficient for all topics and partitions. As far as GetOffsetShell is re-implemented with new KafkaConsumer API, it will not depend on obsolete API: SimpleConsumer, old producer API.) > GetOffsetShell: retrieve offsets for all given topics and partitions with > single request to the broker > -- > > Key: KAFKA-5235 > URL: https://issues.apache.org/jira/browse/KAFKA-5235 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Arseniy Tashoyan >Assignee: Daniel Urban >Priority: Major > Labels: kip, tool > Original Estimate: 168h > Remaining Estimate: 168h > > Currently, GetOffsetShell only allows fetching the offsets of a single topic > with an optional list of which partitions to describe. Besides that, it does > not allow consumer properties to be overridden. The tool does not have a > dedicated script under bin either. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker
[ https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-5235: --- Assignee: Daniel Urban > GetOffsetShell: retrieve offsets for all given topics and partitions with > single request to the broker > -- > > Key: KAFKA-5235 > URL: https://issues.apache.org/jira/browse/KAFKA-5235 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Arseniy Tashoyan >Assignee: Daniel Urban >Priority: Major > Labels: kip, tool > Original Estimate: 168h > Remaining Estimate: 168h > > GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to > retrieve metadata about topics and partitions. At present, GetOffsetShell > does the following: > - get metadata from Zookeeper > - iterate over partitions > - for each partition, connect to its leader broker and request offsets > Instead, GetOffsetShell can use new KafkaConsumer and retrieve offsets by > means of endOffsets(), beginningOffsets() and offsetsForTimes() methods. One > request is sufficient for all topics and partitions. > As far as GetOffsetShell is re-implemented with new KafkaConsumer API, it > will not depend on obsolete API: SimpleConsumer, old producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
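To illustrate the KafkaConsumer-based approach described in the original report above (a sketch only, not the tool's actual implementation): endOffsets() and beginningOffsets() take a whole collection of partitions, so offsets for any number of topics can be fetched without contacting each partition leader separately.
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetLookupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Offsets for multiple topics/partitions are retrieved with a single call.
            List<TopicPartition> partitions = List.of(
                    new TopicPartition("topic-a", 0),
                    new TopicPartition("topic-a", 1),
                    new TopicPartition("topic-b", 0));
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
            latest.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
        }
    }
}
{code}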