[GitHub] [kafka] tvainika commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection
tvainika commented on a change in pull request #10277: URL: https://github.com/apache/kafka/pull/10277#discussion_r687419372

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java

@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
String source = replicationPolicy.topicSource(topic);
if (source == null) {
return false;
-} else if (source.equals(sourceAndTarget.target())) {
+}
+
+// Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+final boolean condition;
+if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment: So clearly the `IdentityReplicationPolicy` cannot detect cycles, and this is documented: https://github.com/apache/kafka/blob/2edaf9ddd30a6417c34f5bce6894a5340632ff1e/docs/upgrade.html#L112 So, would it make sense to move this `isCycle()` method to the `ReplicationPolicy` interface (changing that interface, which is marked `evolving`) and have a separate `isCycle` implementation per replication policy, or what kind of approach do you see fitting best here?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
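The refactoring suggested in the review comment could look roughly like the following. This is a hedged sketch only: the interface is reduced to the one method relevant here, and the `isCycle` signature is hypothetical (the real Kafka `ReplicationPolicy` interface has more methods and `isCycle` currently lives in `MirrorSourceConnector`).

```java
// Sketch: move cycle detection into ReplicationPolicy so each policy
// implements the rule that makes sense for its naming scheme.
// Simplified/hypothetical interface, not the actual Kafka API.
import java.util.Objects;

interface ReplicationPolicy {
    /** The source cluster alias encoded in the topic name, or null if none. */
    String topicSource(String topic);

    /** Default rule: a topic is a cycle if its encoded source is the target cluster. */
    default boolean isCycle(String topic, String targetClusterAlias) {
        String source = topicSource(topic);
        return source != null && Objects.equals(source, targetClusterAlias);
    }
}

/** Encodes the source cluster as a "source.topic" prefix. */
class DefaultReplicationPolicy implements ReplicationPolicy {
    @Override
    public String topicSource(String topic) {
        int sep = topic.indexOf('.');
        return sep < 0 ? null : topic.substring(0, sep);
    }
}

/** Preserves topic names unchanged, so the source cluster is unrecoverable. */
class IdentityReplicationPolicy implements ReplicationPolicy {
    @Override
    public String topicSource(String topic) {
        return null;
    }

    /** Cannot detect cycles without source information, as documented. */
    @Override
    public boolean isCycle(String topic, String targetClusterAlias) {
        return false;
    }
}
```

With this shape, `MirrorSourceConnector` would simply delegate to `replicationPolicy.isCycle(...)` instead of special-casing the policy with `instanceof`.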
[GitHub] [kafka] aloknnikhil opened a new pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool
aloknnikhil opened a new pull request #11204: URL: https://github.com/apache/kafka/pull/11204 **Forward porting** - https://github.com/linkedin/kafka/pull/186 **TICKET** = [KAFKA-13188](https://issues.apache.org/jira/browse/KAFKA-13188) **LI_DESCRIPTION** = The current KafkaConsumer code includes support for allocating buffers from a MemoryPool when allocating bytes for requests to brokers. However, the code doesn't release them back to the pool, rendering the pooling moot. Currently this works because the consumer uses MemoryPool.NONE, which simply allocates a new buffer every time one is requested. However, this won't work if a different memory pool implementation is used. The consumer can't just release the buffer back into the pool, because fetch requests keep holding references to it in CompletedFetch objects, which live across multiple poll calls. The idea here is to use a ref-counting approach, where the ClientResponse increments the ref count every time a CompletedFetch object is created and decrements it when the fetch is drained after a poll call returns records. For the rest, such as metadata, offset commits, and list offsets, it is somewhat easier, as the client is done with the response bytes once the response future callback completes. This PR also adds the ClientResponseWithFinalize class for debugging purposes, protected by the linkedin.enable.client.resonse.leakcheck flag. The class uses a finalizer to check whether some issue in the code caused a buffer not to be released back to the pool. However, in an actual production setting finalizers are costly, and hence this is not enabled by default.
**EXIT_CRITERIA** = If and when the upstream ticket is merged, and the changes are pulled in ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
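The ref-counting approach described in this PR can be illustrated with a minimal sketch. The class and method names below are hypothetical, not the actual KafkaConsumer internals: the real change wires this into ClientResponse and CompletedFetch, and releases into the MemoryPool rather than just tracking a counter.

```java
// Sketch of ref-counted buffer ownership: the response starts with one
// reference; each CompletedFetch retains; the last release returns the
// buffer to the pool. Illustrative only.
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

class PooledResponseBuffer {
    private final ByteBuffer buffer;
    private final AtomicInteger refCount = new AtomicInteger(1); // initial owner

    PooledResponseBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** Called when a CompletedFetch is created from this response. */
    void retain() {
        refCount.incrementAndGet();
    }

    /**
     * Called when a CompletedFetch is drained, or the response callback
     * completes. Returns true once the last reference releases the buffer,
     * i.e. the point where memoryPool.release(buffer) would run.
     */
    boolean release() {
        int remaining = refCount.decrementAndGet();
        if (remaining == 0) {
            return true; // buffer goes back to the pool here
        }
        if (remaining < 0) {
            throw new IllegalStateException("buffer released more times than retained");
        }
        return false;
    }
}
```

The key property is that the buffer outlives the response future callback whenever a CompletedFetch still holds a reference across poll calls, which is exactly the case the PR description says plain callback-time release cannot handle.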
[jira] [Assigned] (KAFKA-13188) Release the memory back into MemoryPool
[ https://issues.apache.org/jira/browse/KAFKA-13188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alok Nikhil reassigned KAFKA-13188:
--- Assignee: Alok Nikhil

> Release the memory back into MemoryPool
> Key: KAFKA-13188
> URL: https://issues.apache.org/jira/browse/KAFKA-13188
> Project: Kafka
> Issue Type: Improvement
> Reporter: Lucas Wang
> Assignee: Alok Nikhil
> Priority: Major
>
> Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to the linkedin/kafka repo hosting Apache Kafka 2.4.
> The change is about releasing memory back to the MemoryPool for the Kafka consumer, and his benchmark showed significant improvement in the amount of memory graduating from Young Gen and being promoted to Old Gen.
> Given the benefit, the change should also be added to trunk.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-13165. Resolution: Fixed

> Validate node id, process role and quorum voters
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
> Issue Type: Sub-task
> Components: kraft
> Reporter: Jose Armando Garcia Sancio
> Assignee: Ryan Dielhenn
> Priority: Blocker
> Labels: kip-500
> Fix For: 3.0.0
>
> Under certain configurations it is possible for the Kafka server to boot up as a broker only but be the cluster metadata quorum leader. We should validate the configuration to avoid this case.
> # If {{process.roles}} contains {{controller}}, then the {{node.id}} needs to be in {{controller.quorum.voters}}
> # If {{process.roles}} doesn't contain {{controller}}, then the {{node.id}} cannot be in {{controller.quorum.voters}}
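The two rules in KAFKA-13165 can be sketched as a small validator. This is a hedged illustration, assuming the parsed config yields the node id, the set of process roles, and the voter ids; the names and types are not the actual KafkaConfig implementation (which does this in Scala).

```java
// Sketch of the two KRaft validation rules from KAFKA-13165:
// 1. a controller's node.id must be in controller.quorum.voters;
// 2. a non-controller's node.id must not be.
// Illustrative names, not the real KafkaConfig code.
import java.util.Set;

final class KRaftConfigValidator {
    static void validate(int nodeId, Set<String> processRoles, Set<Integer> quorumVoterIds) {
        boolean isController = processRoles.contains("controller");
        boolean isVoter = quorumVoterIds.contains(nodeId);
        if (isController && !isVoter) {
            throw new IllegalArgumentException(
                "node.id " + nodeId + " must be in controller.quorum.voters when "
                    + "process.roles contains 'controller'");
        }
        if (!isController && isVoter) {
            throw new IllegalArgumentException(
                "node.id " + nodeId + " must not be in controller.quorum.voters when "
                    + "process.roles does not contain 'controller'");
        }
    }
}
```

Together the two checks rule out the failure mode the issue describes: a broker-only node that is nonetheless eligible to lead the metadata quorum.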
[GitHub] [kafka] socutes commented on pull request #11203: MINOR: Fixed "snasphot" naming issue
socutes commented on pull request #11203: URL: https://github.com/apache/kafka/pull/11203#issuecomment-897310958 @showuon @niket-goel please help review this PR. Thanks!
[GitHub] [kafka] socutes opened a new pull request #11203: MINOR: Fixed "snasphot" naming issue
socutes opened a new pull request #11203: URL: https://github.com/apache/kafka/pull/11203 Fixed the "snasphot" naming issue by renaming it to "snapshot".
[GitHub] [kafka] dengziming commented on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter
dengziming commented on pull request #11138: URL: https://github.com/apache/kafka/pull/11138#issuecomment-897275322 Currently we add a `FileSnapshotReader` class and copy some of the original code to it; a better way would be to rename `SnapshotReader` to `FileSnapshotReader` and add a `SnapshotReader` interface, which would reduce the line changes tremendously.
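The shape of that suggestion might look like the following. This is a hypothetical sketch: the method bodies and members are placeholders, not the actual Kafka raft snapshot API.

```java
// Sketch: extract a SnapshotReader interface and keep the existing
// file-based implementation as FileSnapshotReader, so callers depend on
// the interface and the file code moves with a rename instead of a copy.
interface SnapshotReader extends AutoCloseable {
    /** End offset of the snapshot, identifying the log prefix it replaces. */
    long snapshotEndOffset();

    @Override
    void close(); // narrow AutoCloseable so callers need not catch Exception
}

final class FileSnapshotReader implements SnapshotReader {
    private final long endOffset;

    FileSnapshotReader(long endOffset) {
        this.endOffset = endOffset; // real code would open the snapshot file here
    }

    @Override
    public long snapshotEndOffset() {
        return endOffset;
    }

    @Override
    public void close() {
        // real code would release the file handle here
    }
}
```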
[GitHub] [kafka] socutes closed pull request #11202: MINOR: Fixed "snasphot" naming issue
socutes closed pull request #11202: URL: https://github.com/apache/kafka/pull/11202
[GitHub] [kafka] socutes opened a new pull request #11202: MINOR: Fixed "snasphot" naming issue
socutes opened a new pull request #11202: URL: https://github.com/apache/kafka/pull/11202 Fixed the "snasphot" naming issue by renaming it to "snapshot".
[jira] [Updated] (KAFKA-10900) Add metrics enumerated in KIP-630
[ https://issues.apache.org/jira/browse/KAFKA-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu updated KAFKA-10900:
--- Description: KIP-630 enumerates a few metrics. Make sure that those metrics are implemented.

|kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs|A histogram of the amount of time it took to generate a snapshot.|
|kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs|A histogram of the amount of time it took to load the snapshot.|
|kafka.controller:type=KafkaController,name=SnapshotLag|The number of offsets between the largest snapshot offset and the high-watermark.|
|kafka.controller:type=KafkaController,name=SnapshotSizeBytes|Size of the latest snapshot in bytes.|

was: KIP-630 enumerates a few metrics. Makes sure that those metrics are implemented.

> Add metrics enumerated in KIP-630
> Key: KAFKA-10900
> URL: https://issues.apache.org/jira/browse/KAFKA-10900
> Project: Kafka
> Issue Type: Sub-task
> Components: replication
> Reporter: Jose Armando Garcia Sancio
> Assignee: loboxu
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
niket-goel commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687262200

## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java

@@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
    assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
}
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+    int brokerCount = 5;
+    int brokersToKeepUnfenced = 1;
+    short replicationFactor = 5;
+    Long sessionTimeoutSec = 1L;

Review comment: Will make the change. I had read the session timeout parameter wrong and will keep the time units consistent.
[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
niket-goel commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687262311 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } +@Test +public void testFenceMultipleBrokers() throws Throwable { +int brokerCount = 5; +int brokersToKeepUnfenced = 1; +short replicationFactor = 5; +Long sessionTimeoutSec = 1L; +Long sleepMillis = (sessionTimeoutSec * 1000) / 2; + +try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { +try (QuorumControllerTestEnv controlEnv = +new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) { +ListenerCollection listeners = new ListenerCollection(); +listeners.add(new Listener().setName("PLAINTEXT"). +setHost("localhost").setPort(9092)); +QuorumController active = controlEnv.activeController(); +Map brokerEpochs = new HashMap<>(); +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { +CompletableFuture reply = active.registerBroker( +new BrokerRegistrationRequestData(). +setBrokerId(brokerId). +setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). +setListeners(listeners)); +brokerEpochs.put(brokerId, reply.get().epoch()); +} + +// Brokers are only registered but still fenced +// Topic creation with no available unfenced brokers should fail +CreateTopicsRequestData createTopicsRequestData = +new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); +CreateTopicsResponseData createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), +createTopicsResponseData.topics().find("foo").errorCode()); +assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " + +"are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage()); + +// Unfence all brokers +sendBrokerheartbeat(active, brokerCount, brokerEpochs); +createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode()); +Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); +sendBrokerheartbeat(active, brokerCount, brokerEpochs); +Thread.sleep(sleepMillis); + +// All brokers should still be unfenced +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + assertTrue(active.replicationControl().isBrokerUnfenced(brokerId), +"Broker " + brokerId + " should have been unfenced"); +} +createTopicsRequestData = new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("bar").setNumPartitions(1). +setConfigs(new CreateableTopicConfigCollection(Collections. +singleton(new CreateableTopicConfig().setName("min.insync.replicas"). +setValue("2")).iterator())). + setReplicationFactor(replicationFactor)).iterator())); +createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); +assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("bar").errorCode()); +Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); + +// Fence some of the brokers +TestUtils.waitForCondition(() -> { +sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs); +for (int i = brokersToKeepUnfenced; i < broke
[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
niket-goel commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687262147 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } +@Test +public void testFenceMultipleBrokers() throws Throwable { +int brokerCount = 5; +int brokersToKeepUnfenced = 1; +short replicationFactor = 5; +Long sessionTimeoutSec = 1L; +Long sleepMillis = (sessionTimeoutSec * 1000) / 2; + +try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { +try (QuorumControllerTestEnv controlEnv = +new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) { +ListenerCollection listeners = new ListenerCollection(); +listeners.add(new Listener().setName("PLAINTEXT"). +setHost("localhost").setPort(9092)); +QuorumController active = controlEnv.activeController(); +Map brokerEpochs = new HashMap<>(); +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { +CompletableFuture reply = active.registerBroker( +new BrokerRegistrationRequestData(). +setBrokerId(brokerId). +setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). +setListeners(listeners)); +brokerEpochs.put(brokerId, reply.get().epoch()); +} + +// Brokers are only registered but still fenced +// Topic creation with no available unfenced brokers should fail +CreateTopicsRequestData createTopicsRequestData = +new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); +CreateTopicsResponseData createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), +createTopicsResponseData.topics().find("foo").errorCode()); +assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " + +"are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage()); + +// Unfence all brokers +sendBrokerheartbeat(active, brokerCount, brokerEpochs); +createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode()); +Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); +sendBrokerheartbeat(active, brokerCount, brokerEpochs); +Thread.sleep(sleepMillis); + +// All brokers should still be unfenced +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + assertTrue(active.replicationControl().isBrokerUnfenced(brokerId), +"Broker " + brokerId + " should have been unfenced"); +} +createTopicsRequestData = new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("bar").setNumPartitions(1). +setConfigs(new CreateableTopicConfigCollection(Collections. +singleton(new CreateableTopicConfig().setName("min.insync.replicas"). Review comment: TL;DR Is vestigial, will remove. The reason i added it in there was because even with only one broker available the topic creation would succeed. This added condition of a min.isr was to strengthen the topic creation criteria to make it fail when enough brokers were not un-fenced. I learnt that today the behavior is to not block topic creation even if the min.isr would not be met for the topic on a subsequent produce. Anyway, unless that behavior is changed the min.isr is serving no purpose and I will remove it. 
I added explicit checks to ensure broker liveness. ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@
[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
niket-goel commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687262045 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } +@Test +public void testFenceMultipleBrokers() throws Throwable { +int brokerCount = 5; +int brokersToKeepUnfenced = 1; +short replicationFactor = 5; +Long sessionTimeoutSec = 1L; +Long sleepMillis = (sessionTimeoutSec * 1000) / 2; + +try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { +try (QuorumControllerTestEnv controlEnv = +new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) { +ListenerCollection listeners = new ListenerCollection(); +listeners.add(new Listener().setName("PLAINTEXT"). +setHost("localhost").setPort(9092)); +QuorumController active = controlEnv.activeController(); +Map brokerEpochs = new HashMap<>(); +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { +CompletableFuture reply = active.registerBroker( +new BrokerRegistrationRequestData(). +setBrokerId(brokerId). +setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). +setListeners(listeners)); +brokerEpochs.put(brokerId, reply.get().epoch()); +} + +// Brokers are only registered but still fenced +// Topic creation with no available unfenced brokers should fail +CreateTopicsRequestData createTopicsRequestData = +new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); +CreateTopicsResponseData createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), Review comment: The topic creation actually fails here. We try the create again after unfencing the brokers. ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } +@Test +public void testFenceMultipleBrokers() throws Throwable { +int brokerCount = 5; +int brokersToKeepUnfenced = 1; +short replicationFactor = 5; +Long sessionTimeoutSec = 1L; +Long sleepMillis = (sessionTimeoutSec * 1000) / 2; + +try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { +try (QuorumControllerTestEnv controlEnv = +new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) { +ListenerCollection listeners = new ListenerCollection(); +listeners.add(new Listener().setName("PLAINTEXT"). +setHost("localhost").setPort(9092)); +QuorumController active = controlEnv.activeController(); +Map brokerEpochs = new HashMap<>(); +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { +CompletableFuture reply = active.registerBroker( +new BrokerRegistrationRequestData(). +setBrokerId(brokerId). +setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). +setListeners(listeners)); +brokerEpochs.put(brokerId, reply.get().epoch()); +} + +// Brokers are only registered but still fenced +// Topic creation with no available unfenced brokers should fail +CreateTopicsRequestData createTopicsRequestData = +new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("foo").setNumPartitions(1). 
+ set
[GitHub] [kafka] xvrl opened a new pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter
xvrl opened a new pull request #11201: URL: https://github.com/apache/kafka/pull/11201 Metric tag maps do not offer any ordering guarantees. We should always sort tags when generating mbean names, or we could end up with duplicate mbeans that differ only in tag order, or with orphaned mbeans that are not properly removed when metrics are updated or deleted.
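The fix described above amounts to making mbean name construction deterministic. A minimal sketch, with illustrative names rather than the actual JmxReporter code (which also handles quoting of special characters in tag values):

```java
// Sketch: sort tags when building an mbean name so two logically identical
// metrics always map to the same name, regardless of map iteration order.
import java.util.Map;
import java.util.TreeMap;

final class MBeanNames {
    static String mBeanName(String prefix, String type, Map<String, String> tags) {
        StringBuilder sb = new StringBuilder(prefix).append(":type=").append(type);
        // Copying into a TreeMap yields the keys in sorted order,
        // so the generated name is stable for any input map ordering.
        for (Map.Entry<String, String> e : new TreeMap<>(tags).entrySet()) {
            sb.append(',').append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }
}
```

Without the sort, registering a metric with tags `{b, a}` and later deleting it with tags `{a, b}` can target two different mbean names, which is exactly the duplicate/orphan scenario the PR describes.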
[GitHub] [kafka] hachikuji commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
hachikuji commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687194150 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } +@Test +public void testFenceMultipleBrokers() throws Throwable { +int brokerCount = 5; +int brokersToKeepUnfenced = 1; +short replicationFactor = 5; +Long sessionTimeoutSec = 1L; +Long sleepMillis = (sessionTimeoutSec * 1000) / 2; + +try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { +try (QuorumControllerTestEnv controlEnv = +new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) { +ListenerCollection listeners = new ListenerCollection(); +listeners.add(new Listener().setName("PLAINTEXT"). +setHost("localhost").setPort(9092)); +QuorumController active = controlEnv.activeController(); +Map brokerEpochs = new HashMap<>(); +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { +CompletableFuture reply = active.registerBroker( +new BrokerRegistrationRequestData(). +setBrokerId(brokerId). +setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). +setListeners(listeners)); +brokerEpochs.put(brokerId, reply.get().epoch()); +} + +// Brokers are only registered but still fenced +// Topic creation with no available unfenced brokers should fail +CreateTopicsRequestData createTopicsRequestData = +new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); +CreateTopicsResponseData createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), +createTopicsResponseData.topics().find("foo").errorCode()); +assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " + +"are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage()); + +// Unfence all brokers +sendBrokerheartbeat(active, brokerCount, brokerEpochs); +createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode()); +Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); +sendBrokerheartbeat(active, brokerCount, brokerEpochs); +Thread.sleep(sleepMillis); + +// All brokers should still be unfenced +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + assertTrue(active.replicationControl().isBrokerUnfenced(brokerId), +"Broker " + brokerId + " should have been unfenced"); +} +createTopicsRequestData = new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("bar").setNumPartitions(1). +setConfigs(new CreateableTopicConfigCollection(Collections. +singleton(new CreateableTopicConfig().setName("min.insync.replicas"). +setValue("2")).iterator())). + setReplicationFactor(replicationFactor)).iterator())); +createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); +assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("bar").errorCode()); +Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); + +// Fence some of the brokers +TestUtils.waitForCondition(() -> { +sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs); +for (int i = brokersToKeepUnfenced; i < broker
[GitHub] [kafka] rondagostino commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
rondagostino commented on a change in pull request #11200: URL: https://github.com/apache/kafka/pull/11200#discussion_r687199159 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1420,6 +1420,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO override def get(key: String): AnyRef = if (this eq currentConfig) super.get(key) else currentConfig.get(key) + override def postProcessParsedConfig(props: java.util.Map[String,Object]): java.util.Map[String,Object] = { Review comment: Good point! I'll move it.
[GitHub] [kafka] hachikuji commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
hachikuji commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687186647 ## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ## @@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } +@Test +public void testFenceMultipleBrokers() throws Throwable { +int brokerCount = 5; +int brokersToKeepUnfenced = 1; +short replicationFactor = 5; +Long sessionTimeoutSec = 1L; +Long sleepMillis = (sessionTimeoutSec * 1000) / 2; + +try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { +try (QuorumControllerTestEnv controlEnv = +new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) { +ListenerCollection listeners = new ListenerCollection(); +listeners.add(new Listener().setName("PLAINTEXT"). +setHost("localhost").setPort(9092)); +QuorumController active = controlEnv.activeController(); +Map brokerEpochs = new HashMap<>(); +for (int brokerId = 0; brokerId < brokerCount; brokerId++) { +CompletableFuture reply = active.registerBroker( +new BrokerRegistrationRequestData(). +setBrokerId(brokerId). +setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). +setListeners(listeners)); +brokerEpochs.put(brokerId, reply.get().epoch()); +} + +// Brokers are only registered but still fenced +// Topic creation with no available unfenced brokers should fail +CreateTopicsRequestData createTopicsRequestData = +new CreateTopicsRequestData().setTopics( +new CreatableTopicCollection(Collections.singleton( +new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); +CreateTopicsResponseData createTopicsResponseData = active.createTopics( +createTopicsRequestData).get(); +assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), Review comment: This part is a little mysterious to me. We return an error, but the topic is created anyway? That seems surprising.
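For context, the check the fencing test exercises reduces to comparing the requested replication factor against the number of unfenced brokers. A hedged standalone sketch (the error-code constants mirror the Kafka protocol's Errors values, but this is illustrative, not the controller's actual code):

```java
public class TopicCreationCheck {
    // Error codes for illustration; the controller returns Errors.INVALID_REPLICATION_FACTOR (38)
    public static final short NONE = 0;
    public static final short INVALID_REPLICATION_FACTOR = 38;

    /**
     * Reject topic creation when the requested replication factor exceeds
     * the number of currently unfenced brokers (or is non-positive).
     */
    public static short validateCreateTopic(short replicationFactor, int unfencedBrokerCount) {
        if (replicationFactor <= 0 || replicationFactor > unfencedBrokerCount) {
            return INVALID_REPLICATION_FACTOR;
        }
        return NONE;
    }
}
```

In the test above, all five brokers are registered but still fenced, so a replication factor of 5 against zero unfenced brokers should yield INVALID_REPLICATION_FACTOR.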
[GitHub] [kafka] jsancio commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
jsancio commented on a change in pull request #11200: URL: https://github.com/apache/kafka/pull/11200#discussion_r687192965 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1420,6 +1420,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO override def get(key: String): AnyRef = if (this eq currentConfig) super.get(key) else currentConfig.get(key) + override def postProcessParsedConfig(props: java.util.Map[String,Object]): java.util.Map[String,Object] = { Review comment: Why are we doing the validation in this method since it always returns an empty map? Can we do the same validation in `validateValues`? `KafkaConfig` performs the more complicated validation in that method.
[GitHub] [kafka] rondagostino opened a new pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
rondagostino opened a new pull request #11200: URL: https://github.com/apache/kafka/pull/11200 If both `broker.id` and `node.id` are set, and they are set inconsistently (e.g. `broker.id=0`, `node.id=1`), then currently the value of `node.id` is used and the `broker.id` value is left at the original value. The server should detect this inconsistency, throw a ConfigException, and fail to start. This patch adds the check and a test for it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
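A minimal standalone sketch of the consistency check this PR describes. The class name is hypothetical and IllegalArgumentException stands in for Kafka's ConfigException; this is not the actual KafkaConfig code:

```java
import java.util.Map;

public class NodeIdCheck {
    /**
     * Resolve the effective node id from raw properties, failing fast when
     * broker.id and node.id are both set but disagree. Sketch of the behavior
     * the PR describes, not the real implementation.
     */
    public static int resolveNodeId(Map<String, String> props) {
        String brokerId = props.get("broker.id");
        String nodeId = props.get("node.id");
        if (brokerId != null && nodeId != null && !brokerId.equals(nodeId)) {
            // In Kafka this would be a ConfigException thrown at startup
            throw new IllegalArgumentException(
                "broker.id and node.id must match when both are set: " +
                "broker.id=" + brokerId + ", node.id=" + nodeId);
        }
        String effective = nodeId != null ? nodeId : brokerId;
        if (effective == null) {
            throw new IllegalArgumentException("Either broker.id or node.id must be set");
        }
        return Integer.parseInt(effective);
    }
}
```

The key point is that the mismatch is detected eagerly instead of silently letting `node.id` win while `broker.id` keeps its stale value.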
[GitHub] [kafka] lbradstreet opened a new pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None
lbradstreet opened a new pull request #11199: URL: https://github.com/apache/kafka/pull/11199 When the high watermark is contained in a non-active segment and `firstUnstableOffsetMetadata` is `None`, we do not correctly bound cleaning by the hwm. This means that uncommitted records may overwrite committed data.
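The fix amounts to defaulting the unstable-offset bound to the high watermark before taking the minimum with the active segment's base offset. A rough self-contained sketch under that assumption (names are hypothetical, not the actual LogCleanerManager code):

```java
import java.util.Optional;

public class UncleanableOffset {
    /**
     * First offset the cleaner must not clean past. When there is no
     * first unstable offset, fall back to the high watermark instead of
     * leaving the bound unset -- the bug was that an empty Option let
     * cleaning run up to the active segment base, past the hwm.
     */
    public static long firstUncleanableOffset(Optional<Long> firstUnstableOffset,
                                              long highWatermark,
                                              long activeSegmentBaseOffset) {
        long lastStableOffset = firstUnstableOffset.orElse(highWatermark);
        return Math.min(lastStableOffset, activeSegmentBaseOffset);
    }
}
```

With this bound, a restarted broker whose hwm sits in an older segment no longer allows the cleaner to run ahead of replication.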
[jira] [Commented] (KAFKA-13194) LogCleaner may clean past highwatermark
[ https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397601#comment-17397601 ] Justine Olshan commented on KAFKA-13194: There was code to change this in [https://github.com/apache/kafka/pull/9590,] but if we want to fix faster, we should just make a new PR. Replacing // we do not clean beyond the first unstable offset log.firstUnstableOffset, with // we do not clean beyond the lastStableOffset (and therefore the high watermark) Option(log.lastStableOffset), > LogCleaner may clean past highwatermark > --- > > Key: KAFKA-13194 > URL: https://issues.apache.org/jira/browse/KAFKA-13194 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Bradstreet >Priority: Minor > > Here we have the cleaning point being bounded to the active segment base > offset and the first unstable offset. Which makes sense: > > {code:java} >// find first segment that cannot be cleaned > // neither the active segment, nor segments with any messages closer to > the head of the log than the minimum compaction lag time > // may be cleaned > val firstUncleanableDirtyOffset: Long = Seq( // we do not clean > beyond the first unstable offset > log.firstUnstableOffset, // the active segment is always > uncleanable > Option(log.activeSegment.baseOffset), // the first segment whose > largest message timestamp is within a minimum time lag from now > if (minCompactionLagMs > 0) { > // dirty log segments > val dirtyNonActiveSegments = > log.localNonActiveLogSegmentsFrom(firstDirtyOffset) > dirtyNonActiveSegments.find { s => > val isUncleanable = s.largestTimestamp > now - minCompactionLagMs > debug(s"Checking if log segment may be cleaned: log='${log.name}' > segment.baseOffset=${s.baseOffset} " + > s"segment.largestTimestamp=${s.largestTimestamp}; now - > compactionLag=${now - minCompactionLagMs}; " + > s"is uncleanable=$isUncleanable") > isUncleanable > }.map(_.baseOffset) > } else None > ).flatten.min > {code} > > But 
LSO starts out as None. > {code:java} > @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] > = None > private[log] def firstUnstableOffset: Option[Long] = > firstUnstableOffsetMetadata.map(_.messageOffset){code} > For most code depending on the LSO, fetchLastStableOffsetMetadata is used to > default it to the hwm if it's not set. > > {code:java} > private def fetchLastStableOffsetMetadata: LogOffsetMetadata = { > checkIfMemoryMappedBufferClosed()// cache the current high watermark > to avoid a concurrent update invalidating the range check > val highWatermarkMetadata = fetchHighWatermarkMetadata > firstUnstableOffsetMetadata match { > case Some(offsetMetadata) if offsetMetadata.messageOffset < > highWatermarkMetadata.messageOffset => > if (offsetMetadata.messageOffsetOnly) { > lock synchronized { > val fullOffset = > convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset) > if (firstUnstableOffsetMetadata.contains(offsetMetadata)) > firstUnstableOffsetMetadata = Some(fullOffset) > fullOffset > } > } else { > offsetMetadata > } > case _ => highWatermarkMetadata > } > } > {code} > > > This means that in the case where the hwm is prior to the active segment > base, the log cleaner may clean past the hwm. This is most likely to occur > after a broker restart when the log cleaner may start cleaning prior to > replication becoming active. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13194) LogCleaner may clean past highwatermark
[ https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Bradstreet updated KAFKA-13194: - Description: Here we have the cleaning point being bounded to the active segment base offset and the first unstable offset. Which makes sense: {code:java} // find first segment that cannot be cleaned // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( // we do not clean beyond the first unstable offset log.firstUnstableOffset, // the active segment is always uncleanable Option(log.activeSegment.baseOffset), // the first segment whose largest message timestamp is within a minimum time lag from now if (minCompactionLagMs > 0) { // dirty log segments val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset) dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - minCompactionLagMs debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " + s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " + s"is uncleanable=$isUncleanable") isUncleanable }.map(_.baseOffset) } else None ).flatten.min {code} But LSO starts out as None. {code:java} @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset){code} For most code depending on the LSO, fetchLastStableOffsetMetadata is used to default it to the hwm if it's not set. 
{code:java} private def fetchLastStableOffsetMetadata: LogOffsetMetadata = { checkIfMemoryMappedBufferClosed()// cache the current high watermark to avoid a concurrent update invalidating the range check val highWatermarkMetadata = fetchHighWatermarkMetadata firstUnstableOffsetMetadata match { case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset => if (offsetMetadata.messageOffsetOnly) { lock synchronized { val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset) if (firstUnstableOffsetMetadata.contains(offsetMetadata)) firstUnstableOffsetMetadata = Some(fullOffset) fullOffset } } else { offsetMetadata } case _ => highWatermarkMetadata } } {code} This means that in the case where the hwm is prior to the active segment base, the log cleaner may clean past the hwm. This is most likely to occur after a broker restart when the log cleaner may start cleaning prior to replication becoming active. was: Here we have the cleaning point being bounded to the active segment base offset and the first unstable offset. 
Which makes sense: // find first segment that cannot be cleaned // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( // we do not clean beyond the first unstable offset log.firstUnstableOffset, // the active segment is always uncleanable Option(log.activeSegment.baseOffset), // the first segment whose largest message timestamp is within a minimum time lag from now if (minCompactionLagMs > 0) \{ // dirty log segments val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset) dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - minCompactionLagMs debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " + s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " + s"is uncleanable=$isUncleanable") isUncleanable }.map(_.baseOffset) } else None ).flatten.min But LSO starts out as None. @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset) For most code depending on the LSO, fetchLastStableOffsetMetadata is used to default it to the hwm if it's not set. private def fetchLastStableOffsetMetadata: LogOffsetMetadata = \{ checkIfMemoryMappedBufferClosed() // cache the current high watermark to avoid a concurrent update invalidating the range check val highWatermarkMetadata = fetchHighWatermarkMetadata firstUnstableOffsetMetadata match {
[jira] [Updated] (KAFKA-13194) LogCleaner may clean past highwatermark
[ https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Bradstreet updated KAFKA-13194: - Priority: Minor (was: Major) > LogCleaner may clean past highwatermark > --- > > Key: KAFKA-13194 > URL: https://issues.apache.org/jira/browse/KAFKA-13194 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Bradstreet >Priority: Minor > > Here we have the cleaning point being bounded to the active segment base > offset and the first unstable offset. Which makes sense: > > {code:java} >// find first segment that cannot be cleaned > // neither the active segment, nor segments with any messages closer to > the head of the log than the minimum compaction lag time > // may be cleaned > val firstUncleanableDirtyOffset: Long = Seq( // we do not clean > beyond the first unstable offset > log.firstUnstableOffset, // the active segment is always > uncleanable > Option(log.activeSegment.baseOffset), // the first segment whose > largest message timestamp is within a minimum time lag from now > if (minCompactionLagMs > 0) { > // dirty log segments > val dirtyNonActiveSegments = > log.localNonActiveLogSegmentsFrom(firstDirtyOffset) > dirtyNonActiveSegments.find { s => > val isUncleanable = s.largestTimestamp > now - minCompactionLagMs > debug(s"Checking if log segment may be cleaned: log='${log.name}' > segment.baseOffset=${s.baseOffset} " + > s"segment.largestTimestamp=${s.largestTimestamp}; now - > compactionLag=${now - minCompactionLagMs}; " + > s"is uncleanable=$isUncleanable") > isUncleanable > }.map(_.baseOffset) > } else None > ).flatten.min > {code} > > But LSO starts out as None. > {code:java} > @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] > = None > private[log] def firstUnstableOffset: Option[Long] = > firstUnstableOffsetMetadata.map(_.messageOffset){code} > For most code depending on the LSO, fetchLastStableOffsetMetadata is used to > default it to the hwm if it's not set. 
> > {code:java} > private def fetchLastStableOffsetMetadata: LogOffsetMetadata = { > checkIfMemoryMappedBufferClosed()// cache the current high watermark > to avoid a concurrent update invalidating the range check > val highWatermarkMetadata = fetchHighWatermarkMetadata > firstUnstableOffsetMetadata match { > case Some(offsetMetadata) if offsetMetadata.messageOffset < > highWatermarkMetadata.messageOffset => > if (offsetMetadata.messageOffsetOnly) { > lock synchronized { > val fullOffset = > convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset) > if (firstUnstableOffsetMetadata.contains(offsetMetadata)) > firstUnstableOffsetMetadata = Some(fullOffset) > fullOffset > } > } else { > offsetMetadata > } > case _ => highWatermarkMetadata > } > } > {code} > > > This means that in the case where the hwm is prior to the active segment > base, the log cleaner may clean past the hwm. This is most likely to occur > after a broker restart when the log cleaner may start cleaning prior to > replication becoming active. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13192) broker.id and node.id can be specified inconsistently
[ https://issues.apache.org/jira/browse/KAFKA-13192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino reassigned KAFKA-13192: - Assignee: Ron Dagostino > broker.id and node.id can be specified inconsistently > - > > Key: KAFKA-13192 > URL: https://issues.apache.org/jira/browse/KAFKA-13192 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > > If both broker.id and node.id are set, and they are set inconsistently > (e.g.broker.id=0, node.id=1) then the value of node.id is used and the > broker.id value is left at the original value. The server should detect this > inconsistency, throw a ConfigException, and fail to start.
[jira] [Created] (KAFKA-13194) LogCleaner may clean past highwatermark
Lucas Bradstreet created KAFKA-13194: Summary: LogCleaner may clean past highwatermark Key: KAFKA-13194 URL: https://issues.apache.org/jira/browse/KAFKA-13194 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet Here we have the cleaning point being bounded to the active segment base offset and the first unstable offset. Which makes sense: // find first segment that cannot be cleaned // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( // we do not clean beyond the first unstable offset log.firstUnstableOffset, // the active segment is always uncleanable Option(log.activeSegment.baseOffset), // the first segment whose largest message timestamp is within a minimum time lag from now if (minCompactionLagMs > 0) \{ // dirty log segments val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset) dirtyNonActiveSegments.find { s => val isUncleanable = s.largestTimestamp > now - minCompactionLagMs debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " + s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " + s"is uncleanable=$isUncleanable") isUncleanable }.map(_.baseOffset) } else None ).flatten.min But LSO starts out as None. @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset) For most code depending on the LSO, fetchLastStableOffsetMetadata is used to default it to the hwm if it's not set. 
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = \{ checkIfMemoryMappedBufferClosed() // cache the current high watermark to avoid a concurrent update invalidating the range check val highWatermarkMetadata = fetchHighWatermarkMetadata firstUnstableOffsetMetadata match { case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset => if (offsetMetadata.messageOffsetOnly) { lock synchronized { val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset) if (firstUnstableOffsetMetadata.contains(offsetMetadata)) firstUnstableOffsetMetadata = Some(fullOffset) fullOffset } } else \{ offsetMetadata } case _ => highWatermarkMetadata } } This means that in the case where the hwm is prior to the active segment base, the log cleaner may clean past the hwm. This is most likely to occur after a broker restart when the log cleaner may start cleaning prior to replication becoming active. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13193) Replica manager doesn't update partition state when transitioning from leader to follower with unknown leader
Jose Armando Garcia Sancio created KAFKA-13193: -- Summary: Replica manager doesn't update partition state when transitioning from leader to follower with unknown leader Key: KAFKA-13193 URL: https://issues.apache.org/jira/browse/KAFKA-13193 Project: Kafka Issue Type: Bug Components: kraft, replication Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio This issue applies to both the ZK and KRaft implementations of the replica manager. In the rare case when a replica transitions from leader to follower with no leader, the partition state is not updated. This is because when handling makeFollowers the ReplicaManager only updates the partition state if the leader is alive. The solution is to always transition to follower but not start the fetcher thread if the leader is unknown or not alive.
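The proposed solution can be sketched as a two-step transition: always apply the follower state update, and gate only the fetcher on leader liveness. All names here are hypothetical, not ReplicaManager's actual code:

```java
import java.util.Optional;

public class FollowerTransition {
    public enum State { LEADER, FOLLOWER }

    public static class Partition {
        State state = State.LEADER;
        boolean fetcherRunning = false;
    }

    /**
     * Always record the leader-to-follower transition so partition state
     * stays consistent (the bug skipped this when the leader was unknown);
     * only start the fetcher when a live leader is known.
     */
    public static void makeFollower(Partition p, Optional<Integer> aliveLeaderId) {
        p.state = State.FOLLOWER;                      // unconditional state update (the fix)
        p.fetcherRunning = aliveLeaderId.isPresent();  // fetcher only with a live leader
    }
}
```

Previously the state update and the fetcher start were coupled, so an unknown leader left the partition stuck in the leader state.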
[jira] [Commented] (KAFKA-12954) Add Support for Scala 3 in 4.0.0
[ https://issues.apache.org/jira/browse/KAFKA-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397503#comment-17397503 ] Josep Prat commented on KAFKA-12954: There is a PR for supporting Scala 3 in Gradle: https://github.com/gradle/gradle/pull/18001 > Add Support for Scala 3 in 4.0.0 > > > Key: KAFKA-12954 > URL: https://issues.apache.org/jira/browse/KAFKA-12954 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Josep Prat >Assignee: Josep Prat >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > This is a follow up task from > https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 > support will be dropped. > It would be good to, at the same time, add support for Scala 3. > Initially it would be enough to only make the code compile with Scala 3 so we > can generate the proper Scala 3 artifacts, this might be achieved with the > proper compiler flags and an occasional rewrite. > Follow up tasks could be created to migrate to a more idiomatic Scala 3 > writing if desired. > If I understand it correctly, this would need a KIP as we are modifying the > public interfaces (new artifacts). If this is the case, let me know and I > will write it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13192) broker.id and node.id can be specified inconsistently
Ron Dagostino created KAFKA-13192: - Summary: broker.id and node.id can be specified inconsistently Key: KAFKA-13192 URL: https://issues.apache.org/jira/browse/KAFKA-13192 Project: Kafka Issue Type: Bug Affects Versions: 3.0.0 Reporter: Ron Dagostino If both broker.id and node.id are set, and they are set inconsistently (e.g. broker.id=0, node.id=1), then the value of node.id is used and the broker.id value is left at the original value. The server should detect this inconsistency, throw a ConfigException, and fail to start.
[GitHub] [kafka] rodesai commented on pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on pull request #11149: URL: https://github.com/apache/kafka/pull/11149#issuecomment-896970248 @cadonna @ableegoldman @guozhangwang
[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster
[ https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] CS User updated KAFKA-13191: Description: We're using confluent platform 6.2, running in a Kubernetes environment. The cluster has been running for a couple of years with zero issues, starting from version 1.1, 2.5 and now 2.8. We've very recently upgraded to kafka 2.8 from kafka 2.5. Since upgrading, we have seen issues when kafka and zookeeper pods restart concurrently. We can replicate the issue by restarting either the zookeeper statefulset first or the kafka statefulset first, either way appears to result with the same failure scenario. We've attempted to mitigate by preventing the kafka pods from stopping if any zookeeper pods are being restarted, or a rolling restart of the zookeeper cluster is underway. We've also added a check to stop the kafka pods from starting until all zookeeper pods are ready, however under the following scenario we still see the issue: In a 3 node kafka cluster with 5 zookeeper servers # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds # zookeeper-4 terminates # kafka-2 starts-up, and waits until the zookeeper rollout completes # kafka-2 eventually fully starts, kafka comes up and we see the errors below on other pods in the cluster. Without mitigation and in the above scenario we see errors on pods kafka-0 (repeatedly spamming the log) : {noformat} [2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), retrying. 
(kafka.server.DefaultAlterIsrManager){noformat} Kafka-1 seems ok When kafka-2 starts, it has this log entry with regards to its own broker epoch: {noformat} [2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with addresses: INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094, czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat} This is despite kafka-2 appearing to start fine, this is what you see in kafka-2's logs, nothing else seems to be added to the log, it just seems to hang here: {noformat} [2021-08-11 11:44:48,911] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Started socket server acceptors and processors (kafka.network.SocketServer) [2021-08-11 11:44:48,913] INFO Kafka version: 6.2.0-ccs (org.apache.kafka.common.utils.AppInfoParser) [2021-08-11 11:44:48,913] INFO Kafka commitId: 1a5755cf9401c84f (org.apache.kafka.common.utils.AppInfoParser) [2021-08-11 11:44:48,913] INFO Kafka startTimeMs: 1628682288911 (org.apache.kafka.common.utils.AppInfoParser) [2021-08-11 11:44:48,914] INFO [KafkaServer id=2] started (kafka.server.KafkaServer) {noformat} This never appears to recover. If you then restart kafka-2, you'll see these errors: {noformat} org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 0. {noformat} This seems to completely break the cluster, partitions do not failover as expected. Checking zookeeper, and getting the values of the brokers look fine {noformat} get /brokers/ids/0{noformat} etc, all looks fine there, each broker is present. 
This error message appears to have been added to kafka in the last 11 months {noformat} Broker had a stale broker epoch {noformat} Via this PR: [https://github.com/apache/kafka/pull/9100] I see also this comment around the leader getting stuck: [https://github.com/apache/kafka/pull/9100/files#r494480847] Recovery is possible by continuing to restart the remaining brokers in the cluster. Once all have been restarted, everything looks fine. Has anyone else come across this? It seems very simple to replicate in our environment, simply start a simultaneous rolling restart of both kafka and zookeeper. I appreciate that Zookeeper and Kafka would not normally be restarted concurrently in this way. However there are going to be scenarios where this can happen, such as if we had simultaneous Kubernetes node failures, resulting in the loss of both a zookeeper and a kafka pod at the same time. This could result in the issue above. This is not something that we have seen previously with versions 1.1 or 2.5. Just to be clear, rolling restarting only kafka or zookeeper is absolutely fine. After some additional testing, it appears this can be recreated simply by restarting a broker pod and then restarting the zookeeper leader as the broker is shutting down. was: We're using confluent platform 6.2, running in a Kubernetes environment. The cluster has been running for a couple of years with zero issues, starting from version 1.1, 2.5 and now 2.8. We've very recently upgraded to kafka 2.8 from kafka 2.5. Since upgrading, we have seen issues when kafka and zookeeper pods restart concurrently. We can replicate the issue by restarting either the zookeeper statefulset fir
[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster
[ https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] CS User updated KAFKA-13191: Description: We're using confluent platform 6.2, running in a Kubernetes environment. The cluster has been running for a couple of years with zero issues, starting from version 1.1, 2.5 and now 2.8. We've very recently upgraded to kafka 2.8 from kafka 2.5. Since upgrading, we have seen issues when kafka and zookeeper pods restart concurrently. We can replicate the issue by restarting either the zookeeper statefulset first or the kafka statefulset first, either way appears to result with the same failure scenario. We've attempted to mitigate by preventing the kafka pods from stopping if any zookeeper pods are being restarted, or a rolling restart of the zookeeper cluster is underway. We've also added a check to stop the kafka pods from starting until all zookeeper pods are ready, however under the following scenario we still see the issue: In a 3 node kafka cluster with 5 zookeeper servers # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds # zookeeper-4 terminates # kafka-2 starts-up, and waits until the zookeeper rollout completes # kafka-2 eventually fully starts, kafka comes up and we see the errors below on other pods in the cluster. Without mitigation and in the above scenario we see errors on pods kafka-0 (repeatedly spamming the log) : {noformat} [2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), retrying. 
(kafka.server.DefaultAlterIsrManager){noformat} Kafka-1 seems ok When kafka-2 starts, it has this log entry with regards to its own broker epoch: {noformat} [2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with addresses: INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094, czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat} This is despite kafka-2 appearing to start fine, this is what you see in kafka-2's logs, nothing else seems to be added to the log, it just seems to hang here: {noformat} [2021-08-11 11:44:48,911] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Started socket server acceptors and processors (kafka.network.SocketServer) [2021-08-11 11:44:48,913] INFO Kafka version: 6.2.0-ccs (org.apache.kafka.common.utils.AppInfoParser) [2021-08-11 11:44:48,913] INFO Kafka commitId: 1a5755cf9401c84f (org.apache.kafka.common.utils.AppInfoParser) [2021-08-11 11:44:48,913] INFO Kafka startTimeMs: 1628682288911 (org.apache.kafka.common.utils.AppInfoParser) [2021-08-11 11:44:48,914] INFO [KafkaServer id=2] started (kafka.server.KafkaServer) {noformat} This never appears to recover. If you then restart kafka-2, you'll see these errors: {noformat} org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 0. {noformat} This seems to completely break the cluster, partitions do not failover as expected. Checking zookeeper, and getting the values of the brokers look fine {noformat} get /brokers/ids/0{noformat} etc, all looks fine there, each broker is present. 
This error message appears to have been added to kafka in the last 11 months {noformat} Broker had a stale broker epoch {noformat} Via this PR: [https://github.com/apache/kafka/pull/9100] I see also this comment around the leader getting stuck: [https://github.com/apache/kafka/pull/9100/files#r494480847] Recovery is possible by continuing to restart the remaining brokers in the cluster. Once all have been restarted, everything looks fine. Has anyone else come across this? It seems very simple to replicate in our environment, simply start a simultaneous rolling restart of both kafka and zookeeper. I appreciate that Zookeeper and Kafka would not normally be restarted concurrently in this way. However there are going to be scenarios where this can happen, such as if we had simultaneous Kubernetes node failures, resulting in the loss of both a zookeeper and a kafka pod at the same time. This could result in the issue above. This is not something that we have seen previously with versions 1.1 or 2.5. Just to be clear, rolling restarting only kafka or zookeeper is absolutely fine. was: We're using confluent platform 6.2, running in a Kubernetes environment. The cluster has been running for a couple of years with zero issues, starting from version 1.1, 2.5 and now 2.8. We've very recently upgraded to kafka 2.8 from kafka 2.5. Since upgrading, we have seen issues when kafka and zookeeper pods restart concurrently. We can replicate the issue by restarting either the zookeeper statefulset first or the kafka statefulset first, either way appears to result with the same failure scenario. We've attempted to mitigate by preventing the kafka pods from stopping if an
[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster
[ https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

CS User updated KAFKA-13191:
Description:

We're using Confluent Platform 6.2, running in a Kubernetes environment. The cluster has been running for a couple of years with zero issues, starting from version 1.1, then 2.5 and now 2.8. We've very recently upgraded to Kafka 2.8 from Kafka 2.5. Since upgrading, we have seen issues when Kafka and ZooKeeper pods restart concurrently.

We can replicate the issue by restarting either the ZooKeeper statefulset first or the Kafka statefulset first; either way appears to result in the same failure scenario.

We've attempted to mitigate by preventing the Kafka pods from stopping if any ZooKeeper pods are being restarted, or while a rolling restart of the ZooKeeper cluster is underway. We've also added a check to stop the Kafka pods from starting until all ZooKeeper pods are ready. However, under the following scenario, in a 3-node Kafka cluster with 5 ZooKeeper servers, we still see the issue:

# kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
# zookeeper-4 terminates
# kafka-2 starts up, and waits until the zookeeper rollout completes
# kafka-2 eventually fully starts, kafka comes up and we see the errors below on other pods in the cluster.

Without mitigation, and in the above scenario, we see errors on pod kafka-0 (repeatedly spamming the log):
{noformat}
[2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), retrying. (kafka.server.DefaultAlterIsrManager)
{noformat}
kafka-1 seems OK.

When kafka-2 starts, it has this log entry with regard to its own broker epoch:
{noformat}
[2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with addresses: INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094, czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient)
{noformat}
This never appears to recover.

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 0.
{noformat}
This seems to completely break the cluster; partitions do not fail over as expected.

Checking ZooKeeper and getting the values of the brokers looks fine:
{noformat}
get /brokers/ids/0
{noformat}
etc. All looks fine there; each broker is present.

This error message appears to have been added to Kafka in the last 11 months:
{noformat}
Broker had a stale broker epoch
{noformat}
via this PR: [https://github.com/apache/kafka/pull/9100]. I also see this comment around the leader getting stuck: [https://github.com/apache/kafka/pull/9100/files#r494480847]

Recovery is possible by continuing to restart the remaining brokers in the cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our environment: simply start a simultaneous rolling restart of both Kafka and ZooKeeper.

I appreciate that ZooKeeper and Kafka would not normally be restarted concurrently in this way. However, there are scenarios where this can happen, such as simultaneous Kubernetes node failures resulting in the loss of both a ZooKeeper and a Kafka pod at the same time. This could result in the issue above. This is not something that we have seen previously with versions 1.1 or 2.5. Just to be clear, rolling restarting only Kafka or only ZooKeeper is absolutely fine.
[GitHub] [kafka] chia7712 commented on a change in pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…
chia7712 commented on a change in pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#discussion_r686709179

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
## @@ -42,77 +39,65 @@

 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;

-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
 public class StateManagerUtilTest {

-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;

-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;

-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;

-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;

-    private IMocksControl ctrl;

     private Logger logger = new LogContext("test").logger(AbstractTask.class);
     private final TaskId taskId = new TaskId(0, 0);

     @Before
     public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
+        topology = mock(ProcessorTopology.class);
+        processorContext = mock(InternalProcessorContext.class);

-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
+        stateManager = mock(ProcessorStateManager.class);
+        stateDirectory = mock(StateDirectory.class);
     }

     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
+        inOrder(topology);

         StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
-        ctrl.verify();
     }

     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
+        when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));

-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);

-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(false);
+
+        inOrder(topology, stateManager, stateDirectory);

Review comment:
   you have to check the execution order through the return object from `inOrder`
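To illustrate the reviewer's point: `Mockito.inOrder(...)` only enforces ordering when you call `verify` on the `InOrder` object it returns; creating it and discarding the result (as the patch above does) verifies nothing. A minimal self-contained sketch, assuming Mockito is on the classpath; the `Greeter` interface here is hypothetical and stands in for the Kafka collaborators (`ProcessorTopology`, `ProcessorStateManager`, etc.):

```java
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import org.mockito.InOrder;

public class InOrderSketch {

    // Hypothetical collaborator, standing in for the mocked Kafka types.
    interface Greeter {
        void hello();
        void goodbye();
    }

    public static void main(String[] args) {
        Greeter greeter = mock(Greeter.class);

        // Exercise the mock (in the real test, registerStateStores would do this).
        greeter.hello();
        greeter.goodbye();

        // Wrong: the InOrder is created and dropped, so nothing is checked.
        inOrder(greeter);

        // Right: verify each call through the returned InOrder object.
        // This throws if the calls did not happen in this order.
        InOrder order = inOrder(greeter);
        order.verify(greeter).hello();
        order.verify(greeter).goodbye();
    }
}
```

In other words, each `expect(...)` under EasyMock's strict control would map to an `order.verify(...)` call on one shared `InOrder` instance, not to a bare `inOrder(...)` statement.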
[jira] [Created] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster
CS User created KAFKA-13191:
---
Summary: Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster
Key: KAFKA-13191
URL: https://issues.apache.org/jira/browse/KAFKA-13191
Project: Kafka
Issue Type: Bug
Components: protocol
Affects Versions: 2.8.0
Reporter: CS User

We're using Confluent Platform 6.2, running in a Kubernetes environment. The cluster has been running for a couple of years with zero issues, starting from version 1.1, then 2.5 and now 2.8. We've very recently upgraded to Kafka 2.8 from Kafka 2.5.

Since upgrading, we have seen issues when kafka and zookeeper pods restart concurrently. We can replicate the issue by restarting either the zookeeper statefulset first or the kafka statefulset first; either way appears to result in the same failure scenario.

We've attempted to mitigate by preventing the kafka pods from stopping if any zookeeper pods are being restarted, or while a rolling restart of the zookeeper cluster is underway. We've also added a check to stop the kafka pods from starting until all zookeeper pods are ready. However, under the following scenario, in a 3-node kafka cluster with 5 zookeeper servers, we still see the issue:
# kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
# zookeeper-4 terminates
# kafka-2 starts up, and waits until the zookeeper rollout completes
# kafka-2 eventually fully starts, kafka comes up and we see the errors below on other pods in the cluster.

Without mitigation, and in the above scenario, we see errors on pods kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. (kafka.server.DefaultAlterIsrManager)
{noformat}
This never appears to recover. If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 0.
{noformat}
This seems to completely break the cluster; partitions do not fail over as expected.
Checking zookeeper and getting the values of the brokers looks fine:
{noformat}
get /brokers/ids/0
{noformat}
etc. All looks fine there; each broker is present.

This error message appears to have been added to Kafka in the last 11 months:
{noformat}
Broker had a stale broker epoch
{noformat}
via this PR: [https://github.com/apache/kafka/pull/9100]

I also see this comment around the leader getting stuck: https://github.com/apache/kafka/pull/9100/files#r494480847

Recovery is possible by continuing to restart the remaining brokers in the cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our environment: simply start a simultaneous rolling restart of both kafka and zookeeper.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
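The "stale broker epoch" rejection above comes from broker-epoch fencing: each broker registration in ZooKeeper receives a fresh epoch, and requests such as AlterIsr carry the sender's epoch so that requests from a previous broker incarnation can be rejected. A minimal sketch of that comparison follows; the class and method names are hypothetical, not the actual controller code:

```java
import java.util.HashMap;
import java.util.Map;

public class EpochCheck {
    // Controller-side view: broker id -> epoch recorded at registration.
    private final Map<Integer, Long> registeredEpochs = new HashMap<>();

    // A (re)registration replaces any previously recorded epoch.
    void registerBroker(int brokerId, long epoch) {
        registeredEpochs.put(brokerId, epoch);
    }

    // An AlterIsr-style request is only accepted if the sender's epoch
    // matches the epoch recorded for that broker.
    boolean acceptAlterIsr(int brokerId, long requestEpoch) {
        Long current = registeredEpochs.get(brokerId);
        return current != null && current == requestEpoch;
    }

    public static void main(String[] args) {
        EpochCheck controller = new EpochCheck();
        controller.registerBroker(0, 635655171205L);
        // The broker restarts and re-registers with a newer epoch...
        controller.registerBroker(0, 635655171300L);
        // ...so any request still carrying the old epoch is rejected.
        System.out.println(controller.acceptAlterIsr(0, 635655171205L)); // false
        System.out.println(controller.acceptAlterIsr(0, 635655171300L)); // true
    }
}
```

The endless retry loop in the report suggests one side keeps using an epoch the other no longer considers current, so the request is rejected indefinitely.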
[jira] [Commented] (KAFKA-9914) Mirror Maker 2 creates heartbeats kafka topics recursively
[ https://issues.apache.org/jira/browse/KAFKA-9914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397208#comment-17397208 ] Al Ricafort commented on KAFKA-9914:

Hi, I have the same issue. I have two Kafka setups, A and B. A is a cluster with 3 instances running on the same machine, while B is standalone on another machine. I tried to use Mirror Maker to replicate from A to B. The config file is as follows:

{{clusters = A, B}}
{{A.bootstrap.servers = host1:9091, host1:9092, host1:9093}}
{{B.bootstrap.servers = host2:9092}}
{{A->B.enabled = true}}
{{B->A.enabled = false}}
{{replication.factor = 1}}
{{checkpoints.topic.replication.factor=1}}
{{heartbeats.topic.replication.factor=1}}
{{offset-syncs.topic.replication.factor=1}}
{{offset.storage.replication.factor=1}}
{{status.storage.replication.factor=1}}
{{config.storage.replication.factor=1}}

I execute mirror maker as follows:

{{bin/connect-mirror-maker.sh config/mirror.maker.properties}}

But when I run the mirror maker on the 'B' side I can see that it keeps creating 'heartbeats' topics, so at some point I end up with many 'A.heartbeats'-style topics like those below, and the list just keeps growing until I kill the mirror maker:

{{A.A.A.A.heartbeats}}
{{A.A.A.heartbeats}}
{{A.A.heartbeats}}
{{A.heartbeats}}

How do I work around this issue? Thanks.

Kafka version: 2.13-2.8

> Mirror Maker 2 creates heartbeats kafka topics recursively
> --
>
> Key: KAFKA-9914
> URL: https://issues.apache.org/jira/browse/KAFKA-9914
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.0.0
> Reporter: azher khan
> Priority: Major
>
> Hi Team,
> I configured Mirror Maker 2 to run on a Pod, configuring the Kafka brokers in
> our environments and marking the '10.X.X.YY' Kafka broker as primary and '10.X.Y.XY'
> as backup.
> +*Mirror Maker mm2.properties*+
> {code:java}
> clusters=kafka,backup
> kafka.bootstrap.servers=10.X.X.YY:9092
> backup.bootstrap.servers=10.X.Y.XY:9092
> kafka->backup.enabled=true
> kafka->backup.topics=az.*
> backup->kafka.enabled=false
> sync.topic.acls.enabled=true
> {code}
>
> I was able to run Mirror Maker 2 successfully and was also able to take a
> backup of the identified topics (starting with 'az').
>
> However, I could see many kafka topics with the suffix 'heartbeats' created
> recursively (see below the list of topics matching 'kafka.*.heartbeats'). This
> could be because I triggered 'connect-mirror-maker.sh' several times to
> test backing up other topics.
>
> I have two queries:
> # How to avoid having 'kafka.*.heartbeats' topics created by Mirror
> Maker 2?
> # Once Mirror Maker 2 has backed up a topic, say 'azherf1test', what is the
> best way to roll back the changes made by Mirror Maker (i.e. delete all the topics
> created by Mirror Maker, 'kafka.azherf1test' and supporting topics) while ensuring
> the stability of the existing/source topics and Kafka broker?
> We are testing Mirror Maker and want to ensure we are able to roll back the
> changes without affecting the Kafka topics/brokers.
>
> +*Kafka Topics list output:*+
> {code:java}
> azherf1test
> heartbeats
> kafka-client-topic
> mm2-configs.backup.internal
> mm2-configs.kafka.internal
> mm2-offset-syncs.backup.internal
> mm2-offsets.backup.internal
> mm2-offsets.kafka.internal
> mm2-status.backup.internal
> mm2-status.kafka.internal
> kafka.azherf1test
> kafka.checkpoints.internal
> kafka.heartbeats
> kafka.kafka.heartbeats
> kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.ka
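The runaway `kafka.kafka.*.heartbeats` topics appear when MirrorMaker 2 fails to recognize that a topic on the target cluster was produced by its own flow. With the default prefix-based replication policy, cycle detection walks the source-alias prefixes of a topic name and refuses to replicate anything that already passed through the target cluster; a policy that does not prefix names (such as IdentityReplicationPolicy) cannot detect cycles this way. A rough, self-contained sketch of the prefix-walking idea follows; the class and helper names are hypothetical, loosely modeled on the ReplicationPolicy interface, not the actual MirrorSourceConnector code:

```java
public class CycleCheck {
    static final String SEPARATOR = ".";

    // Returns the source-cluster alias prefix of a remote topic,
    // or null if the topic has no prefix (i.e. it is local).
    static String topicSource(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(0, i);
    }

    // Strips one alias prefix, yielding the upstream topic name.
    static String upstreamTopic(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(i + 1);
    }

    // A topic is part of a cycle if any hop in its prefix chain
    // names the target cluster we are about to replicate into.
    static boolean isCycle(String topic, String targetAlias) {
        String source = topicSource(topic);
        if (source == null)
            return false;
        if (source.equals(targetAlias))
            return true;
        return isCycle(upstreamTopic(topic), targetAlias);
    }

    public static void main(String[] args) {
        System.out.println(isCycle("A.heartbeats", "B"));   // false: safe to replicate A -> B
        System.out.println(isCycle("B.A.heartbeats", "B")); // true: already passed through B
    }
}
```

If this check never fires (wrong separator, a custom policy, or a policy that returns null for every topic), the connector treats its own output as a fresh source topic each time, adding one more prefix per pass, which matches the ever-growing list above.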
[GitHub] [kafka] dielhennr removed a comment on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.
dielhennr removed a comment on pull request #11141: URL: https://github.com/apache/kafka/pull/11141#issuecomment-892837165 @hachikuji @rajinisivaram since this build is green I’m fairly confident that dynamic config behavior hasn’t changed in a zk cluster. I am wondering if it is better to have buggy validation than no validation for KRaft brokers (assuming this is still buggy). Do either of you anticipate any problems with doing this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org