[GitHub] [kafka] tvainika commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-08-11 Thread GitBox


tvainika commented on a change in pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#discussion_r687419372



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
 String source = replicationPolicy.topicSource(topic);
 if (source == null) {
 return false;
-} else if (source.equals(sourceAndTarget.target())) {
+}
+
+// Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+final boolean condition;
+if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment:
   So clearly the `IdentityReplicationPolicy` cannot detect cycles, and this is documented in 
https://github.com/apache/kafka/blob/2edaf9ddd30a6417c34f5bce6894a5340632ff1e/docs/upgrade.html#L112
   
   So, would it make sense to move this `isCycle()` method to the 
`ReplicationPolicy` interface (changing that interface, which is marked 
`evolving`) and have a separate implementation of `isCycle` per replication 
policy? Or what kind of approach do you see fitting best here?
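   
   For reference, a rough sketch of that option, assuming a default method on the 
interface (the trimmed method set and parameter names below are illustrative, 
not the actual `ReplicationPolicy` API):
   
       // Hypothetical sketch only -- not the real Kafka interface.
       public interface ReplicationPolicy {
           String topicSource(String topic);
           String upstreamTopic(String topic);
   
           /**
            * Let each policy decide whether replicating this topic into the given
            * target cluster would form a cycle. IdentityReplicationPolicy could
            * simply return false here (it cannot detect cycles from topic names)
            * or use a checkpoint-based strategy instead.
            */
           default boolean isCycle(String topic, String targetClusterAlias) {
               String source = topicSource(topic);
               if (source == null) {
                   return false;
               } else if (source.equals(targetClusterAlias)) {
                   return true;
               }
               String upstream = upstreamTopic(topic);
               if (upstream == null || upstream.equals(topic)) {
                   return false;  // no further prefix to strip; stop recursing
               }
               return isCycle(upstream, targetClusterAlias);
           }
       }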








[GitHub] [kafka] aloknnikhil opened a new pull request #11204: KAFKA-13188 - Release the memory back into MemoryPool

2021-08-11 Thread GitBox


aloknnikhil opened a new pull request #11204:
URL: https://github.com/apache/kafka/pull/11204


   **Forward porting** - https://github.com/linkedin/kafka/pull/186
   
   **TICKET** = [KAFKA-13188](https://issues.apache.org/jira/browse/KAFKA-13188)
   **LI_DESCRIPTION** =
   The current KafkaConsumer code includes support for allocating buffers from 
MemoryPool when allocating bytes for requests to brokers. However, the code 
doesn't release them back to the pool, rendering pooling moot. Currently it 
works because it uses MemoryPool.NONE, which just mallocs a buffer every time 
one is requested. However, this won't work if a different memory pool 
implementation is used.
   
   The consumer can't just release the memory back into the pool because the 
fetch path keeps holding references to the buffers in CompletedFetch objects, 
which live across multiple poll calls. The idea here is to use a 
ref-counting-based approach, where the ClientResponse increments the ref count 
every time a CompletedFetch object is created and decrements it when the fetch 
is drained after a poll call returns records.
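   
   A hedged sketch of that ref-counting idea (the class and method names below 
are illustrative, not the actual consumer internals):
   
       import java.nio.ByteBuffer;
       import java.util.concurrent.atomic.AtomicInteger;
       import org.apache.kafka.common.memory.MemoryPool;
   
       // Wraps a pooled response buffer; the last reference returns it to the pool.
       final class PooledResponseBuffer {
           private final MemoryPool pool;
           private final ByteBuffer buffer;
           private final AtomicInteger refCount = new AtomicInteger(1); // held by the ClientResponse
   
           PooledResponseBuffer(MemoryPool pool, ByteBuffer buffer) {
               this.pool = pool;
               this.buffer = buffer;
           }
   
           // Called when a CompletedFetch starts referencing the underlying bytes.
           void retain() {
               refCount.incrementAndGet();
           }
   
           // Called when the fetch is drained, or when the response callback completes.
           void release() {
               if (refCount.decrementAndGet() == 0) {
                   pool.release(buffer);
               }
           }
       }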
   
   For the rest, such as metadata, offset commits, and list offsets, it is 
somewhat easier, as the client is done with the response bytes once the 
response future callback has completed.
   
   This PR also adds the ClientResponseWithFinalize class for debugging 
purposes, protected by the linkedin.enable.client.resonse.leakcheck flag. The 
class uses a finalizer to check whether a buffer was never released back to 
the pool due to some issue in the code. However, in an actual production 
setting finalizers are costly, so this check is not enabled by default.
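   
   A rough sketch of the finalizer-based leak check described above (the class 
shape is illustrative, not the PR's actual code):
   
       import java.nio.ByteBuffer;
       import org.apache.kafka.common.memory.MemoryPool;
   
       // Debug-only wrapper: if release() was never called before GC, report a leak.
       final class LeakCheckedBuffer {
           private final MemoryPool pool;
           private final ByteBuffer buffer;
           private volatile boolean released = false;
   
           LeakCheckedBuffer(MemoryPool pool, ByteBuffer buffer) {
               this.pool = pool;
               this.buffer = buffer;
           }
   
           void release() {
               released = true;
               pool.release(buffer);
           }
   
           // Finalizers are expensive, which is why this wrapper sits behind a
           // debug flag and is disabled by default in production.
           @Override
           protected void finalize() {
               if (!released) {
                   System.err.println("Leaked pooled buffer: release() was never called");
               }
           }
       }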
   
   **EXIT_CRITERIA** = If and when the upstream ticket is merged, and the 
changes are pulled in
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Assigned] (KAFKA-13188) Release the memory back into MemoryPool

2021-08-11 Thread Alok Nikhil (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alok Nikhil reassigned KAFKA-13188:
---

Assignee: Alok Nikhil

> Release the memory back into MemoryPool
> ---
>
> Key: KAFKA-13188
> URL: https://issues.apache.org/jira/browse/KAFKA-13188
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Alok Nikhil
>Priority: Major
>
> Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to 
> the linkedin/kafka repo hosting apache kafka 2.4.
> The change is about releasing memory back to the MemoryPool for the kafka 
> consumer, and his benchmark showed a significant improvement in the amount of 
> memory graduating from Young Gen and being promoted to Old Gen.
> Given the benefit, the change should also be added to trunk.





[jira] [Resolved] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-11 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-13165.

Resolution: Fixed

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Ryan Dielhenn
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Under certain configurations it is possible for the Kafka server to boot up 
> as a broker only but be the cluster metadata quorum leader. We should 
> validate the configuration to avoid this case:
>  # If {{process.roles}} contains {{controller}}, then the {{node.id}} needs 
> to be in {{controller.quorum.voters}}
>  # If {{process.roles}} doesn't contain {{controller}}, then the {{node.id}} 
> cannot be in {{controller.quorum.voters}}
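
A rough sketch of the two rules above as a startup check (the helper and 
parameter names are illustrative, not the actual KafkaConfig validation code):

    import java.util.Set;
    import org.apache.kafka.common.config.ConfigException;

    // Illustrative only: reject configurations where node.id membership in
    // controller.quorum.voters disagrees with process.roles.
    final class QuorumVoterCheck {
        static void validate(int nodeId, Set<String> processRoles, Set<Integer> voterIds) {
            boolean isController = processRoles.contains("controller");
            boolean isVoter = voterIds.contains(nodeId);
            if (isController && !isVoter) {
                throw new ConfigException("process.roles contains 'controller' but node.id "
                    + nodeId + " is not in controller.quorum.voters");
            }
            if (!isController && isVoter) {
                throw new ConfigException("process.roles does not contain 'controller' but node.id "
                    + nodeId + " is in controller.quorum.voters");
            }
        }
    }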





[GitHub] [kafka] socutes commented on pull request #11203: MINOR: Fixed "snasphot" naming issue

2021-08-11 Thread GitBox


socutes commented on pull request #11203:
URL: https://github.com/apache/kafka/pull/11203#issuecomment-897310958


   @showuon @niket-goel, please help review this PR. Thanks!






[GitHub] [kafka] socutes opened a new pull request #11203: MINOR: Fixed "snasphot" naming issue

2021-08-11 Thread GitBox


socutes opened a new pull request #11203:
URL: https://github.com/apache/kafka/pull/11203


   Fixed "snasphot" naming issue. Instead of "snapshot".






[GitHub] [kafka] dengziming commented on pull request #11138: KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter

2021-08-11 Thread GitBox


dengziming commented on pull request #11138:
URL: https://github.com/apache/kafka/pull/11138#issuecomment-897275322


   Currently we add a `FileSnapshotReader` class and copy some of the original 
code into it. A better way would be to rename `SnapshotReader` to 
`FileSnapshotReader` and add a `SnapshotReader` interface, which would reduce 
the line changes tremendously.
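   
   A minimal sketch of that shape (the method set shown is illustrative, not 
the actual raft snapshot API):
   
       // Illustrative: extract an interface and have the file-based class implement it.
       public interface SnapshotReader<T> extends AutoCloseable {
           boolean hasNext();   // more batches/records remain in the snapshot
           T next();            // next batch/record
           @Override
           void close();
       }
   
       // The existing class would be renamed and keep its current behavior:
       // public final class FileSnapshotReader<T> implements SnapshotReader<T> { ... }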
   






[GitHub] [kafka] socutes closed pull request #11202: MINOR: Fixed "snasphot" naming issue

2021-08-11 Thread GitBox


socutes closed pull request #11202:
URL: https://github.com/apache/kafka/pull/11202


   






[GitHub] [kafka] socutes opened a new pull request #11202: MINOR: Fixed "snasphot" naming issue

2021-08-11 Thread GitBox


socutes opened a new pull request #11202:
URL: https://github.com/apache/kafka/pull/11202


   Fixed the "snasphot" naming issue; changed it to "snapshot".






[jira] [Updated] (KAFKA-10900) Add metrics enumerated in KIP-630

2021-08-11 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu updated KAFKA-10900:
---
Description: 
KIP-630 enumerates a few metrics. Make sure that those metrics are implemented.

 
|kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs|A histogram of the amount of time it took to generate a snapshot.|
|kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs|A histogram of the amount of time it took to load the snapshot.|
|kafka.controller:type=KafkaController,name=SnapshotLag|The number of offsets between the largest snapshot offset and the high-watermark.|
|kafka.controller:type=KafkaController,name=SnapshotSizeBytes|Size of the latest snapshot in bytes.|

  was:KIP-630 enumerates a few metrics. Makes sure that those metrics are 
implemented.


> Add metrics enumerated in KIP-630
> -
>
> Key: KAFKA-10900
> URL: https://issues.apache.org/jira/browse/KAFKA-10900
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>
> KIP-630 enumerates a few metrics. Makes sure that those metrics are 
> implemented.
>  
> |kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs|A histogram 
> of the amount of time it took to generate a snapshot.|
> |kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs|A histogram 
> of the amount of time it took to load the snapshot.|
> |kafka.controller:type=KafkaController,name=SnapshotLag|The number of offsets 
> between the largest snapshot offset and the high-watermark.|
> |kafka.controller:type=KafkaController,name=SnapshotSizeBytes|Size of the 
> latest snapshot in bytes.|





[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-11 Thread GitBox


niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687262200



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;

Review comment:
   Will make the change. Had read the session timeout parameter wrong. Will 
keep consistent time units.








[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-11 Thread GitBox


niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687262311



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+
setReplicationFactor(replicationFactor)).iterator()));
+CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+createTopicsResponseData.topics().find("foo").errorCode());
+assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+"are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+// Unfence all brokers
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+createTopicsResponseData = active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+Thread.sleep(sleepMillis);
+
+// All brokers should still be unfenced
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+"Broker " + brokerId + " should have been unfenced");
+}
+createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("bar").setNumPartitions(1).
+setConfigs(new 
CreateableTopicConfigCollection(Collections.
+singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").
+setValue("2")).iterator())).
+
setReplicationFactor(replicationFactor)).iterator()));
+createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("bar").errorCode());
+Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+
+// Fence some of the brokers
+TestUtils.waitForCondition(() -> {
+sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+for (int i = brokersToKeepUnfenced; i < broke

[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-11 Thread GitBox


niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687262147



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+
setReplicationFactor(replicationFactor)).iterator()));
+CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+createTopicsResponseData.topics().find("foo").errorCode());
+assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+"are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+// Unfence all brokers
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+createTopicsResponseData = active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+Thread.sleep(sleepMillis);
+
+// All brokers should still be unfenced
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+"Broker " + brokerId + " should have been unfenced");
+}
+createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("bar").setNumPartitions(1).
+setConfigs(new 
CreateableTopicConfigCollection(Collections.
+singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").

Review comment:
   TL;DR: it is vestigial, will remove.
   
   The reason I added it was that even with only one broker available the 
topic creation would succeed. The added min.isr condition was meant to 
strengthen the topic creation criteria so that it would fail when not enough 
brokers were unfenced. I learned that the current behavior is to not block 
topic creation even if the min.isr would not be met for the topic on a 
subsequent produce. Anyway, unless that behavior is changed the min.isr serves 
no purpose and I will remove it. I added explicit checks to ensure broker 
liveness.

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ 

[GitHub] [kafka] niket-goel commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-11 Thread GitBox


niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687262045



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+
setReplicationFactor(replicationFactor)).iterator()));
+CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),

Review comment:
   The topic creation actually fails here. We try the create again after 
unfencing the brokers.

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+
set

[GitHub] [kafka] xvrl opened a new pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter

2021-08-11 Thread GitBox


xvrl opened a new pull request #11201:
URL: https://github.com/apache/kafka/pull/11201


   Metric tag maps do not offer any ordering guarantees. We should always sort 
tags when generating mbean names, or we could end up with duplicate mbeans with 
a different tag order, or with orphaned mbeans that were not properly removed 
when metrics are updated or deleted.





[GitHub] [kafka] hachikuji commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-11 Thread GitBox


hachikuji commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687194150



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+
setReplicationFactor(replicationFactor)).iterator()));
+CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+createTopicsResponseData.topics().find("foo").errorCode());
+assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+"are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+// Unfence all brokers
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+createTopicsResponseData = active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+Thread.sleep(sleepMillis);
+
+// All brokers should still be unfenced
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+"Broker " + brokerId + " should have been unfenced");
+}
+createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("bar").setNumPartitions(1).
+setConfigs(new 
CreateableTopicConfigCollection(Collections.
+singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").
+setValue("2")).iterator())).
+
setReplicationFactor(replicationFactor)).iterator()));
+createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("bar").errorCode());
+Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+
+// Fence some of the brokers
+TestUtils.waitForCondition(() -> {
+sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+for (int i = brokersToKeepUnfenced; i < broker

[GitHub] [kafka] rondagostino commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-11 Thread GitBox


rondagostino commented on a change in pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#discussion_r687199159



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1420,6 +1420,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   override def get(key: String): AnyRef =
 if (this eq currentConfig) super.get(key) else currentConfig.get(key)
 
+  override def postProcessParsedConfig(props: java.util.Map[String,Object]): 
java.util.Map[String,Object] = {

Review comment:
   Good point!  I'll move it.








[GitHub] [kafka] hachikuji commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-11 Thread GitBox


hachikuji commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687186647



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+
setReplicationFactor(replicationFactor)).iterator()));
+CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),

Review comment:
   This part is a little mysterious to me. We return an error, but the 
topic is created anyway? That seems surprising.

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = 
active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should 
fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new 
CreatableTopic().setName("foo").setNumPartitions(1).
+  

[GitHub] [kafka] jsancio commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-11 Thread GitBox


jsancio commented on a change in pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#discussion_r687192965



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1420,6 +1420,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   override def get(key: String): AnyRef =
 if (this eq currentConfig) super.get(key) else currentConfig.get(key)
 
+  override def postProcessParsedConfig(props: java.util.Map[String,Object]): 
java.util.Map[String,Object] = {

Review comment:
   Why are we doing the validation in this method since it always returns 
an empty map? Can we do the same validation in `validateValues`? `KafkaConfig` 
performs the more complicated validation in that method.








[GitHub] [kafka] rondagostino opened a new pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-11 Thread GitBox


rondagostino opened a new pull request #11200:
URL: https://github.com/apache/kafka/pull/11200


   If both `broker.id` and `node.id` are set, and they are set inconsistently 
(e.g. `broker.id=0`, `node.id=1`), then currently the value of `node.id` is 
used and the `broker.id` value is left at the original value. The server should 
detect this inconsistency, throw a ConfigException, and fail to start. This 
patch adds the check and a test for it.
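   
   A hedged sketch of the consistency rule (illustrative only; the real check 
lives in KafkaConfig.scala):
   
       import java.util.Map;
       import org.apache.kafka.common.config.ConfigException;
   
       // Illustrative only: if both ids are present they must agree.
       final class NodeIdCheck {
           static void validate(Map<String, ?> props) {
               Object brokerId = props.get("broker.id");
               Object nodeId = props.get("node.id");
               if (brokerId != null && nodeId != null
                       && !brokerId.toString().equals(nodeId.toString())) {
                   throw new ConfigException("broker.id and node.id must match when both are set: "
                       + "broker.id=" + brokerId + ", node.id=" + nodeId);
               }
           }
       }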
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] lbradstreet opened a new pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None

2021-08-11 Thread GitBox


lbradstreet opened a new pull request #11199:
URL: https://github.com/apache/kafka/pull/11199


   When the high watermark is contained in a non-active segment, we are not 
correctly bounding cleaning by the hwm. This means that uncommitted records may 
overwrite committed data.






[jira] [Commented] (KAFKA-13194) LogCleaner may clean past highwatermark

2021-08-11 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397601#comment-17397601
 ] 

Justine Olshan commented on KAFKA-13194:


There was code to change this in https://github.com/apache/kafka/pull/9590, 
but if we want a faster fix, we should just make a new PR.

Replacing

 // we do not clean beyond the first unstable offset
 log.firstUnstableOffset,

with

 // we do not clean beyond the lastStableOffset (and therefore the high watermark)
 Option(log.lastStableOffset),

> LogCleaner may clean past highwatermark
> ---
>
> Key: KAFKA-13194
> URL: https://issues.apache.org/jira/browse/KAFKA-13194
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Bradstreet
>Priority: Minor
>
> Here we have the cleaning point being bounded to the active segment base 
> offset and the first unstable offset. Which makes sense:
>  
> {code:java}
>// find first segment that cannot be cleaned
> // neither the active segment, nor segments with any messages closer to 
> the head of the log than the minimum compaction lag time
> // may be cleaned
> val firstUncleanableDirtyOffset: Long = Seq(  // we do not clean 
> beyond the first unstable offset
>   log.firstUnstableOffset,  // the active segment is always 
> uncleanable
>   Option(log.activeSegment.baseOffset),  // the first segment whose 
> largest message timestamp is within a minimum time lag from now
>   if (minCompactionLagMs > 0) {
> // dirty log segments
> val dirtyNonActiveSegments = 
> log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
> dirtyNonActiveSegments.find { s =>
>   val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
>   debug(s"Checking if log segment may be cleaned: log='${log.name}' 
> segment.baseOffset=${s.baseOffset} " +
> s"segment.largestTimestamp=${s.largestTimestamp}; now - 
> compactionLag=${now - minCompactionLagMs}; " +
> s"is uncleanable=$isUncleanable")
>   isUncleanable
> }.map(_.baseOffset)
>   } else None
> ).flatten.min
> {code}
>  
> But LSO starts out as None.
> {code:java}
> @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] 
> = None
> private[log] def firstUnstableOffset: Option[Long] = 
> firstUnstableOffsetMetadata.map(_.messageOffset){code}
> For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
> default it to the hwm if it's not set.
>  
> {code:java}
>   private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
> checkIfMemoryMappedBufferClosed()// cache the current high watermark 
> to avoid a concurrent update invalidating the range check
> val highWatermarkMetadata = fetchHighWatermarkMetadata
> firstUnstableOffsetMetadata match {
>   case Some(offsetMetadata) if offsetMetadata.messageOffset < 
> highWatermarkMetadata.messageOffset =>
> if (offsetMetadata.messageOffsetOnly) {
>   lock synchronized {
> val fullOffset = 
> convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
> if (firstUnstableOffsetMetadata.contains(offsetMetadata))
>   firstUnstableOffsetMetadata = Some(fullOffset)
> fullOffset
>   }
> } else {
>   offsetMetadata
> }
>   case _ => highWatermarkMetadata
> }
>   }
> {code}
>  
>  
> This means that in the case where the hwm is prior to the active segment 
> base, the log cleaner may clean past the hwm. This is most likely to occur 
> after a broker restart when the log cleaner may start cleaning prior to 
> replication becoming active.





[jira] [Updated] (KAFKA-13194) LogCleaner may clean past highwatermark

2021-08-11 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-13194:
-
Description: 
Here we have the cleaning point being bounded to the active segment base offset 
and the first unstable offset. Which makes sense:

 
{code:java}
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the
// head of the log than the minimum compaction lag time may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(

  // we do not clean beyond the first unstable offset
  log.firstUnstableOffset,

  // the active segment is always uncleanable
  Option(log.activeSegment.baseOffset),

  // the first segment whose largest message timestamp is within a minimum time lag from now
  if (minCompactionLagMs > 0) {
    // dirty log segments
    val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
    dirtyNonActiveSegments.find { s =>
      val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
      debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
        s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
        s"is uncleanable=$isUncleanable")
      isUncleanable
    }.map(_.baseOffset)
  } else None
).flatten.min


{code}
 

But LSO starts out as None.
{code:java}
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None

private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
{code}
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
default it to the hwm if it's not set.

 
{code:java}

private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
  checkIfMemoryMappedBufferClosed()

  // cache the current high watermark to avoid a concurrent update invalidating the range check
  val highWatermarkMetadata = fetchHighWatermarkMetadata

  firstUnstableOffsetMetadata match {
    case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
      if (offsetMetadata.messageOffsetOnly) {
        lock synchronized {
          val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
          if (firstUnstableOffsetMetadata.contains(offsetMetadata))
            firstUnstableOffsetMetadata = Some(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    case _ => highWatermarkMetadata
  }
}

{code}
 

 

This means that in the case where the hwm is prior to the active segment base, 
the log cleaner may clean past the hwm. This is most likely to occur after a 
broker restart when the log cleaner may start cleaning prior to replication 
becoming active.

  was:
Here we have the cleaning point being bounded to the active segment base offset 
and the first unstable offset. Which makes sense:
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the 
head of the log than the minimum compaction lag time
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(

  // we do not clean beyond the first unstable offset
  log.firstUnstableOffset,

  // the active segment is always uncleanable
  Option(log.activeSegment.baseOffset),

  // the first segment whose largest message timestamp is within a minimum 
time lag from now
  if (minCompactionLagMs > 0) \{
// dirty log segments
val dirtyNonActiveSegments = 
log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
dirtyNonActiveSegments.find { s =>
  val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
  debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} " +
s"segment.largestTimestamp=${s.largestTimestamp}; now - 
compactionLag=${now - minCompactionLagMs}; " +
s"is uncleanable=$isUncleanable")
  isUncleanable
}.map(_.baseOffset)
  } else None
).flatten.min
But LSO starts out as None.
  @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] 
= None
  private[log] def firstUnstableOffset: Option[Long] = 
firstUnstableOffsetMetadata.map(_.messageOffset)
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
default it to the hwm if it's not set.
  private def fetchLastStableOffsetMetadata: LogOffsetMetadata = \{
checkIfMemoryMappedBufferClosed()

// cache the current high watermark to avoid a concurrent update 
invalidating the range check
val highWatermarkMetadata = fetchHighWatermarkMetadata

firstUnstableOffsetMetadata match {
 

[jira] [Updated] (KAFKA-13194) LogCleaner may clean past highwatermark

2021-08-11 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-13194:
-
Priority: Minor  (was: Major)

> LogCleaner may clean past highwatermark
> ---
>
> Key: KAFKA-13194
> URL: https://issues.apache.org/jira/browse/KAFKA-13194
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Bradstreet
>Priority: Minor
>
> Here we have the cleaning point being bounded to the active segment base 
> offset and the first unstable offset. Which makes sense:
>  
> {code:java}
>// find first segment that cannot be cleaned
> // neither the active segment, nor segments with any messages closer to 
> the head of the log than the minimum compaction lag time
> // may be cleaned
> val firstUncleanableDirtyOffset: Long = Seq(  // we do not clean 
> beyond the first unstable offset
>   log.firstUnstableOffset,  // the active segment is always 
> uncleanable
>   Option(log.activeSegment.baseOffset),  // the first segment whose 
> largest message timestamp is within a minimum time lag from now
>   if (minCompactionLagMs > 0) {
> // dirty log segments
> val dirtyNonActiveSegments = 
> log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
> dirtyNonActiveSegments.find { s =>
>   val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
>   debug(s"Checking if log segment may be cleaned: log='${log.name}' 
> segment.baseOffset=${s.baseOffset} " +
> s"segment.largestTimestamp=${s.largestTimestamp}; now - 
> compactionLag=${now - minCompactionLagMs}; " +
> s"is uncleanable=$isUncleanable")
>   isUncleanable
> }.map(_.baseOffset)
>   } else None
> ).flatten.min
> {code}
>  
> But LSO starts out as None.
> {code:java}
> @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] 
> = None
> private[log] def firstUnstableOffset: Option[Long] = 
> firstUnstableOffsetMetadata.map(_.messageOffset){code}
> For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
> default it to the hwm if it's not set.
>  
> {code:java}
>   private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
> checkIfMemoryMappedBufferClosed()// cache the current high watermark 
> to avoid a concurrent update invalidating the range check
> val highWatermarkMetadata = fetchHighWatermarkMetadata
> firstUnstableOffsetMetadata match {
>   case Some(offsetMetadata) if offsetMetadata.messageOffset < 
> highWatermarkMetadata.messageOffset =>
> if (offsetMetadata.messageOffsetOnly) {
>   lock synchronized {
> val fullOffset = 
> convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
> if (firstUnstableOffsetMetadata.contains(offsetMetadata))
>   firstUnstableOffsetMetadata = Some(fullOffset)
> fullOffset
>   }
> } else {
>   offsetMetadata
> }
>   case _ => highWatermarkMetadata
> }
>   }
> {code}
>  
>  
> This means that in the case where the hwm is prior to the active segment 
> base, the log cleaner may clean past the hwm. This is most likely to occur 
> after a broker restart when the log cleaner may start cleaning prior to 
> replication becoming active.





[jira] [Assigned] (KAFKA-13192) broker.id and node.id can be specified inconsistently

2021-08-11 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino reassigned KAFKA-13192:
-

Assignee: Ron Dagostino

> broker.id and node.id can be specified inconsistently
> -
>
> Key: KAFKA-13192
> URL: https://issues.apache.org/jira/browse/KAFKA-13192
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> If both broker.id and node.id are set, and they are set inconsistently 
> (e.g.broker.id=0, node.id=1) then the value of node.id is used and the 
> broker.id value is left at the original value.  The server should detect this 
> inconsistency, throw a ConfigException, and fail to start.





[jira] [Created] (KAFKA-13194) LogCleaner may clean past highwatermark

2021-08-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-13194:


 Summary: LogCleaner may clean past highwatermark
 Key: KAFKA-13194
 URL: https://issues.apache.org/jira/browse/KAFKA-13194
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Here we have the cleaning point being bounded to the active segment base offset 
and the first unstable offset. Which makes sense:
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the 
head of the log than the minimum compaction lag time
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(

  // we do not clean beyond the first unstable offset
  log.firstUnstableOffset,

  // the active segment is always uncleanable
  Option(log.activeSegment.baseOffset),

  // the first segment whose largest message timestamp is within a minimum 
time lag from now
  if (minCompactionLagMs > 0) \{
// dirty log segments
val dirtyNonActiveSegments = 
log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
dirtyNonActiveSegments.find { s =>
  val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
  debug(s"Checking if log segment may be cleaned: log='${log.name}' 
segment.baseOffset=${s.baseOffset} " +
s"segment.largestTimestamp=${s.largestTimestamp}; now - 
compactionLag=${now - minCompactionLagMs}; " +
s"is uncleanable=$isUncleanable")
  isUncleanable
}.map(_.baseOffset)
  } else None
).flatten.min
But LSO starts out as None.
  @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] 
= None
  private[log] def firstUnstableOffset: Option[Long] = 
firstUnstableOffsetMetadata.map(_.messageOffset)
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
default it to the hwm if it's not set.
  private def fetchLastStableOffsetMetadata: LogOffsetMetadata = \{
checkIfMemoryMappedBufferClosed()

// cache the current high watermark to avoid a concurrent update 
invalidating the range check
val highWatermarkMetadata = fetchHighWatermarkMetadata

firstUnstableOffsetMetadata match {
  case Some(offsetMetadata) if offsetMetadata.messageOffset < 
highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
  lock synchronized {
val fullOffset = 
convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
if (firstUnstableOffsetMetadata.contains(offsetMetadata))
  firstUnstableOffsetMetadata = Some(fullOffset)
fullOffset
  }
} else \{
  offsetMetadata
}
  case _ => highWatermarkMetadata
}
  }
This means that in the case where the hwm is prior to the active segment base, 
the log cleaner may clean past the hwm. This is most likely to occur after a 
broker restart when the log cleaner may start cleaning prior to replication 
becoming active.





[jira] [Created] (KAFKA-13193) Replica manager doesn't update partition state when transitioning from leader to follower with unknown leader

2021-08-11 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13193:
--

 Summary: Replica manager doesn't update partition state when 
transitioning from leader to follower with unknown leader
 Key: KAFKA-13193
 URL: https://issues.apache.org/jira/browse/KAFKA-13193
 Project: Kafka
  Issue Type: Bug
  Components: kraft, replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


This issue applies to both the ZK and KRaft implementations of the replica 
manager. In the rare case when a replica transitions from leader to follower 
with no leader, the partition state is not updated.

This is because when handling makeFollowers the ReplicaManager only updates the 
partition state if the leader is alive. The solution is to always transition to 
follower but not start the fetcher thread if the leader is unknown or not alive.





[jira] [Commented] (KAFKA-12954) Add Support for Scala 3 in 4.0.0

2021-08-11 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397503#comment-17397503
 ] 

Josep Prat commented on KAFKA-12954:


There is a PR for supporting Scala 3 in Gradle: 
https://github.com/gradle/gradle/pull/18001

> Add Support for Scala 3 in 4.0.0
> 
>
> Key: KAFKA-12954
> URL: https://issues.apache.org/jira/browse/KAFKA-12954
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Josep Prat
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> This is a follow up task from 
> https://issues.apache.org/jira/browse/KAFKA-12895, in which Scala 2.12 
> support will be dropped.
> It would be good to, at the same time, add support for Scala 3.
> Initially it would be enough to only make the code compile with Scala 3 so we 
> can generate the proper Scala 3 artifacts, this might be achieved with the 
> proper compiler flags and an occasional rewrite.
> Follow up tasks could be created to migrate to a more idiomatic Scala 3 
> writing if desired.
> If I understand it correctly, this would need a KIP as we are modifying the 
> public interfaces (new artifacts). If this is the case, let me know  and I 
> will write it.





[jira] [Created] (KAFKA-13192) broker.id and node.id can be specified inconsistently

2021-08-11 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13192:
-

 Summary: broker.id and node.id can be specified inconsistently
 Key: KAFKA-13192
 URL: https://issues.apache.org/jira/browse/KAFKA-13192
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
Reporter: Ron Dagostino


If both broker.id and node.id are set, and they are set inconsistently 
(e.g.broker.id=0, node.id=1) then the value of node.id is used and the 
broker.id value is left at the original value.  The server should detect this 
inconsistency, throw a ConfigException, and fail to start.






[GitHub] [kafka] rodesai commented on pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-11 Thread GitBox


rodesai commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-896970248


   @cadonna @ableegoldman @guozhangwang 






[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready; however, under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation, and in the above scenario, we see errors on pod kafka-0 
(repeatedly spamming the log):
{noformat}
[2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), 
retrying. (kafka.server.DefaultAlterIsrManager){noformat}
Kafka-1 seems ok

When kafka-2 starts, it has this log entry with regard to its own broker epoch:
{noformat}
[2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with 
addresses: 
INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094,
 czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat}
This is despite kafka-2 appearing to start fine. This is what you see in 
kafka-2's logs; nothing else seems to be added to the log, it just seems to 
hang here:
{noformat}
[2021-08-11 11:44:48,911] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] 
Started socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-11 11:44:48,913] INFO Kafka version: 6.2.0-ccs 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-08-11 11:44:48,913] INFO Kafka commitId: 1a5755cf9401c84f 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-08-11 11:44:48,913] INFO Kafka startTimeMs: 1628682288911 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-08-11 11:44:48,914] INFO [KafkaServer id=2] started 
(kafka.server.KafkaServer) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper and getting the values of the brokers looks fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment: simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However, there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

Just to be clear, rolling restarting only kafka or zookeeper is absolutely 
fine. 


After some additional testing, it appears this can be recreated simply by 
restarting a broker pod and then restarting the zookeeper leader as the broker 
is shutting down. 

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset fir

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods kafka-0 
(repeatedly spamming the log) :
{noformat}
[2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), 
retrying. (kafka.server.DefaultAlterIsrManager){noformat}
Kafka-1 seems ok

When kafka-2 starts, it has this log entry with regards to its own broker epoch:
{noformat}
[2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with 
addresses: 
INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094,
 czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat}
This is despite kafka-2 appearing to start fine, this is what you see in 
kafka-2's logs, nothing else seems to be added to the log, it just seems to 
hang here:
{noformat}
[2021-08-11 11:44:48,911] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] 
Started socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-11 11:44:48,913] INFO Kafka version: 6.2.0-ccs 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-08-11 11:44:48,913] INFO Kafka commitId: 1a5755cf9401c84f 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-08-11 11:44:48,913] INFO Kafka startTimeMs: 1628682288911 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-08-11 11:44:48,914] INFO [KafkaServer id=2] started 
(kafka.server.KafkaServer) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

Just to be clear, rolling restarting only kafka or zookeeper is absolutely 
fine. 

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if an

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods kafka-0 
(repeatedly spamming the log) :
{noformat}
[2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), 
retrying. (kafka.server.DefaultAlterIsrManager){noformat}
Kafka-1 seems ok

When kafka-2 starts, it has this log entry with regards to its own broker epoch:
{noformat}
[2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with 
addresses: 
INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094,
 czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

Just to be clear, rolling restarting only kafka or zookeeper is absolutely 
fine. 

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods kafka-0:
{noformat}
[2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods kafka-0:
{noformat}
[2021-08-11 11:45:57,625] WARN Broker had a stale broker epoch (670014914375), 
retrying. (kafka.server.DefaultAlterIsrManager){noformat}
Kafka-1 seems ok

When kafka-2 starts, it has this log entry with regards to its own broker epoch:
{noformat}
[2021-08-11 11:44:48,116] INFO Registered broker 2 at path /brokers/ids/2 with 
addresses: 
INTERNAL://kafka-2.kafka.svc.cluster.local:9092,INTERNAL_SECURE://kafka-2.kafka.svc.cluster.local:9094,
 czxid (broker epoch): 674309865493 (kafka.zk.KafkaZkClient) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

Just to be clear, rolling restarting only kafka or zookeeper is absolutely 
fine. 

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlter

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

Just to be clear, rolling restarting only kafka or zookeeper is absolutely 
fine. 

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brok

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

Just to be clear, rolling restarting only kafka

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /bro

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

I appreciate that Zookeeper and Kafka would not normally be restarted 
concurrently in this way. However there are going to be scenarios where this 
can happen, such as if we had simultaneous Kubernetes node failures, resulting 
in the loss of both a zookeeper and a kafka pod at the same time. This could 
result in the issue above. 

This is not something that we have seen previously with versions 1.1 or 2.5. 

 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, e

[GitHub] [kafka] chia7712 commented on a change in pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…

2021-08-11 Thread GitBox


chia7712 commented on a change in pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#discussion_r686709179



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##
@@ -42,77 +39,65 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
 public class StateManagerUtilTest {
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorStateManager stateManager;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorTopology topology;
 
-@Mock(type = MockType.NICE)
+@Mock
 private InternalProcessorContext processorContext;
 
-private IMocksControl ctrl;
 
 private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
 private final TaskId taskId = new TaskId(0, 0);
 
 @Before
 public void setup() {
-ctrl = createStrictControl();
-topology = ctrl.createMock(ProcessorTopology.class);
-processorContext = ctrl.createMock(InternalProcessorContext.class);
+topology = mock(ProcessorTopology.class);
+processorContext = mock(InternalProcessorContext.class);
 
-stateManager = ctrl.createMock(ProcessorStateManager.class);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+stateManager = mock(ProcessorStateManager.class);
+stateDirectory = mock(StateDirectory.class);
 }
 
 @Test
 public void testRegisterStateStoreWhenTopologyEmpty() {
-expect(topology.stateStores()).andReturn(emptyList());
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(topology.stateStores()).thenReturn(emptyList());
+inOrder(topology);
 
 StateManagerUtil.registerStateStores(logger,
 "logPrefix:", topology, stateManager, stateDirectory, 
processorContext);
-
-ctrl.verify();
 }
 
 @Test
 public void testRegisterStateStoreFailToLockStateDirectory() {
-expect(topology.stateStores()).andReturn(singletonList(new 
MockKeyValueStore("store", false)));
+when(topology.stateStores()).thenReturn(singletonList(new 
MockKeyValueStore("store", false)));
+
+when(stateManager.taskId()).thenReturn(taskId);
 
-expect(stateManager.taskId()).andReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(false);
 
-expect(stateDirectory.lock(taskId)).andReturn(false);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+inOrder(topology, stateManager, stateDirectory);

Review comment:
   you have to check the execution order through the object returned from 
`inOrder`
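
   For reference, a minimal sketch (assuming Mockito, with illustrative mocks rather than the ones in this test) of verifying order through the `InOrder` object returned by `inOrder(...)` instead of discarding it:

```java
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.util.List;
import org.mockito.InOrder;

public class InOrderSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> first = mock(List.class);
        List<String> second = mock(List.class);

        first.add("a");
        second.add("b");

        // Keep the returned InOrder and verify through it; calling inOrder(...)
        // on its own asserts nothing about ordering.
        InOrder order = inOrder(first, second);
        order.verify(first).add("a");
        order.verify(second).add("b");
    }
}
```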

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##
@@ -42,77 +39,65 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
 public class StateManagerUtilTest {
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorStateManager stateManager;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorTopology topology;
 
-@Mock(type = MockType.NICE)
+@Mock
 private InternalProcessorContext processorContext;
 
-private IMocksControl ctrl;
 
 private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
 private final TaskId taskId = new TaskId(0, 0);
 
 @Before
 public void setup() {
-ctrl = createStrictControl();
-topology = ctrl.createMock(ProcessorTopology.class);
-processor

[jira] [Updated] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CS User updated KAFKA-13191:

Description: 
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

[https://github.com/apache/kafka/pull/9100/files#r494480847]

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 

  was:
We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgraded, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first, either way appears to result with the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

https://github.com/apache/kafka/pull/9100/files#r494480847

 

Recovery is possible by continuing to restart the remaining brokers in the 
clu

[jira] [Created] (KAFKA-13191) Kafka 2.8 - simultaneous restarts of Kafka and zookeeper result in broken cluster

2021-08-11 Thread CS User (Jira)
CS User created KAFKA-13191:
---

 Summary: Kafka 2.8 - simultaneous restarts of Kafka and zookeeper 
result in broken cluster
 Key: KAFKA-13191
 URL: https://issues.apache.org/jira/browse/KAFKA-13191
 Project: Kafka
  Issue Type: Bug
  Components: protocol
Affects Versions: 2.8.0
Reporter: CS User


We're using confluent platform 6.2, running in a Kubernetes environment. The 
cluster has been running for a couple of years with zero issues, starting from 
version 1.1, 2.5 and now 2.8. 

We've very recently upgraded to kafka 2.8 from kafka 2.5. 

Since upgrading, we have seen issues when kafka and zookeeper pods restart 
concurrently. 

We can replicate the issue by restarting either the zookeeper statefulset first 
or the kafka statefulset first; either way appears to result in the same 
failure scenario. 

We've attempted to mitigate by preventing the kafka pods from stopping if any 
zookeeper pods are being restarted, or a rolling restart of the zookeeper 
cluster is underway. 

We've also added a check to stop the kafka pods from starting until all 
zookeeper pods are ready, however under the following scenario we still see the 
issue:

In a 3 node kafka cluster with 5 zookeeper servers
 # kafka-2 starts to terminate - all zookeeper pods are running, so it proceeds
 # zookeeper-4 terminates
 # kafka-2 starts-up, and waits until the zookeeper rollout completes
 # kafka-2 eventually fully starts, kafka comes up and we see the errors below 
on other pods in the cluster. 

Without mitigation and in the above scenario we see errors on pods 
kafka-0/kafka-1:
{noformat}
Broker had a stale broker epoch (635655171205), retrying. 
(kafka.server.DefaultAlterIsrManager) {noformat}
This never appears to recover. 

If you then restart kafka-2, you'll see these errors:
{noformat}
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 3 larger than available brokers: 0. {noformat}
This seems to completely break the cluster, partitions do not failover as 
expected. 

 

Checking zookeeper, and getting the values of the brokers look fine 
{noformat}
 get /brokers/ids/0{noformat}
etc, all looks fine there, each broker is present. 

 

This error message appears to have been added to kafka in the last 11 months 
{noformat}
Broker had a stale broker epoch {noformat}
Via this PR:

[https://github.com/apache/kafka/pull/9100]

I see also this comment around the leader getting stuck:

https://github.com/apache/kafka/pull/9100/files#r494480847

 

Recovery is possible by continuing to restart the remaining brokers in the 
cluster. Once all have been restarted, everything looks fine.

Has anyone else come across this? It seems very simple to replicate in our 
environment, simply start a simultaneous rolling restart of both kafka and 
zookeeper. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9914) Mirror Maker 2 creates heartbeats kafka topics recursively

2021-08-11 Thread Al Ricafort (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397208#comment-17397208
 ] 

Al Ricafort commented on KAFKA-9914:


Hi,

I have the same issue.

I have 2 Kafka setups, A and B. A is a cluster with 3 instances running on the 
same machine, while B is a standalone instance on another machine. I tried to use 
Mirror Maker to replicate from A to B. The config file is as follows:

 

{{clusters = A, B}}

{{A.bootstrap.servers = host1:9091, host1:9092, host1:9093}}

{{B.bootstrap.servers = host2:9092}}

{{A->B.enabled = true}}

{{B->A.enabled = false}}

{{replication.factor = 1}}

{{checkpoints.topic.replication.factor=1}}

{{heartbeats.topic replication.factor=1}}

{{offset-syncs.topic.replication.factor=1}}

{{offset.storage.replication.factor=1}}

{{status.storage.replication.factor=1}}

{{config.storage.replication.factor=1}}

I execute mirror maker as follows:

 

{{bin/connect-mirror-maker.sh config/mirror.maker.properties}}

But when I run the mirror maker on the ‘B’ side I can see that it keeps 
creating ‘heartbeats’ topics. So at some point I will end up having many 
‘A.heartbeats’-style topics like the ones below, and the list will just keep 
growing until I kill the mirror maker.

 

{{A.A.A.A.heartbeats}}

{{A.A.A.heartbeats}}

{{A.A.heartbeats}}

{{A.heartbeats}}

 

How can I get around this issue?

Thanks.

Kafka Version: 2.13-2.8
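
For illustration, the growing prefixes in the topic list above match what repeated source-prefix naming produces when an already-replicated heartbeats topic is not recognised as such. This is a self-contained sketch of that naming pattern only, not the MM2 code itself:
{code:java}
public class HeartbeatPrefixSketch {

    // DefaultReplicationPolicy-style "sourceAlias.topic" naming.
    static String formatRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    public static void main(String[] args) {
        String topic = "heartbeats";
        // Each pass prefixes again when nothing treats "A.heartbeats" as already replicated.
        for (int pass = 0; pass < 4; pass++) {
            topic = formatRemoteTopic("A", topic);
            System.out.println(topic);   // A.heartbeats, A.A.heartbeats, A.A.A.heartbeats, ...
        }
    }
}
{code}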

> Mirror Maker 2 creates heartbeats kafka topics recursively
> --
>
> Key: KAFKA-9914
> URL: https://issues.apache.org/jira/browse/KAFKA-9914
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.0.0
>Reporter: azher khan
>Priority: Major
>
> Hi Team,
> I configured Mirror Maker 2 to run on a Pod, configuring the Kafka brokers in 
> our environments and marking the '10.X.X.YY' Kafka broker as primary and '10.X.Y.XY' 
> as backup.
> +*Mirror Maker mm2.properties*+ 
> {code:java}
> clusters=kafka,backup
> kafka.bootstrap.servers=10.X.X.YY:9092
> backup.bootstrap.servers=10.X.Y.XY:9092
> kafka->backup.enabled=true
> kafka->backup.topics=az.*
> backup->kafka.enabled=false
> sync.topic.acls.enabled=true
> {code}
>  
> I was able to run Mirror Maker 2 successfully and was also able to take 
> backup of the identified topics (starting with 'az').
>  
> However I could see many kafka topics with the suffix 'heartbeats' created in 
> recursion (see below the list of topics with 'kafka.*.heartbeats'). This 
> could be because I triggered the 'connect-mirror-maker.sh' several times to 
> test backing up other topics.
>  
> I have 2 queries:
>  # How to avoid having 'kafka.*.heartbeats' topics being created when using Mirror 
> Maker 2
>  # Once Mirror Maker 2 has backed up a topic, say 'azherf1test', what is the 
> best way to roll back the changes made by Mirror Maker (i.e. delete all the topics 
> created by Mirror Maker, 'kafka.azherf1test' and supporting topics) while ensuring 
> the stability of the existing/source topics and the Kafka broker.
> (We are testing Mirror Maker and want to ensure we are able to roll back the 
> changes without affecting the Kafka topics/brokers.)
> +*Kafka Topics list output:*+
> {code:java}
> azherf1test
> heartbeats
> kafka-client-topic
> mm2-configs.backup.internal
> mm2-configs.kafka.internal
> mm2-offset-syncs.backup.internal
> mm2-offsets.backup.internal
> mm2-offsets.kafka.internal
> mm2-status.backup.internal
> mm2-status.kafka.internal
> kafka.azherf1test
> kafka.checkpoints.internal
> kafka.heartbeats
> kafka.kafka.heartbeats
> kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.ka

[GitHub] [kafka] dielhennr removed a comment on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-08-11 Thread GitBox


dielhennr removed a comment on pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#issuecomment-892837165


   @hachikuji @rajinisivaram since this build is green I’m fairly confident 
that dynamic config behavior hasn’t changed in a zk cluster. I am wondering if 
it is better to have buggy validation than no validation for KRaft brokers 
(assuming this is still buggy). Do either of you anticipate any problems with 
doing this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org