[jira] [Updated] (KAFKA-12273) InterBrokerSendThread#pollOnce throws FatalExitError even though it is shutdown correctly

2021-02-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-12273:
---
Fix Version/s: 2.8.0

> InterBrokerSendThread#pollOnce throws FatalExitError even though it is 
> shutdown correctly
> -
>
> Key: KAFKA-12273
> URL: https://issues.apache.org/jira/browse/KAFKA-12273
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.8.0
>
>
> Kafka tests sometimes shut down Gradle with a non-zero exit code. One root 
> cause is that InterBrokerSendThread#pollOnce encounters a DisconnectException 
> when the NetworkClient is closing. DisconnectException should be viewed as an 
> "expected" error in this case, since we close the client ourselves. In other 
> words, InterBrokerSendThread#pollOnce should swallow it.
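
A minimal sketch of the proposed handling (hedged: field and method names are 
illustrative, and the real InterBrokerSendThread lives in core and is written 
in Scala):
{code:java}
// Swallow DisconnectException when the disconnect comes from our own
// shutdown, instead of escalating it to a FatalExitError that kills
// the Gradle test JVM. Assumes networkClient, time, and log fields.
void pollOnce(long maxTimeoutMs) {
    try {
        networkClient.poll(maxTimeoutMs, time.milliseconds());
    } catch (DisconnectException e) {
        if (!networkClient.active()) {
            // expected: we are closing the NetworkClient ourselves
            log.debug("Ignoring DisconnectException during shutdown", e);
        } else {
            throw new FatalExitError();
        }
    }
}
{code}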



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10124: MINOR: apply Utils.isBlank to code base

2021-02-16 Thread GitBox


chia7712 commented on pull request #10124:
URL: https://github.com/apache/kafka/pull/10124#issuecomment-780347504


   @tang7526 Thanks for your patch. Could you also fix the following code (an 
example of the requested cleanup is sketched after this list)?
   
   1. 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L448
   1. 
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerScopeUtilsTest.java#L31
   1. 
https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java#L914
   1. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L642
   1. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L473
   1. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Log4jController.scala#L74
   1. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Log4jController.scala#L83
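   A hedged before/after of the requested cleanup (the surrounding code is 
illustrative, not the exact lines at those links):
{code:java}
// before: manual null/blank check
if (value == null || value.trim().isEmpty()) {
    throw new ConfigException("value must be non-blank");
}

// after: org.apache.kafka.common.utils.Utils#isBlank
if (Utils.isBlank(value)) {
    throw new ConfigException("value must be non-blank");
}
{code}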
 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-02-16 Thread GitBox


cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577363631



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
 assertThat(store.get(0), nullValue());
 }
+
+
+@Test
+public void shouldReturnKeysWithGivenPrefix(){
+store = createKeyValueStore(driver.context());
+final String value = "value";
+final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+entries.add(new KeyValue<>(1, value));
+entries.add(new KeyValue<>(2, value));
+entries.add(new KeyValue<>(11, value));
+entries.add(new KeyValue<>(13, value));
+
+store.putAll(entries);
+final KeyValueIterator<Integer, String> keysWithPrefix = 
store.prefixScan(1, new IntegerSerializer());

Review comment:
   That sounds good!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-02-16 Thread GitBox


vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577363103



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
 assertThat(store.get(0), nullValue());
 }
+
+
+@Test
+public void shouldReturnKeysWithGivenPrefix(){
+store = createKeyValueStore(driver.context());
+final String value = "value";
+final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+entries.add(new KeyValue<>(1, value));
+entries.add(new KeyValue<>(2, value));
+entries.add(new KeyValue<>(11, value));
+entries.add(new KeyValue<>(13, value));
+
+store.putAll(entries);
+final KeyValueIterator<Integer, String> keysWithPrefix = 
store.prefixScan(1, new IntegerSerializer());

Review comment:
   > The reason, we get only `1` when we scan for prefix `1` is that the 
integer serializer serializes `11` and `13` in the least significant byte 
instead of serializing `1` in the byte before the least significant byte and 
`1` and `3` in the least significant byte. With the former the **byte** 
lexicographical order of `1 2 11 13` would be `1 2 11 13` which corresponds to 
the natural order of integers. With the latter the **byte** lexicographical 
order of `1 2 11 13` would be `1 11 13 2` which corresponds to the string 
lexicographical order. So the serializer determines the order of the entries 
and the store always returns the entries in byte lexicographical order.
   > 
   > You will experience a similar issue when you call `range(-1, 2)` on the 
in-memory state store in the unit test. You will get back an empty result since 
`-1` is larger than `2` in byte lexicographical order
   > when the `IntegerSerializer` is used. Also note the warning that is output, 
especially this part `... or serdes that don't preserve ordering when 
lexicographically comparing the serialized bytes ...`
   > 
   > I think we should clearly state this limitation in the javadocs of the 
`prefixScan()` as we have done for `range()`, maybe with an example.
   > 
   > Currently, to get `prefixScan()` working for all types, we would need to 
do a complete scan (i.e. `all()`) followed by a filter, right?
   
   That is correct. That is the only way currently. 
   
   > 
   > Double checking: Is my understanding correct? @ableegoldman
   
   I think adding a warning similar to the one for the range() query would be 
good. I will do that as part of the PR. However, adding test cases for the 
integer serializer won't make sense in this test class. I will probably create 
another KVStore and add tests for it. Is that ok, @cadonna?
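   A small, self-contained illustration of the byte ordering described above 
(the topic name passed to the serializer is arbitrary):
{code:java}
import org.apache.kafka.common.serialization.IntegerSerializer;
import java.util.Arrays;

public class PrefixOrderDemo {
    public static void main(String[] args) {
        IntegerSerializer serializer = new IntegerSerializer();
        // IntegerSerializer writes 4 big-endian bytes, so 11 does NOT
        // start with the serialized bytes of 1:
        //   1  -> [0, 0, 0, 1]
        //   2  -> [0, 0, 0, 2]
        //   11 -> [0, 0, 0, 11]
        //   13 -> [0, 0, 0, 13]
        for (int key : new int[]{1, 2, 11, 13}) {
            System.out.println(key + " -> "
                + Arrays.toString(serializer.serialize("topic", key)));
        }
        // Byte-lexicographic order therefore matches the natural integer
        // order here, and prefixScan(1, new IntegerSerializer()) matches
        // only the full 4-byte prefix [0, 0, 0, 1], i.e. the key 1.
    }
}
{code}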
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-02-16 Thread GitBox


vamossagar12 commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577360887



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
 assertThat(store.get(0), nullValue());
 }
+
+
+@Test
+public void shouldReturnKeysWithGivenPrefix(){
+store = createKeyValueStore(driver.context());
+final String value = "value";
+final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+entries.add(new KeyValue<>(1, value));
+entries.add(new KeyValue<>(2, value));
+entries.add(new KeyValue<>(11, value));
+entries.add(new KeyValue<>(13, value));
+
+store.putAll(entries);
+final KeyValueIterator<Integer, String> keysWithPrefix = 
store.prefixScan(1, new IntegerSerializer());

Review comment:
   Thanks @cadonna , @ableegoldman  for the detailed explanation. I 
understood the behaviour now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10123: KAFKA-12327: Remove MethodHandle usage in CompressionType

2021-02-16 Thread GitBox


ijuma commented on pull request #10123:
URL: https://github.com/apache/kafka/pull/10123#issuecomment-780326554


   I'm not sure it's worth it since the cost of compressing on the broker 
during fetches is significantly higher than compressing during produce (the 
data is already on the heap for the latter and there are usually multiple 
fetches per produce). That is not to say that it's never useful, just that the 
ROI seems a bit low.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph opened a new pull request #10139: MINOR: Print the warning log before truncation.

2021-02-16 Thread GitBox


kamalcph opened a new pull request #10139:
URL: https://github.com/apache/kafka/pull/10139


   - After truncation, the high watermark (HW) can be moved to the truncation 
offset, so the warning should be printed before truncating.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #10138: KAFKA-12331: Use LEO for the base offset of LeaderChangeMessage batch

2021-02-16 Thread GitBox


jsancio opened a new pull request #10138:
URL: https://github.com/apache/kafka/pull/10138


   WIP
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-02-16 Thread GitBox


chia7712 commented on pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#issuecomment-780309024


   @g1geordie Thanks for your update. I will merge it tomorrow :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-16 Thread GitBox


vvcephei commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r577333186



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##
@@ -243,6 +244,11 @@
  */
 Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions, Duration timeout);
 
+/**
+ * @see KafkaConsumer#currentLag(TopicPartition)
+ */
+OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
   Woah, you are _fast_, @chia7712 !
   
   I just sent a message to the vote thread. I wanted to submit this PR first 
so that the vote thread message can have the full context available.
   
   Do you mind reading over what I said there? If it sounds good to you, then 
I'll update the KIP, and we can maybe put this whole mess to bed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-16 Thread GitBox


chia7712 commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r577332179



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
##
@@ -243,6 +244,11 @@
  */
 Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions, Duration timeout);
 
+/**
+ * @see KafkaConsumer#currentLag(TopicPartition)
+ */
+OptionalLong currentLag(TopicPartition topicPartition);

Review comment:
   Pardon me, KIP-695 does not include this change. It seems KIP-695 is 
still based on `metadata`? Please correct me if I misunderstand anything :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10123: KAFKA-12327: Remove MethodHandle usage in CompressionType

2021-02-16 Thread GitBox


chia7712 commented on pull request #10123:
URL: https://github.com/apache/kafka/pull/10123#issuecomment-780303685


   > Today, the solution would be to either:
   > 1. Include the relevant native compression libraries
   > 2. Limit topic data to the compression algorithms supported on the relevant platform
   >
   > Both seem doable.
   
   @ijuma Thanks for sharing. IIRC, a Kafka producer that does not support a 
compression codec can send uncompressed data to the server, and the data then 
gets compressed on the server side. That is a useful feature, since the 
compression does not obstruct us from producing data in environments that 
can't load native compression libraries. In contrast with the producer, a 
Kafka consumer can't read data from a compressed topic if it can't load the 
native compression libraries. WDYT? Does it pay to support such a scenario?
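   For reference, a minimal sketch of that scenario (broker address and topic 
name are assumptions; the topic would carry the topic-level config 
`compression.type=zstd` so the broker recompresses on append):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class UncompressedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The client can't load the native codec, so it sends the data
        // uncompressed; the broker recompresses it per the topic config.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("zstd-topic", "key", "value"));
        }
    }
}
{code}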



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-16 Thread GitBox


vvcephei opened a new pull request #10137:
URL: https://github.com/apache/kafka/pull/10137


   Implements KIP-695
   
   Reverts a previous behavior change to Consumer.poll and replaces
   it with a new Consumer.currentLag API, which returns the client's
   currently cached lag.
   
   Uses this new API to implement the desired task idling semantics
   improvement from KIP-695.
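   A hedged usage sketch of the new API (the `consumer` instance, topic, and 
partition are assumptions; imports of java.util.OptionalLong and 
org.apache.kafka.common.TopicPartition elided):
{code:java}
// Returns the client's currently cached lag; empty when no position /
// end-offset information is cached locally for the partition yet.
OptionalLong lag = consumer.currentLag(new TopicPartition("input-topic", 0));
if (lag.isPresent() && lag.getAsLong() == 0L) {
    // caught up: Streams can stop idling this task
}
{code}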
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

2021-02-16 Thread GitBox


abbccdda commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r577322703



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -134,6 +134,8 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
+  var clientToControllerChannelManager: 
Option[BrokerToControllerChannelManager] = None
+
   var alterIsrManager: AlterIsrManager = null

Review comment:
   We are planning to consolidate into two channels eventually:
   1. broker to controller channel
   2. client to controller channel
   
   Here, auto topic creation and forwarding fall into the 2nd category, while 
AlterIsr falls into the 1st.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10130: MINOR: Fix typo in MirrorMaker

2021-02-16 Thread GitBox


chia7712 commented on pull request #10130:
URL: https://github.com/apache/kafka/pull/10130#issuecomment-780286687


   @runom Thanks for your patch :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10130: MINOR: Fix typo in MirrorMaker

2021-02-16 Thread GitBox


chia7712 merged pull request #10130:
URL: https://github.com/apache/kafka/pull/10130


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10130: MINOR: Fix typo in MirrorMaker

2021-02-16 Thread GitBox


chia7712 commented on pull request #10130:
URL: https://github.com/apache/kafka/pull/10130#issuecomment-780286136


   the error is tracked by #10024



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10082: MINOR: use 'mapKey' to avoid unnecessary grouped data

2021-02-16 Thread GitBox


chia7712 merged pull request #10082:
URL: https://github.com/apache/kafka/pull/10082


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

2021-02-16 Thread GitBox


dengziming commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r577312082



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -134,6 +134,8 @@ class KafkaServer(
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
+  var clientToControllerChannelManager: 
Option[BrokerToControllerChannelManager] = None
+
   var alterIsrManager: AlterIsrManager = null

Review comment:
   Hello, I have a question: should we also share the channel between 
alterIsrManager and autoCreationManager, and furthermore share the same one 
with the alterReplicaStateManager proposed in KIP-589?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10082: MINOR: use 'mapKey' to avoid unnecessary grouped data

2021-02-16 Thread GitBox


chia7712 commented on pull request #10082:
URL: https://github.com/apache/kafka/pull/10082#issuecomment-780282255


   the error is tracked by #10024



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10082: MINOR: use 'mapKey' to avoid unnecessary grouped data

2021-02-16 Thread GitBox


chia7712 commented on pull request #10082:
URL: https://github.com/apache/kafka/pull/10082#issuecomment-780282166


   > Looks like ConsumerProtocolAssignment was also changed to have mapKey, so 
worth updating the PR to include that information.
   
   done



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #10136: MINOR: Import classes that are used in docs to fix warnings.

2021-02-16 Thread GitBox


dengziming opened a new pull request #10136:
URL: https://github.com/apache/kafka/pull/10136


   *More detailed description of your change*
   Fix this:
   
![image](https://user-images.githubusercontent.com/26023240/108154064-43c36f00-7117-11eb-80dc-e62db0bac081.png)
   
   
   *Summary of testing strategy (including rationale)*
   None 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12331) KafkaRaftClient should use the LEO when appending LeaderChangeMessage

2021-02-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-12331:
---

Assignee: Jose Armando Garcia Sancio

> KafkaRaftClient should use the LEO when appending LeaderChangeMessage
> -
>
> Key: KAFKA-12331
> URL: https://issues.apache.org/jira/browse/KAFKA-12331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO. 
> This is enforced when KafkaRaftClient uses the BatchAccumulator to create 
> batches. When creating the control batch for the LeaderChangeMessage the 
> KafkaRaftClient doesn't use the BatchAccumulator and instead creates the 
> batch with the default base offset of 0.
> This causes the validation in KafkaMetadataLog to fail with the following 
> exception:
> {code:java}
> kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to 
> @metadata-0. First offset 0 is less than the next offset 5. First 10 offsets 
> in append: ArrayBuffer(0), last offset in append: 0. Log start offset = 0
>   at kafka.log.Log.append(Log.scala:1217)
>   at kafka.log.Log.appendAsLeader(Log.scala:1092)
>   at kafka.raft.KafkaMetadataLog.appendAsLeader(KafkaMetadataLog.scala:92)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1158)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:449)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:409)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:463)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.handleVoteResponse(KafkaRaftClient.java:663)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1530)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1652)
>   at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2183)
>   at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
> {code}
> We should make the following changes:
>  # Fix MockLog to perform similar validation as 
> KafkaMetadataLog::appendAsLeader
>  # Use the LEO when creating the control batch for the LeaderChangeMessage
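
A hedged sketch of the intended fix (helper and field names are illustrative, 
not the actual raft API):
{code:java}
// Build the LeaderChangeMessage control batch at the log end offset
// (LEO) instead of the hard-coded base offset 0, so that the base
// offset check in KafkaMetadataLog#appendAsLeader passes.
long baseOffset = log.endOffset();  // e.g. 5 in the trace above
MemoryRecords batch = buildLeaderChangeBatch(baseOffset, leaderChangeMessage); // hypothetical helper
log.appendAsLeader(batch, quorum.epoch());
{code}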



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12331) KafkaRaftClient should use the LEO when appending LeaderChangeMessage

2021-02-16 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-12331:
---
Description: 
KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO. 
This is enforced when KafkaRaftClient uses the BatchAccumulator to create 
batches. When creating the control batch for the LeaderChangeMessage the 
KafkaRaftClient doesn't use the BatchAccumulator and instead creates the batch 
with the default base offset of 0.

This causes the validation in KafkaMetadataLog to fail with the following 
exception:
{code:java}
kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to 
@metadata-0. First offset 0 is less than the next offset 5. First 10 offsets in 
append: ArrayBuffer(0), last offset in append: 0. Log start offset = 0
at kafka.log.Log.append(Log.scala:1217)
at kafka.log.Log.appendAsLeader(Log.scala:1092)
at kafka.raft.KafkaMetadataLog.appendAsLeader(KafkaMetadataLog.scala:92)
at 
org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1158)
at 
org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:449)
at 
org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:409)
at 
org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:463)
at 
org.apache.kafka.raft.KafkaRaftClient.handleVoteResponse(KafkaRaftClient.java:663)
at 
org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1530)
at 
org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1652)
at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2183)
at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
{code}
We should make the following changes:
 # Fix MockLog to perform similar validation as KafkaMetadataLog::appendAsLeader
 # Use the LEO when creating the control batch for the LeaderChangeMessage

  was:
KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO. 
This is enforced when KafkaRaftClient uses the BatchAccumulator to create 
batches. When creating the control batch for the LeaderChangeMessage the 
KafkaRaftClient doesn't use the BatchAccumulator and instead creates the batch 
with the default base offset of 0.

This causes the validation in KafkaMetadataLog to fail with the following 
exception:
{code:java}
kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to 
@metadata-0. First offset 0 is less than the next offset 5. First 10 offsets in 
append: ArrayBuffer(0), last offset in append: 0. Log start offset = 0
at kafka.log.Log.append(Log.scala:1217)
at kafka.log.Log.appendAsLeader(Log.scala:1092)
at kafka.raft.KafkaMetadataLog.appendAsLeader(KafkaMetadataLog.scala:92)
at 
org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1158)
at 
org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:449)
at 
org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:409)
at 
org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:463)
at 
org.apache.kafka.raft.KafkaRaftClient.handleVoteResponse(KafkaRaftClient.java:663)
at 
org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1530)
at 
org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1652)
at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2183)
at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
{code}


> KafkaRaftClient should use the LEO when appending LeaderChangeMessage
> -
>
> Key: KAFKA-12331
> URL: https://issues.apache.org/jira/browse/KAFKA-12331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO. 
> This is enforced when KafkaRaftClient uses the BatchAccumulator to create 
> batches. When creating the control batch for the LeaderChangeMessage the 
> KafkaRaftClient doesn't use the BatchAccumulator and instead creates the 
> batch with the default base offset of 0.
> This causes the validation in KafkaMetadataLog to fail with the following 
> exception:
> {code:java}
> kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to 
> @metadata-0. First offset 0 is less than 

[jira] [Created] (KAFKA-12331) KafkaRaftClient should use the LEO when appending LeaderChangeMessage

2021-02-16 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12331:
--

 Summary: KafkaRaftClient should use the LEO when appending 
LeaderChangeMessage
 Key: KAFKA-12331
 URL: https://issues.apache.org/jira/browse/KAFKA-12331
 Project: Kafka
  Issue Type: Sub-task
  Components: replication
Reporter: Jose Armando Garcia Sancio


KafkaMetadataLog's appendAsLeader expects the base offset to match the LEO. 
This is enforced when KafkaRaftClient uses the BatchAccumulator to create 
batches. When creating the control batch for the LeaderChangeMessage the 
KafkaRaftClient doesn't use the BatchAccumulator and instead creates the batch 
with the default base offset of 0.

This causes the validation in KafkaMetadataLog to fail with the following 
exception:
{code:java}
kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to 
@metadata-0. First offset 0 is less than the next offset 5. First 10 offsets in 
append: ArrayBuffer(0), last offset in append: 0. Log start offset = 0
at kafka.log.Log.append(Log.scala:1217)
at kafka.log.Log.appendAsLeader(Log.scala:1092)
at kafka.raft.KafkaMetadataLog.appendAsLeader(KafkaMetadataLog.scala:92)
at 
org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1158)
at 
org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:449)
at 
org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:409)
at 
org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:463)
at 
org.apache.kafka.raft.KafkaRaftClient.handleVoteResponse(KafkaRaftClient.java:663)
at 
org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1530)
at 
org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1652)
at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2183)
at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda opened a new pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

2021-02-16 Thread GitBox


abbccdda opened a new pull request #10135:
URL: https://github.com/apache/kafka/pull/10135


   We want to consolidate the forwarding and auto-creation channels into one 
channel, to reduce the unnecessary connections maintained between brokers and 
the controller.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577278241



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 

[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577268593



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+
+public class ClusterControlManager {
+class ReadyBrokersFuture {
+private final CompletableFuture<Void> future;
+private final int minBrokers;
+
+ReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
+this.future = future;
+this.minBrokers = minBrokers;
+}
+
+boolean check() {
+int numUnfenced = 0;
+for (BrokerRegistration registration : 
brokerRegistrations.values()) {
+if (!registration.fenced()) {
+numUnfenced++;
+}
+if (numUnfenced >= minBrokers) {
+return true;
+}
+}
+return false;
+}
+}
+
+/**
+ * The SLF4J log context.
+ */
+private final LogContext logContext;
+
+/**
+ * The SLF4J log object.
+ */
+private final Logger log;
+
+/**
+ * The Kafka clock object to use.
+ */
+private final Time time;
+
+/**
+ * How long sessions should last, in nanoseconds.
+ */
+private final long sessionTimeoutNs;
+
+/**
+ * The replica placement policy to use.
+ */
+private final ReplicaPlacementPolicy placementPolicy;
+
+/**
+ * Maps broker IDs to broker registrations.
+ */
+private final TimelineHashMap<Integer, BrokerRegistration> 
brokerRegistrations;
+
+/**
+ * The broker heartbeat manager, or null if this controller is on standby.
+ */
+private BrokerHeartbeatManager heartbeatManager;
+
+/**
+ * A future which is completed as soon as we have the given number of 
brokers
+ * ready.
+ */
+private Optional<ReadyBrokersFuture> readyBrokersFuture;
+
+ClusterControlManager(LogContext logContext,
+  Time time,
+  SnapshotRegistry snapshotRegistry,
+  long sessionTimeoutNs,
+  ReplicaPlacementPolicy placementPolicy) {
+this.logContext = logContext;
+this.log = logContext.logger(ClusterControlManager.class);
+this.time = time;
+this.sessionTimeoutNs = sessionTimeoutNs;
+this.placementPolicy = placementPolicy;
+this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+this.heartbeatManager = null;
+this.readyBrokersFuture = Optional.empty();
+}
+
+/**
+ * Transition 

[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577263866



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 

[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577262652



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[GitHub] [kafka] junrao commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577259797



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 
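
A hedged sketch of how this builder might be wired up (the setter and `build()` names are assumptions inferred from the field names above, since the quoted hunk cuts off here):

```java
// Sketch only: method names are assumed to mirror the builder fields above.
QuorumController.Builder builder = new QuorumController.Builder(nodeId);
builder.setTime(Time.SYSTEM);
builder.setLogManager(metaLogManager);  // metaLogManager: an existing MetaLogManager
Controller controller = builder.build();
```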

[GitHub] [kafka] mjsax commented on pull request #9670: MINOR: Clarify config names for EOS versions 1 and 2

2021-02-16 Thread GitBox


mjsax commented on pull request #9670:
URL: https://github.com/apache/kafka/pull/9670#issuecomment-780225430


   Thanks @JimGalasyn! Merged to `trunk` and cherry-picked to `2.8`, `2.7`, and 
`2.6` branches.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9670: MINOR: Clarify config names for EOS versions 1 and 2

2021-02-16 Thread GitBox


mjsax merged pull request #9670:
URL: https://github.com/apache/kafka/pull/9670


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12328) Find out partition of a store iterator

2021-02-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285569#comment-17285569
 ] 

Matthias J. Sax commented on KAFKA-12328:
-

You should be able to use `ProcessorContext.taskId()` – task-ids are encoded as 
"<topicGroupId>_<partition>", so with some string parsing you should 
be able to get the partition number.

Because no record is processed in a punctuation, there is not really a 
partition number... (`ProcessorContext.partition()` returns the partition of 
the currently processed record). – I guess, strictly speaking (given the 
current design), the partition number is fixed (tied to the task), so we could 
actually always return it, independent of whether we have a record at hand or not.
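
A minimal sketch of that string parsing, assuming the task-id encoding above (it is an internal format, not a public contract):
{code:java}
// Hedged sketch: derive the partition inside a punctuator from the task id.
// context() here is the ProcessorContext available to the processor that
// scheduled the punctuator.
final String taskId = context().taskId().toString(); // e.g. "0_3"
final int partition = Integer.parseInt(taskId.substring(taskId.lastIndexOf('_') + 1));
System.out.println("N entries processed in partition " + partition);
{code}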

> Find out partition of a store iterator
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number in them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12328) Find out partition of a store iterator

2021-02-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12328:

Component/s: streams

> Find out partition of a store iterator
> --
>
> Key: KAFKA-12328
> URL: https://issues.apache.org/jira/browse/KAFKA-12328
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: fml2
>Priority: Major
>
> This question was posted [on 
> stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
>  and got an answer but the solution is quite complicated hence this ticket.
>  
> In my Kafka Streams application, I have a task that sets up a scheduled (by 
> the wall time) punctuator. The punctuator iterates over the entries of a 
> store and does something with them. Like this:
> {code:java}
> var store = context().getStateStore("MyStore");
> var iter = store.all();
> while (iter.hasNext()) {
>var entry = iter.next();
>// ... do something with the entry
> }
> // Print a summary (now): N entries processed
> // Print a summary (wish): N entries processed in partition P
> {code}
> Is it possible to find out which partition the punctuator operates on? The 
> java docs for {{ProcessorContext.partition()}} state that this method 
> returns {{-1}} within punctuators.
> I've read [Kafka Streams: Punctuate vs 
> Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
>  and the answers there. I can understand that a task is, in general, not tied 
> to a particular partition. But an iterator should be tied IMO.
> How can I find out the partition?
> Or is my assumption that a particular instance of a store iterator is tied to 
> a partition wrong?
> What I need it for: I'd like to include the partition number in some log 
> messages. For now, I have several nearly identical log messages stating that 
> the punctuator does this and that. In order to make those messages "unique" 
> I'd like to include the partition number in them.
> Since I'm working with a single store here (which might be partitioned), I 
> assume that every single execution of the punctuator is bound to a single 
> partition of that store.
>  
> It would be cool if there were a method {{iterator.partition}} (or similar) 
> to get this information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax opened a new pull request #10134: TRIVIAL: fix JavaDocs formatting

2021-02-16 Thread GitBox


mjsax opened a new pull request #10134:
URL: https://github.com/apache/kafka/pull/10134


   Should be cherry-picked to `2.8` branch.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-16 Thread GitBox


hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r577225891



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -79,67 +80,110 @@ public BatchAccumulator(
 }
 
 /**
- * Append a list of records into an atomic batch. We guarantee all records
- * are included in the same underlying record batch so that either all of
- * the records become committed or none of them do.
+ * Append a list of records into as many batches as necessary.
  *
- * @param epoch the expected leader epoch. If this does not match, then
- *  {@link Long#MAX_VALUE} will be returned as an offset which
- *  cannot become committed.
- * @param records the list of records to include in a batch
- * @return the expected offset of the last record (which will be
- * {@link Long#MAX_VALUE} if the epoch does not match), or null if
- * no memory could be allocated for the batch at this time
+ * The order of the elements in the records argument will match the order 
in the batches.
+ * This method will use as many batches as necessary to serialize all of 
the records. Since
+ * this method can split the records into multiple batches it is possible 
that some of the
+ * records will get committed while others will not when the leader fails.
+ *
+ * @param epoch the expected leader epoch. If this does not match, then 
{@link Long#MAX_VALUE}
+ *  will be returned as an offset which cannot become committed
+ * @param records the list of records to include in the batches
+ * @return the expected offset of the last record; {@link Long#MAX_VALUE} 
if the epoch does not
+ * match; null if no memory could be allocated for the batch at 
this time
+ * @throws RecordBatchTooLargeException if the size of one record T is 
greater than the maximum
+ * batch size; if this exception is thrown some of the elements in 
records may have
+ * been committed
  */
 public Long append(int epoch, List<T> records) {
+return append(epoch, records, false);
+}
+
+/**
+ * Append a list of records into an atomic batch. We guarantee all records 
are included in the
+ * same underlying record batch so that either all of the records become 
committed or none of
+ * them do.
+ *
+ * @param epoch the expected leader epoch. If this does not match, then 
{@link Long#MAX_VALUE}
+ *  will be returned as an offset which cannot become committed
+ * @param records the list of records to include in a batch
+ * @return the expected offset of the last record; {@link Long#MAX_VALUE} 
if the epoch does not
+ * match; null if no memory could be allocated for the batch at 
this time
+ * @throws RecordBatchTooLargeException if the size of the records is 
greater than the maximum
+ * batch size; if this exception is thrown none of the elements in 
records were
+ * committed
+ */
+public Long appendAtomic(int epoch, List<T> records) {
+return append(epoch, records, true);
+}
+
+private Long append(int epoch, List<T> records, boolean isAtomic) {
 if (epoch != this.epoch) {
-// If the epoch does not match, then the state machine probably
-// has not gotten the notification about the latest epoch change.
-// In this case, ignore the append and return a large offset value
-// which will never be committed.
 return Long.MAX_VALUE;
 }
 
 ObjectSerializationCache serializationCache = new 
ObjectSerializationCache();
-int batchSize = 0;
-for (T record : records) {
-batchSize += serde.recordSize(record, serializationCache);
-}
-
-if (batchSize > maxBatchSize) {
-throw new IllegalArgumentException("The total size of " + records 
+ " is " + batchSize +
-", which exceeds the maximum allowed batch size of " + 
maxBatchSize);
-}
+int[] recordSizes = records
+.stream()
+.mapToInt(record -> serde.recordSize(record, serializationCache))
+.toArray();
 
 appendLock.lock();
 try {
 maybeCompleteDrain();
 
-BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-if (batch == null) {
-return null;
-}
-
-// Restart the linger timer if necessary
-if (!lingerTimer.isRunning()) {
-lingerTimer.reset(time.milliseconds() + lingerMs);
+BatchBuilder<T> batch = null;
+if (isAtomic) {
+batch = maybeAllocateBatch(recordSizes);
 }
 
 for (T record : records) {
+if (!isAtomic) {
+batch = 
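
A hedged usage sketch of the two entry points documented above (`accumulator`, `currentEpoch`, and `records` are assumed names; the return-value handling follows the javadoc):

```java
// Sketch only: exercises the documented contract of append/appendAtomic.
Long lastOffset = accumulator.appendAtomic(currentEpoch, records);
if (lastOffset == null) {
    // No memory could be allocated for the batch at this time; retry later.
} else if (lastOffset == Long.MAX_VALUE) {
    // Stale epoch: these records can never become committed.
} else {
    // lastOffset is the expected offset of the last appended record.
}
```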

[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-16 Thread GitBox


hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r577216920



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -77,6 +79,29 @@ default void handleResign() {}
  */
 void register(Listener<T> listener);
 
+/**
+ * Append a list of records to the log. The write will be scheduled for 
some time
+ * in the future. There is no guarantee that appended records will be 
written to
+ * the log and eventually committed. While the order of the records is 
preserved, they can
+ * be appended to the log using one or more batches. This means that each 
record could
+ * be committed independently.

Review comment:
   It might already be clear enough given the previous sentence, but maybe 
we could emphasize that if any record becomes committed, then all records 
ordered before it are guaranteed to be committed as well.
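
   For example, the javadoc could add something along these lines (a sketch, not final wording):

```java
 * Note that if any record in the list becomes committed, then all records
 * ordered before it in the list are guaranteed to be committed as well.
```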

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -79,67 +80,110 @@ public BatchAccumulator(
 }
 
 /**
- * Append a list of records into an atomic batch. We guarantee all records
- * are included in the same underlying record batch so that either all of
- * the records become committed or none of them do.
+ * Append a list of records into as many batches as necessary.
  *
- * @param epoch the expected leader epoch. If this does not match, then
- *  {@link Long#MAX_VALUE} will be returned as an offset which
- *  cannot become committed.
- * @param records the list of records to include in a batch
- * @return the expected offset of the last record (which will be
- * {@link Long#MAX_VALUE} if the epoch does not match), or null if
- * no memory could be allocated for the batch at this time
+ * The order of the elements in the records argument will match the order 
in the batches.
+ * This method will use as many batches as necessary to serialize all of 
the records. Since
+ * this method can split the records into multiple batches it is possible 
that some of the
+ * records will get committed while others will not when the leader fails.
+ *
+ * @param epoch the expected leader epoch. If this does not match, then 
{@link Long#MAX_VALUE}
+ *  will be returned as an offset which cannot become committed
+ * @param records the list of records to include in the batches
+ * @return the expected offset of the last record; {@link Long#MAX_VALUE} 
if the epoch does not
+ * match; null if no memory could be allocated for the batch at 
this time
+ * @throws RecordBatchTooLargeException if the size of one record T is 
greater than the maximum
+ * batch size; if this exception is thrown some of the elements in 
records may have
+ * been committed
  */
 public Long append(int epoch, List<T> records) {
+return append(epoch, records, false);
+}
+
+/**
+ * Append a list of records into an atomic batch. We guarantee all records 
are included in the
+ * same underlying record batch so that either all of the records become 
committed or none of
+ * them do.
+ *
+ * @param epoch the expected leader epoch. If this does not match, then 
{@link Long#MAX_VALUE}
+ *  will be returned as an offset which cannot become committed
+ * @param records the list of records to include in a batch
+ * @return the expected offset of the last record; {@link Long#MAX_VALUE} 
if the epoch does not
+ * match; null if no memory could be allocated for the batch at 
this time
+ * @throws RecordBatchTooLargeException if the size of the records is 
greater than the maximum
+ * batch size; if this exception is thrown none of the elements in 
records were
+ * committed
+ */
+public Long appendAtomic(int epoch, List<T> records) {
+return append(epoch, records, true);
+}
+
+private Long append(int epoch, List<T> records, boolean isAtomic) {
 if (epoch != this.epoch) {
-// If the epoch does not match, then the state machine probably
-// has not gotten the notification about the latest epoch change.
-// In this case, ignore the append and return a large offset value
-// which will never be committed.
 return Long.MAX_VALUE;
 }
 
 ObjectSerializationCache serializationCache = new 
ObjectSerializationCache();
-int batchSize = 0;
-for (T record : records) {
-batchSize += serde.recordSize(record, serializationCache);
-}
-
-if (batchSize > maxBatchSize) {
-throw new IllegalArgumentException("The total size of " + records 
+ " is " + batchSize +
-", which exceeds the maximum allowed batch size of " + 
maxBatchSize);

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577217605



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577214559



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+
+public class ClusterControlManager {
+class ReadyBrokersFuture {
+private final CompletableFuture<Void> future;
+private final int minBrokers;
+
+ReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
+this.future = future;
+this.minBrokers = minBrokers;
+}
+
+boolean check() {
+int numUnfenced = 0;
+for (BrokerRegistration registration : 
brokerRegistrations.values()) {
+if (!registration.fenced()) {
+numUnfenced++;
+}
+if (numUnfenced >= minBrokers) {
+return true;
+}
+}
+return false;
+}
+}
+
+/**
+ * The SLF4J log context.
+ */
+private final LogContext logContext;
+
+/**
+ * The SLF4J log object.
+ */
+private final Logger log;
+
+/**
+ * The Kafka clock object to use.
+ */
+private final Time time;
+
+/**
+ * How long sessions should last, in nanoseconds.
+ */
+private final long sessionTimeoutNs;
+
+/**
+ * The replica placement policy to use.
+ */
+private final ReplicaPlacementPolicy placementPolicy;
+
+/**
+ * Maps broker IDs to broker registrations.
+ */
+private final TimelineHashMap<Integer, BrokerRegistration> 
brokerRegistrations;
+
+/**
+ * The broker heartbeat manager, or null if this controller is on standby.
+ */
+private BrokerHeartbeatManager heartbeatManager;
+
+/**
+ * A future which is completed as soon as we have the given number of 
brokers
+ * ready.
+ */
+private Optional<ReadyBrokersFuture> readyBrokersFuture;
+
+ClusterControlManager(LogContext logContext,
+  Time time,
+  SnapshotRegistry snapshotRegistry,
+  long sessionTimeoutNs,
+  ReplicaPlacementPolicy placementPolicy) {
+this.logContext = logContext;
+this.log = logContext.logger(ClusterControlManager.class);
+this.time = time;
+this.sessionTimeoutNs = sessionTimeoutNs;
+this.placementPolicy = placementPolicy;
+this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+this.heartbeatManager = null;
+this.readyBrokersFuture = Optional.empty();
+}
+
+/**
+ * Transition 

[GitHub] [kafka] hachikuji merged pull request #10133: MINOR: Update raft README and add a more specific error message.

2021-02-16 Thread GitBox


hachikuji merged pull request #10133:
URL: https://github.com/apache/kafka/pull/10133


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12241) Partition offline when ISR shrinks to leader and LogDir goes offline

2021-02-16 Thread Ajay Patel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285537#comment-17285537
 ] 

Ajay Patel commented on KAFKA-12241:


We have a different trigger for the ISR shrinkage, but the same symptoms. We 
also prefer not to enable unclean leader election, as it carries the risk of 
electing a broker which has not yet met the highWatermark for that partition. 
Electing a broker that was removed from the ISR at the highWatermark without 
having to enable unclean leader election would be a great way to allow both a 
seamless failover and no loss of data.

> Partition offline when ISR shrinks to leader and LogDir goes offline
> 
>
> Key: KAFKA-12241
> URL: https://issues.apache.org/jira/browse/KAFKA-12241
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.2
>Reporter: Noa Resare
>Priority: Major
>
> This is a long standing issue that we haven't previously tracked in a JIRA. 
> We experience this maybe once per month on average and we see the following 
> sequence of events:
>  # A broker shrinks ISR to just itself for a partition. However, the 
> followers are at highWatermark:{{ [Partition PARTITION broker=601] Shrinking 
> ISR from 1501,601,1201,1801 to 601. Leader: (highWatermark: 432385279, 
> endOffset: 432385280). Out of sync replicas: (brokerId: 1501, endOffset: 
> 432385279) (brokerId: 1201, endOffset: 432385279) (brokerId: 1801, endOffset: 
> 432385279).}}
>  # Around this time (in the case I have in front of me, 20ms earlier 
> according to the logging subsystem) LogDirFailureChannel captures an Error 
> while appending records to PARTITION due to a readonly filesystem.
>  # ~20 ms after the ISR shrink, LogDirFailureHandler offlines the partition: 
> Logs for partitions LIST_OF_PARTITIONS are offline and logs for future 
> partitions are offline due to failure on log directory /kafka/d6/data 
>  # ~50ms later the controller marks the replicas as offline from 601: 
> message: [Controller id=901] Mark replicas LIST_OF_PARTITIONS on broker 601 
> as offline 
>  # ~2ms later the controller offlines the partition: [Controller id=901 
> epoch=4] Changed partition PARTITION state from OnlinePartition to 
> OfflinePartition 
> To resolve this someone needs to manually enable unclean leader election, 
> which is obviously not ideal. Since the leader knows that all the followers 
> that are removed from ISR is at highWatermark, maybe it could convey that to 
> the controller in the LeaderAndIsr response so that the controller could pick 
> a new leader without having to resort to unclean leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577188202



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577187004



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+
+public class ClusterControlManager {
+class ReadyBrokersFuture {
+private final CompletableFuture<Void> future;
+private final int minBrokers;
+
+ReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
+this.future = future;
+this.minBrokers = minBrokers;
+}
+
+boolean check() {
+int numUnfenced = 0;
+for (BrokerRegistration registration : 
brokerRegistrations.values()) {
+if (!registration.fenced()) {
+numUnfenced++;
+}
+if (numUnfenced >= minBrokers) {
+return true;
+}
+}
+return false;
+}
+}
+
+/**
+ * The SLF4J log context.
+ */
+private final LogContext logContext;
+
+/**
+ * The SLF4J log object.
+ */
+private final Logger log;
+
+/**
+ * The Kafka clock object to use.
+ */
+private final Time time;
+
+/**
+ * How long sessions should last, in nanoseconds.
+ */
+private final long sessionTimeoutNs;
+
+/**
+ * The replica placement policy to use.
+ */
+private final ReplicaPlacementPolicy placementPolicy;
+
+/**
+ * Maps broker IDs to broker registrations.
+ */
+private final TimelineHashMap<Integer, BrokerRegistration> 
brokerRegistrations;
+
+/**
+ * The broker heartbeat manager, or null if this controller is on standby.
+ */
+private BrokerHeartbeatManager heartbeatManager;
+
+/**
+ * A future which is completed as soon as we have the given number of 
brokers
+ * ready.
+ */
+private Optional<ReadyBrokersFuture> readyBrokersFuture;
+
+ClusterControlManager(LogContext logContext,
+  Time time,
+  SnapshotRegistry snapshotRegistry,
+  long sessionTimeoutNs,
+  ReplicaPlacementPolicy placementPolicy) {
+this.logContext = logContext;
+this.log = logContext.logger(ClusterControlManager.class);
+this.time = time;
+this.sessionTimeoutNs = sessionTimeoutNs;
+this.placementPolicy = placementPolicy;
+this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+this.heartbeatManager = null;
+this.readyBrokersFuture = Optional.empty();
+}
+
+/**
+ * Transition 

[jira] [Updated] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-02-16 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-12330:
-
Description: 
Incremental FetchSessionCache sessions deprioritize partitions for which a 
response is returned. This may happen if log metadata such as the log start 
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions even if the fetch offset is lower than the fetch upper bound. 
However, the fetch response will still contain updates to metadata such as hwm 
if that metadata has changed. This can lead to degenerate behavior where a 
partition's hwm or log start offset is updated resulting in the next fetch 
being unnecessarily skipped for that partition. At first this appeared to be 
worse, as hwm updates occur frequently, but starvation should result in hwm 
movement becoming blocked, allowing a fetch to go through and then becoming 
unstuck. However, it'll still require one more fetch request than necessary to 
do so. Consumers may be affected more than replica fetchers, however they often 
remove partitions with fetched data from the next fetch request and this may be 
helping prevent starvation.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Example POC change:
        // Don't move partition to end of queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}
 

  was:
Incremental FetchSessionCache sessions deprioritize partitions for which a 
response is returned. This may happen if log metadata such as the log start 
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions even if the fetch offset is lower than the fetch upper bound. 
However, the fetch response will still contain updates to metadata such as hwm 
if that metadata has changed. This can lead to degenerate behavior where a 
partition's hwm or log start offset is updated resulting in the next fetch 
being unnecessarily skipped for that partition. At first this appeared to be 
worse, as hwm updates occur frequently, but starvation should result in hwm 
movement becoming blocked, allowing a fetch to go through and then becoming 
unstuck. However, it'll still require one more fetch request than necessary to 
do so. Consumers may be affected more than replica fetchers, however they often 
remove partitions with fetched data from the next fetch request and this may be 
helping prevent starvation.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
val updateFetchContextAndRemoveUnselected: 
Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, 
FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
while ((nextElement == null) && iter.hasNext) {
  val element = iter.next()
  val topicPart = element.getKey
  val respData = element.getValue
  val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
  val mustRespond = cachedPart.maybeUpdateResponseData(respData, 
updateFetchContextAndRemoveUnselected)
  if (mustRespond) {
nextElement = element
// Don't move partition to end of queue if we didn't actually fetch data
// This should help avoid starvation even when we are filling the fetch 
response fully while returning metadata for these 

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577178197



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+
+public class ClusterControlManager {
+class ReadyBrokersFuture {
+private final CompletableFuture<Void> future;
+private final int minBrokers;
+
+ReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
+this.future = future;
+this.minBrokers = minBrokers;
+}
+
+boolean check() {
+int numUnfenced = 0;
+for (BrokerRegistration registration : 
brokerRegistrations.values()) {
+if (!registration.fenced()) {
+numUnfenced++;
+}
+if (numUnfenced >= minBrokers) {
+return true;
+}
+}
+return false;
+}
+}
+
+/**
+ * The SLF4J log context.
+ */
+private final LogContext logContext;
+
+/**
+ * The SLF4J log object.
+ */
+private final Logger log;
+
+/**
+ * The Kafka clock object to use.
+ */
+private final Time time;
+
+/**
+ * How long sessions should last, in nanoseconds.
+ */
+private final long sessionTimeoutNs;
+
+/**
+ * The replica placement policy to use.
+ */
+private final ReplicaPlacementPolicy placementPolicy;
+
+/**
+ * Maps broker IDs to broker registrations.
+ */
+private final TimelineHashMap<Integer, BrokerRegistration> 
brokerRegistrations;
+
+/**
+ * The broker heartbeat manager, or null if this controller is on standby.
+ */
+private BrokerHeartbeatManager heartbeatManager;
+
+/**
+ * A future which is completed as soon as we have the given number of 
brokers
+ * ready.
+ */
+private Optional<ReadyBrokersFuture> readyBrokersFuture;
+
+ClusterControlManager(LogContext logContext,
+  Time time,
+  SnapshotRegistry snapshotRegistry,
+  long sessionTimeoutNs,
+  ReplicaPlacementPolicy placementPolicy) {
+this.logContext = logContext;
+this.log = logContext.logger(ClusterControlManager.class);
+this.time = time;
+this.sessionTimeoutNs = sessionTimeoutNs;
+this.placementPolicy = placementPolicy;
+this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+this.heartbeatManager = null;
+this.readyBrokersFuture = Optional.empty();
+}
+
+/**
+ * Transition 

[jira] [Updated] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-02-16 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-12330:
-
Description: 
Incremental FetchSessionCache sessions deprioritize partitions for which a 
response is returned. This may happen if log metadata such as the log start 
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions even if the fetch offset is lower than the fetch upper bound. 
However, the fetch response will still contain updates to metadata such as hwm 
if that metadata has changed. This can lead to degenerate behavior where a 
partition's hwm or log start offset is updated resulting in the next fetch 
being unnecessarily skipped for that partition. At first this appeared to be 
worse, as hwm updates occur frequently, but starvation should result in hwm 
movement becoming blocked, allowing a fetch to go through and then becoming 
unstuck. However, it'll still require one more fetch request than necessary to 
do so. Consumers may be affected more than replica fetchers, however they often 
remove partitions with fetched data from the next fetch request and this may be 
helping prevent starvation.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}
 

  was:
Incremental FetchSessionCache sessions deprioritize partitions for which a 
response is returned. This may happen if log metadata such as the log start 
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions where it's available. However, the fetch response will still contain 
updates to metadata such as hwm if that metadata has changed. This can lead to 
degenerate behavior where a partition's hwm or log start offset is updated 
resulting in the next fetch being unnecessarily skipped for that partition. At 
first this appeared to be worse, as hwm updates occur frequently, but 
starvation should result in hwm movement becoming blocked, allowing a fetch to 
go through and then becoming unstuck. However, it'll still require one more 
fetch request than necessary to do so. Consumers may be affected more than 
replica fetchers, however they often remove partitions with fetched data from 
the next fetch request and this may be helping prevent starvation.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition,
    FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData,
        updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null &&
            respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}

[jira] [Updated] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-02-16 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-12330:
-
Description: 
The incremental FetchSessionCache deprioritizes partitions for which a
response is returned. This may happen if log metadata such as the log start
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for
partitions where it is available. However, the fetch response will still
contain updates to metadata such as the hwm if that metadata has changed. This
can lead to degenerate behavior where a partition's hwm or log start offset is
updated, resulting in the next fetch being unnecessarily skipped for that
partition. At first this appeared to make the problem worse, as hwm updates
occur frequently, but starvation should eventually block hwm movement,
allowing a fetch to go through and unsticking the partition. Even so, it still
requires one more fetch request than necessary. Consumers may be affected more
than replica fetchers; however, consumers often remove partitions with fetched
data from the next fetch request, which may help prevent starvation.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition,
    FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData,
        updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null &&
            respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}
 

  was:
The incremental FetchSessionCache deprioritizes partitions for which a
response is returned. This may happen if log metadata such as the log start
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for
partitions where it is available. However, the fetch response will still
contain updates to metadata such as the hwm if that metadata has changed. This
can lead to degenerate behavior where a partition's hwm or log start offset is
updated, resulting in the next fetch being unnecessarily skipped for that
partition. At first this appeared to make the problem worse, as hwm updates
occur frequently, but starvation should eventually block hwm movement,
allowing a fetch to go through and unsticking the partition. Even so, it still
requires one more fetch request than necessary.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.

 
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition,
    FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData,
        updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null &&
            respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577176042



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[jira] [Created] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-02-16 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12330:


 Summary: FetchSessionCache may cause starvation for partitions 
when FetchResponse is full
 Key: KAFKA-12330
 URL: https://issues.apache.org/jira/browse/KAFKA-12330
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


The incremental FetchSessionCache deprioritizes partitions for which a
response is returned. This may happen if log metadata such as the log start
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for
partitions where it is available. However, the fetch response will still
contain updates to metadata such as the hwm if that metadata has changed. This
can lead to degenerate behavior where a partition's hwm or log start offset is
updated, resulting in the next fetch being unnecessarily skipped for that
partition. At first this appeared to make the problem worse, as hwm updates
occur frequently, but starvation should eventually block hwm movement,
allowing a fetch to go through and unsticking the partition. Even so, it still
requires one more fetch request than necessary.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.

 
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition,
    FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData,
        updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null &&
            respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577167941



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+
+public class ConfigurationControlManager {
+private final Logger log;
+private final SnapshotRegistry snapshotRegistry;
+private final Map configDefs;
+private final TimelineHashMap> configData;
+
+ConfigurationControlManager(LogContext logContext,
+SnapshotRegistry snapshotRegistry,
+Map 
configDefs) {
+this.log = logContext.logger(ConfigurationControlManager.class);
+this.snapshotRegistry = snapshotRegistry;
+this.configDefs = configDefs;
+this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+/**
+ * Determine the result of applying a batch of incremental configuration
+ * changes.  Note that this method does not change the contents of memory.
+ * It just generates a result that you can replay later, if you wish, using
+ * replay().
+ *
+ * Note that there can only be one result per ConfigResource.  So if you try
+ * to modify several keys and one modification fails, the change to the whole
+ * ConfigResource fails and nothing gets changed.
+ *
+ * @param configChanges Maps each resource to a map from config keys to
+ *  operation data.
+ * @return  The result.
+ */
+ControllerResult> incrementalAlterConfigs(
+Map>> 
configChanges) {
+List outputRecords = new ArrayList<>();
+Map outputResults = new HashMap<>();
+for (Entry>> 
resourceEntry :
+configChanges.entrySet()) {
+incrementalAlterConfigResource(resourceEntry.getKey(),
+resourceEntry.getValue(),
+outputRecords,
+outputResults);
+}
+return new ControllerResult<>(outputRecords, outputResults);
+}
+
+private void incrementalAlterConfigResource(ConfigResource configResource,
+Map> keysToOps,
+List 
outputRecords,
+Map 
outputResults) {
+ApiError error = checkConfigResource(configResource);
+if (error.isFailure()) {
+outputResults.put(configResource, error);
+return;
+}
+List newRecords = new ArrayList<>();
+for (Entry> keysToOpsEntry : 
keysToOps.entrySet()) {
+String key = keysToOpsEntry.getKey();
+String currentValue = null;
+TimelineHashMap currentConfigs = 
configData.get(configResource);
+if (currentConfigs != null) {
+currentValue = currentConfigs.get(key);
+}
+String newValue = currentValue;
+Entry opTypeAndNewValue = 
keysToOpsEntry.getValue();
+

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577167409



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577163457



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[GitHub] [kafka] rondagostino commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


rondagostino commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577148212



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577117523



##
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of 
memory.

Review comment:
   Yes, it is.  I will move it to the test directory.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
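
(Context for the exchange above: an in-memory log manager suits tests because
records live in a plain list and commits can be delivered synchronously, with
no disk or quorum involved. A minimal standalone sketch of that idea follows;
the names and listener type are hypothetical, and the real MetaLogManager
interface differs.)

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory log: append() stores the record and immediately
// notifies listeners, standing in for a real replicated commit.
class InMemoryLogSketch<T> {
    private final List<T> records = new ArrayList<>();
    private final List<Consumer<T>> listeners = new ArrayList<>();

    void register(Consumer<T> listener) {
        listeners.add(listener);
    }

    long append(T record) {
        records.add(record);
        listeners.forEach(l -> l.accept(record));  // "commit" synchronously
        return records.size() - 1;                 // offset of the record
    }
}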




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577148107



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map configDefs = 
Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map supportedFeatures = 
Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, 
TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577143791



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577146421



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+
+public class ClusterControlManager {
+class ReadyBrokersFuture {
+private final CompletableFuture future;
+private final int minBrokers;
+
+ReadyBrokersFuture(CompletableFuture future, int minBrokers) {
+this.future = future;
+this.minBrokers = minBrokers;
+}
+
+boolean check() {
+int numUnfenced = 0;
+for (BrokerRegistration registration : 
brokerRegistrations.values()) {
+if (!registration.fenced()) {
+numUnfenced++;
+}
+if (numUnfenced >= minBrokers) {
+return true;
+}
+}
+return false;
+}
+}
+
+/**
+ * The SLF4J log context.
+ */
+private final LogContext logContext;
+
+/**
+ * The SLF4J log object.
+ */
+private final Logger log;
+
+/**
+ * The Kafka clock object to use.
+ */
+private final Time time;
+
+/**
+ * How long sessions should last, in nanoseconds.
+ */
+private final long sessionTimeoutNs;
+
+/**
+ * The replica placement policy to use.
+ */
+private final ReplicaPlacementPolicy placementPolicy;
+
+/**
+ * Maps broker IDs to broker registrations.
+ */
+private final TimelineHashMap 
brokerRegistrations;
+
+/**
+ * The broker heartbeat manager, or null if this controller is on standby.
+ */
+private BrokerHeartbeatManager heartbeatManager;
+
+/**
+ * A future which is completed as soon as we have the given number of 
brokers
+ * ready.
+ */
+private Optional readyBrokersFuture;
+
+ClusterControlManager(LogContext logContext,
+  Time time,
+  SnapshotRegistry snapshotRegistry,
+  long sessionTimeoutNs,
+  ReplicaPlacementPolicy placementPolicy) {
+this.logContext = logContext;
+this.log = logContext.logger(ClusterControlManager.class);
+this.time = time;
+this.sessionTimeoutNs = sessionTimeoutNs;
+this.placementPolicy = placementPolicy;
+this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+this.heartbeatManager = null;
+this.readyBrokersFuture = Optional.empty();
+}
+
+/**
+ * Transition 
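
(The ReadyBrokersFuture class at the top of the quoted diff is a small
readiness gate: a future completes once at least minBrokers registered
brokers are unfenced, with check() re-evaluated as registrations change. A
standalone sketch of the same pattern follows, using hypothetical names and a
simplified fenced-flag map rather than the real BrokerRegistration type.)

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class BrokerReadinessSketch {
    private final Map<Integer, Boolean> fencedById = new ConcurrentHashMap<>();
    private final CompletableFuture<Void> ready = new CompletableFuture<>();
    private final int minBrokers;

    BrokerReadinessSketch(int minBrokers) {
        this.minBrokers = minBrokers;
    }

    // Called whenever a broker registers, is fenced, or is unfenced.
    void setFenced(int brokerId, boolean fenced) {
        fencedById.put(brokerId, fenced);
        if (check()) {
            ready.complete(null);  // completing twice is a harmless no-op
        }
    }

    // True once at least minBrokers brokers are unfenced.
    boolean check() {
        return fencedById.values().stream().filter(f -> !f).count() >= minBrokers;
    }

    CompletableFuture<Void> future() {
        return ready;
    }
}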

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577143791



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+public class ReplicationControlManager {
+static class TopicControlInfo {
+private final Uuid id;
+private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+this.id = id;
+this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+}
+
+static class PartitionControlInfo {
+private final int[] replicas;
+private final int[] isr;
+private final int[] removingReplicas;
+private final int[] addingReplicas;
+private final int leader;
+private final int leaderEpoch;
+private final int partitionEpoch;
+
+PartitionControlInfo(PartitionRecord record) {
+this(Replicas.toArray(record.replicas()),
+Replicas.toArray(record.isr()),
+Replicas.toArray(record.removingReplicas()),
+Replicas.toArray(record.addingReplicas()),
+record.leader(),
+record.leaderEpoch(),
+record.partitionEpoch());
+}
+
+PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+ int[] addingReplicas, int leader, int leaderEpoch,
+ int partitionEpoch) {
+this.replicas = replicas;
+this.isr = isr;
+

[GitHub] [kafka] jolshan opened a new pull request #10133: MINOR: Update raft README and add a more specific error message.

2021-02-16 Thread GitBox


jolshan opened a new pull request #10133:
URL: https://github.com/apache/kafka/pull/10133


   `test-raft-server-start.sh` requires the config to be specified with 
`--config`. I've included this in the README and added an error message for 
this specific case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-02-16 Thread GitBox


ableegoldman commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r577140223



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
 assertThat(store.get(0), nullValue());
 }
+
+
+@Test
+public void shouldReturnKeysWithGivenPrefix(){
+store = createKeyValueStore(driver.context());
+final String value = "value";
+final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+entries.add(new KeyValue<>(1, value));
+entries.add(new KeyValue<>(2, value));
+entries.add(new KeyValue<>(11, value));
+entries.add(new KeyValue<>(13, value));
+
+store.putAll(entries);
+final KeyValueIterator<Integer, String> keysWithPrefix = store.prefixScan(1, new IntegerSerializer());

Review comment:
   Yeah, the underlying store compares the serialized bytes 
lexicographically; it doesn't have any concept of "Integer" or any other type. 
And the really tricky thing is that it scans lexicographically, which means 
from left to right, whereas when we serialize things we usually do so from 
right to left. eg `2` in binary is `10` whereas 11 in binary is `1011` and 13 
is `1101`. 
   The problem here is that the serialized version of 2 is a different number 
of bytes than the serialized form of 11/13, so the lexicographical comparator 
is effectively comparing digits of a different magnitude. 
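
   A quick standalone way to see the byte-level behavior (just a sketch using 
the public IntegerSerializer, not part of this test):

   ```java
   import java.util.Arrays;
   import org.apache.kafka.common.serialization.IntegerSerializer;

   public class PrefixBytesDemo {
       public static void main(String[] args) {
           IntegerSerializer serializer = new IntegerSerializer();
           // IntegerSerializer always produces a fixed 4-byte big-endian array,
           // so the bytes of 11 do not start with the bytes of 1:
           System.out.println(Arrays.toString(serializer.serialize("t", 1)));   // [0, 0, 0, 1]
           System.out.println(Arrays.toString(serializer.serialize("t", 11)));  // [0, 0, 0, 11]
           System.out.println(Arrays.toString(serializer.serialize("t", 13)));  // [0, 0, 0, 13]
       }
   }
   ```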





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577136289



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+/**
+ * A builder class which creates the QuorumController.
+ */
+static public class Builder {
+private final int nodeId;
+private Time time = Time.SYSTEM;
+private String threadNamePrefix = null;
+private LogContext logContext = null;
+private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
+private MetaLogManager logManager = null;
+private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
+private short defaultReplicationFactor = 3;
+private int defaultNumPartitions = 1;
+private ReplicaPlacementPolicy replicaPlacementPolicy =
+new SimpleReplicaPlacementPolicy(new Random());
+private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS);
+
+public Builder(int nodeId) {
+this.nodeId = nodeId;
+}
+
+public 

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577135430



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577127231



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577125115



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##

[GitHub] [kafka] cmccabe commented on a change in pull request #10070: KAFKA-12276: Add the quorum controller code

2021-02-16 Thread GitBox


cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577117523



##
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of 
memory.

Review comment:
   Yes.  I have moved it to the test directory.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn commented on pull request #9670: DOCS-6076: Clarify config names for EOS versions 1 and 2

2021-02-16 Thread GitBox


JimGalasyn commented on pull request #9670:
URL: https://github.com/apache/kafka/pull/9670#issuecomment-780061593


   @abbccdda @mjsax Is this ready to merge?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partations

2021-02-16 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285383#comment-17285383
 ] 

Jason Gustafson edited comment on KAFKA-10888 at 2/16/21, 6:09 PM:
---

We have found one cause of imbalance when the sticky partitioner is used. 
Basically the intuition behind the sticky partitioner breaks down a little bit 
when a small `linger.ms` is in use (sadly this is the default). The user is 
opting out of batching with this setting which means there is often little 
opportunity to fill batches before they get drained and sent. That leaves the 
door open to sustained imbalance in some cases.

To see why, suppose that we have a producer writing to 3 partitions with 
linger.ms=0 and one partition slows down a little bit for some reason. It could 
be a leader change or some transient network issue. The producer will have to 
hold onto the batches for that partition until it becomes available. While it 
is holding onto those batches, additional batches will begin piling up. Each of 
these batches is likely to get filled because the producer is not ready to send 
to this partition yet.

Consider this from the perspective of the sticky partitioner. Every time the 
slow partition gets selected, the producer will fill the batches completely. On 
the other hand, the remaining "fast" partitions will likely not get their 
batches filled because of the `linger.ms=0` setting. As soon as a single record 
is available, it might get sent. So more data ends up getting written to the 
partition that has already started to build a backlog. And even after the cause 
of the original slowness (e.g. leader change) gets resolved, it might take some 
time for this imbalance to recover. We believe this can even create a runaway 
effect if the partition cannot catch up with the handicap of the additional 
load.

We analyzed one case where we thought this might be going on. Below I've 
summarized the writes over a period of one hour to 3 partitions. Partition 0 
here is the "slow" partition. All partitions get roughly the same number of 
batches, but the slow partition has much bigger batch sizes.

{code}
Partition  TotalBatches  TotalBytes  TotalRecords  BytesPerBatch  RecordsPerBatch
0          1683          25953200    25228         15420.80       14.99
1          1713          7836878     4622          4574.94        2.70
2          1711          7546212     4381          4410.41        2.56
{code}

After restarting the application, the producer was healthy again. It just was 
not able to recover with the imbalanced workload.
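
For reference, the per-batch averages above can be re-derived from the raw totals 
with plain arithmetic; a few lines of Java (inside any main method) reproduce them:

{code:java}
long[] batches = {1683, 1713, 1711};
long[] bytes   = {25953200L, 7836878L, 7546212L};
long[] records = {25228L, 4622L, 4381L};
for (int p = 0; p < 3; p++) {
    System.out.printf("partition %d: %.2f bytes/batch, %.2f records/batch%n",
        p, (double) bytes[p] / batches[p], (double) records[p] / batches[p]);
}
{code}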


was (Author: hachikuji):
We have found one cause of imbalance when the sticky partitioner is used. 
Basically the intuition behind the sticky partitioner breaks down a little bit 
when a small `linger.ms` is in use (sadly this is the default). The user is 
sort of opting out of batching with this setting which means there is little 
opportunity to fill batches before they get drained and sent. That leaves the 
door open for a kind of imbalanced write problem.

To see why, suppose that we have a producer writing to 3 partitions with 
linger.ms=0 and one partition slows down a little bit for some reason. It could 
be a leader change or some transient network issue. The producer will have to 
hold onto the batches for that partition until it becomes available. While it 
is holding onto those batches, additional batches will begin piling up. Each of 
these batches is likely to get filled because the producer is not ready to send 
to this partition yet.

Consider this from the perspective of the sticky partitioner. Every time the 
slow partition gets selected, the producer will fill the batches completely. On 
the other hand, the remaining "fast" partitions will likely not get their 
batches filled because of the `linger.ms=0` setting. As soon as a single record 
is available, it might get sent. So more data ends up getting written to the 
partition that has already started to build a backlog. And even after the cause 
of the original slowness (e.g. leader change) gets resolved, it might take some 
time for this imbalance to recover. We believe this can even create a runaway 
effect if the partition cannot catch up with the handicap of the additional 
load.

We analyzed one case where we thought this might be going on. Below I've 
summarized the writes over a period of one hour to 3 partitions. Partition 0 
here is the "slow" partition. All partitions get roughly the same number of 
batches, but the slow partition has much bigger batch sizes.

{code}
Partition  TotalBatches  TotalBytes  TotalRecords  BytesPerBatch  RecordsPerBatch
0          1683          25953200    25228         15420.80       14.99
1          1713          7836878     4622          4574.94        2.70
2          1711          7546212     4381          4410.41        2.56
{code}

After restarting the application, the producer was healthy again. It just was 
not able to recover with the imbalanced workload.

[jira] [Commented] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partations

2021-02-16 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285383#comment-17285383
 ] 

Jason Gustafson commented on KAFKA-10888:
-

We have found one cause of imbalance when the sticky partitioner is used. 
Basically the intuition behind the sticky partitioner breaks down a little bit 
when a small `linger.ms` is in use (sadly this is the default). The user is 
sort of opting out of batching with this setting which means there is little 
opportunity to fill batches before they get drained and sent. That leaves the 
door open for a kind of imbalanced write problem.

To see why, suppose that we have a producer writing to 3 partitions with 
linger.ms=0 and one partition slows down a little bit for some reason. It could 
be a leader change or some transient network issue. The producer will have to 
hold onto the batches for that partition until it becomes available. While it 
is holding onto those batches, additional batches will begin piling up. Each of 
these batches is likely to get filled because the producer is not ready to send 
to this partition yet.

Consider this from the perspective of the sticky partitioner. Every time the 
slow partition gets selected, the producer will fill the batches completely. On 
the other hand, the remaining "fast" partitions will likely not get their 
batches filled because of the `linger.ms=0` setting. As soon as a single record 
is available, it might get sent. So more data ends up getting written to the 
partition that has already started to build a backlog. And even after the cause 
of the original slowness (e.g. leader change) gets resolved, it might take some 
time for this imbalance to recover. We believe this can even create a runaway 
effect if the partition cannot catch up with the handicap of the additional 
load.

We analyzed one case where we thought this might be going on. Below I've 
summarized the writes over a period of one hour to 3 partitions. Partition 0 
here is the "slow" partition. All partitions get roughly the same number of 
batches, but the slow partition has much bigger batch sizes.

{code}
Partition  TotalBatches  TotalBytes  TotalRecords  BytesPerBatch  RecordsPerBatch
0          1683          25953200    25228         15420.80       14.99
1          1713          7836878     4622          4574.94        2.70
2          1711          7546212     4381          4410.41        2.56
{code}

After restarting the application, the producer was healthy again. It just was 
not able to recover with the imbalanced workload.

>  Sticky partition leads to uneven product msg, resulting in abnormal delays 
> in some partations
> --
>
> Key: KAFKA-10888
> URL: https://issues.apache.org/jira/browse/KAFKA-10888
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: jr
>Priority: Major
> Attachments: image-2020-12-24-21-05-02-800.png, 
> image-2020-12-24-21-09-47-692.png, image-2020-12-24-21-10-24-407.png
>
>
>   110 producers, 550 partitions, 550 consumers, 5-node Kafka cluster
>   The producers use the null-key + sticky partitioner; the total production rate 
> is about 1,000,000 tps.
> The observed partition delay is abnormal and the message distribution is uneven, 
> which makes the maximum production and consumption delay of the partitions 
> with more messages abnormal.
>   I cannot find a reason why the sticky partitioner would make the message 
> distribution uneven at this production rate.
>   I can't switch to the round-robin partitioner, which would increase the 
> delay and CPU cost. Does the sticky partitioner's design cause uneven message 
> distribution, or is this abnormal? How can it be solved?
>   !image-2020-12-24-21-09-47-692.png!
> As shown in the picture, the uneven distribution is concentrated on some 
> partitions and some brokers, there seems to be some rules.
> This problem does not only occur in one cluster, but in many high tps 
> clusters,
> The problem is more obvious on the test cluster we built.
> !image-2020-12-24-21-10-24-407.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs

2021-02-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12313:
--
Fix Version/s: 3.0.0

> Consider deprecating the default.windowed.serde.inner.class configs
> ---
>
> Key: KAFKA-12313
> URL: https://issues.apache.org/jira/browse/KAFKA-12313
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> During the discussion of KIP-659 we discussed whether it made sense to have a 
> "default" class for the serdes of windowed inner classes across Streams. 
> Using these configs instead of specifying an actual Serde object can lead to 
> subtle bugs, since the WindowedDeserializer requires a windowSize in addition 
> to the inner class. If the default constructor is invoked, as it will be when 
> falling back on the config, this windowSize defaults to MAX_VALUE. 
> If the downstream program doesn't care about the window end time in the 
> output, then this can go unnoticed and technically there is no problem. But 
> if anything does depend on the end time, or the user just wants to manually 
> read the output for testing purposes, then the MAX_VALUE will result in a 
> garbage timestamp.
> We should consider whether the convenience of specifying a config instead of 
> instantiating a Serde in each operator is really worth the risk of a user 
> accidentally failing to specify a windowSize.
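> A minimal sketch of the explicit alternative (the five-minute window size here 
> is an assumption for illustration):
> {code:java}
> import java.time.Duration;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.kstream.Windowed;
> import org.apache.kafka.streams.kstream.WindowedSerdes;
>
> // Construct the windowed serde with the real window size instead of relying on
> // default.windowed.serde.inner.class, so deserialized end times are correct:
> final long windowSizeMs = Duration.ofMinutes(5).toMillis();
> final Serde<Windowed<String>> windowedSerde =
>     new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), windowSizeMs);
> {code}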



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] purplefox opened a new pull request #10132: Fix ssl close

2021-02-16 Thread GitBox


purplefox opened a new pull request #10132:
URL: https://github.com/apache/kafka/pull/10132


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12329) kafka-reassign-partitions command should give a better error message when a topic does not exist

2021-02-16 Thread David Jacot (Jira)
David Jacot created KAFKA-12329:
---

 Summary: kafka-reassign-partitions command should give a better 
error message when a topic does not exist
 Key: KAFKA-12329
 URL: https://issues.apache.org/jira/browse/KAFKA-12329
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


The `kafka-reassign-partitions` command spits out a generic error when the 
reassignment contains a topic which does not exist:

{noformat}
$ ./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
--execute --reassignment-json-file reassignment.json
Error: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
server does not host this topic-partition.
{noformat}

When the reassignment contains multiple topic-partitions, this is quite 
annoying. It would be better if it could at least give the concerned 
topic-partition.
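
One possible shape for the improvement (a sketch only, not the actual patch; the 
topic names are placeholders): resolve unknown topics up front via the Admin 
client and report them by name.

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // "requested" would come from parsing reassignment.json.
    Set<String> requested = new HashSet<>(Arrays.asList("topic-a", "topic-b"));
    requested.removeAll(admin.listTopics().names().get());
    requested.forEach(t -> System.err.println("Unknown topic in reassignment: " + t));
} // (exception handling elided)
{code}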



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10128: MINOR: remove duplicate code of serializing auto-generated data

2021-02-16 Thread GitBox


chia7712 merged pull request #10128:
URL: https://github.com/apache/kafka/pull/10128


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10024: KAFKA-12273 InterBrokerSendThread#pollOnce throws FatalExitError even…

2021-02-16 Thread GitBox


chia7712 commented on a change in pull request #10024:
URL: https://github.com/apache/kafka/pull/10024#discussion_r576927931



##
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##
@@ -78,6 +77,9 @@ abstract class InterBrokerSendThread(
   failExpiredRequests(now)
   unsentRequests.clean()
 } catch {
+  case _: DisconnectException if isShutdownInitiated =>
+// DisconnectException is caused by NetworkClient#ensureActive
+// this thread is closing so this error is acceptable

Review comment:
   @dajac Could you take a look at above comment? thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10074: KAFKA-12305: Fix Flatten SMT for array types

2021-02-16 Thread GitBox


C0urante commented on pull request #10074:
URL: https://github.com/apache/kafka/pull/10074#issuecomment-779866655


   @gharris1727 @ncliang @tombentley anyone got time to take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10073: KAFKA-12303: Fix handling of null values by Flatten SMT

2021-02-16 Thread GitBox


C0urante commented on pull request #10073:
URL: https://github.com/apache/kafka/pull/10073#issuecomment-779866596


   @gharris1727 @ncliang @tombentley anyone got time to take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-02-16 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285225#comment-17285225
 ] 

Marco Lotz commented on KAFKA-10383:


[~vvcephei] I sent an email to Kafka's dev list last Thursday requesting privileges 
to create a KIP. They still have not been granted. Was that the right place to 
request access to create the KIP?

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Assignee: Marco Lotz
>Priority: Major
>  Labels: needs-kip
>
> *Status Quo:*
>  The current implementation of 
> [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On Windows-based 
> systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB store created regardless of the 
> materialization method forces any IT test to use the manual FS-deletion and 
> exception-swallowing hack.
>  * Short-lived Streams: KTables can be short-lived in a way that neither 
> persistent storage nor changelog creation is desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store, as sketched below. 
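> For illustration, the affected call site looks roughly like this (the types, 
> names, and enrich helper are assumed):
> {code:java}
> // Even when the join result is materialized in memory and unlogged, the current
> // implementation still creates a RocksDB-backed internal store for the
> // foreign-key subscription machinery.
> KTable<String, EnrichedOrder> joined = orders.join(
>     customers,
>     Order::getCustomerId,                          // foreign-key extractor
>     (order, customer) -> enrich(order, customer),  // joiner
>     Materialized.<String, EnrichedOrder>as(
>             Stores.inMemoryKeyValueStore("joined-orders"))
>         .withLoggingDisabled());
> {code}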



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285220#comment-17285220
 ] 

Ismael Juma commented on KAFKA-3702:


[~purplefox] it may be simplest if you submit a PR with the changes you think 
are required. :)

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285218#comment-17285218
 ] 

Ismael Juma commented on KAFKA-3702:


Also `isOutboundDone` says:
{quote}Returns whether wrap(ByteBuffer, ByteBuffer) will produce any more 
outbound data messages.
Note that during the closure phase, a SSLEngine may generate handshake closure 
data that must be sent to the peer. wrap() must be called to generate this 
data. When this method returns true, no more outbound data will be created.
{quote}

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285216#comment-17285216
 ] 

Tim Fox commented on KAFKA-3702:


>> A possible issue (identified by [~david.mao]) is that we don't check if 
>>`isOutboundDone` is true in the `close` method.

Yes, I think this is also an issue (perhaps a separate one). According to the 
SSLEngine javadoc, wrap() or isOutboundDone() should be called _repeatedly_ 
until it returns CLOSED/true. Currently we only call it once and assume it will 
immediately return with CLOSED/true.
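
For reference, a bare-bones version of the loop the javadoc describes might look 
like this (a sketch against the plain SSLEngine API with a blocking SocketChannel, 
not Kafka's SslTransportLayer):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;

static void closeOutboundGracefully(SSLEngine engine, SocketChannel channel) throws IOException {
    ByteBuffer netBuffer = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
    engine.closeOutbound();
    while (!engine.isOutboundDone()) {
        // wrap() generates the close_notify (and any remaining handshake data).
        SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netBuffer);
        netBuffer.flip();
        while (netBuffer.hasRemaining()) {
            channel.write(netBuffer);
        }
        netBuffer.clear();
        if (result.getStatus() == SSLEngineResult.Status.CLOSED) {
            break;
        }
    }
}
{code}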

 

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285215#comment-17285215
 ] 

Ismael Juma commented on KAFKA-3702:


[~purplefox] The documentation for `closeOutbound` says you must still call 
`wrap` after:
{quote}wrap(ByteBuffer, ByteBuffer) should be called to flush any remaining 
handshake data.
{quote}

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285212#comment-17285212
 ] 

Tim Fox commented on KAFKA-3702:


Maybe I am missing something but writing to the connection (via flush) after 
closeOutbound has been called seems wrong to me.

closeOutbound causes close_notify to be sent to the peer. AIUI, you shouldn't 
write more data on a connection after close_notify has been sent.

When the peer receives the close_notify it might close the connection. If we 
then continue to send more data it's likely that we might receive "connection 
reset by peer" or similar as the connection will already have been closed (I 
believe this is what we see).

 

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] MarcoLotz opened a new pull request #10131: KAFKA-5146 / move kafka-streams example to a new module

2021-02-16 Thread GitBox


MarcoLotz opened a new pull request #10131:
URL: https://github.com/apache/kafka/pull/10131


   Moved "streams-examples" to its own module outside kafka-streams module.
   Because of org.apache.kafka.streams.processor.internals.StateDirectory in 
kafka-streams module, I had to add the jackson binder dependency. Before the 
change, It was probably being retrieved as a transitive dependency through 
"connect-json" dependency.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285199#comment-17285199
 ] 

Ismael Juma commented on KAFKA-3702:


A possible issue (identified by [~david.mao]) is that we don't check if 
`isOutboundDone` is true in the `close` method.

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-5649) Producer is being closed generating ssl exception

2021-02-16 Thread Julian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285170#comment-17285170
 ] 

Julian edited comment on KAFKA-5649 at 2/16/21, 12:45 PM:
--

We have now moved to HDP 3.1.5, which has Spark 2.3.2 and Kafka 2+, but we are 
forced to use the Kafka 0.10 libraries as there is no structured streaming build 
for these combined versions. On this new cluster, the issue is still happening in 
Spark streaming from Kafka. See https://issues.apache.org/jira/browse/SPARK-21453 
which I also updated and linked back to this case. I also see 
https://issues.apache.org/jira/browse/KAFKA-3702  and 
https://issues.apache.org/jira/browse/KAFKA-5649 maybe related here.

Maybe kafka 2 is solving this, but unfortunately we have a long way to go until 
we get to spark 2.4,  kafka 2+ and the relevant structured streaming builds 
supporting these two.


was (Author: julescs0):
We have now moved to HDP 3.1.5 having spark 2.3.2 and Kafka 2+ but we are 
forced to use kafka 0.10 libraries as no structured streaming build for these 
combined versions. On this new cluster, the issue is still happening in spark 
streaming from kafka. See https://issues.apache.org/jira/browse/SPARK-21453 
which I also updated and linked back to this case. I also see 
https://issues.apache.org/jira/browse/KAFKA-3702 maybe related here.

Maybe kafka 2 is solving this, but unfortunately we have a long way to go until 
we get to spark 2.4,  kafka 2+ and the relevant structured streaming builds 
supporting these two.

> Producer is being closed generating ssl exception
> -
>
> Key: KAFKA-5649
> URL: https://issues.apache.org/jira/browse/KAFKA-5649
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.1
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am 
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> And in some cases it throws the exception, leaving the Spark job stuck at 
> that step. The exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at 


[GitHub] [kafka] cadonna commented on a change in pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-02-16 Thread GitBox


cadonna commented on a change in pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#discussion_r576789945



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
##
@@ -60,4 +67,22 @@ public void shouldRemoveKeysWithNullValues() {
 
 assertThat(store.get(0), nullValue());
 }
+
+
+@Test
+public void shouldReturnKeysWithGivenPrefix(){
+store = createKeyValueStore(driver.context());
+final String value = "value";
+final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+entries.add(new KeyValue<>(1, value));
+entries.add(new KeyValue<>(2, value));
+entries.add(new KeyValue<>(11, value));
+entries.add(new KeyValue<>(13, value));
+
+store.putAll(entries);
+final KeyValueIterator<Integer, String> keysWithPrefix = 
store.prefixScan(1, new IntegerSerializer());

Review comment:
   The reason we get only `1` when we scan for prefix `1` is that the 
integer serializer serializes `11` and `13` into the least significant byte, 
instead of serializing the `1` into the second-least significant byte and the 
`1` and `3` into the least significant byte. With the former, the **byte** 
lexicographical order of `1 2 11 13` is `1 2 11 13`, which corresponds to 
the natural order of integers. With the latter, the **byte** lexicographical 
order of `1 2 11 13` would be `1 11 13 2`, which corresponds to the string 
lexicographical order. So the serializer determines the order of the entries, 
and the store always returns the entries in byte lexicographical order.
   
   You will experience a similar effect when you call `range(-1, 2)` on the 
in-memory state store in the unit test. You will get back an empty result, 
since `-1` is larger than `2` in byte lexicographical order   
when the `IntegerSerializer` is used. Also note the warning that is output, 
especially this part: `... or serdes that don't preserve ordering when 
lexicographically comparing the serialized bytes ...`

   I think we should clearly state this limitation in the javadocs of 
`prefixScan()`, as we have done for `range()`, maybe with an example. 
   
   Currently, to get `prefixScan()` working for all types, we would need to do 
a complete scan (i.e. `all()`) followed by a filter, right?
   
   Double checking: Is my understanding correct? @ableegoldman 
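
   To make the byte ordering concrete, here is a small standalone sketch (a hypothetical demo class, not part of the PR) that prints the serialized bytes of each key and checks which keys would match the byte prefix of `1`. It relies on `IntegerSerializer` producing 4-byte big-endian output:

{code:java}
import java.util.Arrays;
import org.apache.kafka.common.serialization.IntegerSerializer;

public class PrefixBytesDemo {

    // True if 'bytes' starts with 'prefix', i.e. the key would match the prefix scan.
    static boolean hasPrefix(final byte[] bytes, final byte[] prefix) {
        if (bytes.length < prefix.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOfRange(bytes, 0, prefix.length), prefix);
    }

    public static void main(final String[] args) {
        final IntegerSerializer serializer = new IntegerSerializer();
        final byte[] prefix = serializer.serialize("topic", 1);  // 00 00 00 01
        for (final int key : new int[] {1, 2, 11, 13}) {
            final byte[] bytes = serializer.serialize("topic", key);
            System.out.printf("%d -> %s, matches prefix 1: %b%n",
                key, Arrays.toString(bytes), hasPrefix(bytes, prefix));
        }
        // Only key 1 matches: 11 serializes to 00 00 00 0B, which does not start
        // with the full four bytes 00 00 00 01, whereas the String "11" does
        // start with the String "1".
    }
}
{code}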





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2021-02-16 Thread Julian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285172#comment-17285172
 ] 

Julian commented on KAFKA-3702:
---

Hi, I came across this issue while searching in relation to the other two Jiras 
I have been commenting on, for which this same error is also cropping up. I 
think these may all be related... Regardless, it all points to Kafka 0.10 
versions; this issue has been in our clusters for nearly 2 years now...

https://issues.apache.org/jira/browse/KAFKA-5649
https://issues.apache.org/jira/browse/SPARK-21453

 

> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shutdown 
> gracefully.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12328) Find out partition of a store iterator

2021-02-16 Thread fml2 (Jira)
fml2 created KAFKA-12328:


 Summary: Find out partition of a store iterator
 Key: KAFKA-12328
 URL: https://issues.apache.org/jira/browse/KAFKA-12328
 Project: Kafka
  Issue Type: Wish
Reporter: fml2


This question was posted [on 
stackoverflow|https://stackoverflow.com/questions/66032099/kafka-streams-how-to-get-the-partition-an-iterartor-is-iterating-over]
 and got an answer, but the solution is quite complicated, hence this ticket.
 
In my Kafka Streams application, I have a task that sets up a punctuator 
scheduled by wall-clock time. The punctuator iterates over the entries of a 
store and does something with them. Like this:
{code:java}
var store = context().getStateStore("MyStore");
var iter = store.all();

while (iter.hasNext()) {
   var entry = iter.next();
   // ... do something with the entry
}

// Print a summary (now): N entries processed
// Print a summary (wish): N entries processed in partition P
{code}
Is it possible to find out which partition the punctuator operates on? The 
javadocs for {{ProcessorContext.partition()}} state that this method returns 
{{-1}} within punctuators.

I've read [Kafka Streams: Punctuate vs 
Process|https://stackoverflow.com/questions/50776987/kafka-streams-punctuate-vs-process]
 and the answers there. I can understand that a task is, in general, not tied 
to a particular partition. But an iterator should be tied to one, IMO.

How can I find out the partition?

Or is my assumption wrong that a particular instance of a store iterator is 
tied to a partition?

What I need it for: I'd like to include the partition number in some log 
messages. For now, I have several nearly identical log messages stating that 
the punctuator does this and that. In order to make those messages "unique", 
I'd like to include the partition number in them.
Since I'm working with a single store here (which might be partitioned), I 
assume that every single execution of the punctuator is bound to a single 
partition of that store.
 
It would be cool if there were a method {{iterator.partition}} (or similar) to 
get this information.
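
For what it's worth, a minimal sketch of one possible workaround (this is not the solution from the linked post; the processor class and schedule interval are hypothetical, and it relies on the assumption above that each task, and hence each punctuator execution, is bound to a single partition): capture the partition from the task ID in {{init()}} and reuse it in the punctuator.

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class MyPunctuatingProcessor extends AbstractProcessor<String, String> {

    private int partition;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // The TaskId exposes the partition the task is assigned to
        // (a public field in Kafka 2.x).
        partition = context.taskId().partition;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // ... iterate over the store entries as in the snippet above ...
            System.out.printf("N entries processed in partition %d%n", partition);
        });
    }

    @Override
    public void process(final String key, final String value) {
        // regular per-record processing
    }
}
{code}

Whether the task's partition always matches the partition of the store instance the iterator sees is exactly the assumption this ticket asks about.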



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5649) Producer is being closed generating ssl exception

2021-02-16 Thread Julian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285170#comment-17285170
 ] 

Julian commented on KAFKA-5649:
---

We have now moved to HDP 3.1.5, which has Spark 2.3.2 and Kafka 2+, but we are 
forced to use the Kafka 0.10 libraries, as there is no structured streaming 
build for these combined versions. On this new cluster, the issue is still 
happening in Spark streaming from Kafka. See 
https://issues.apache.org/jira/browse/SPARK-21453, which I also updated and 
linked back to this case. I also see 
https://issues.apache.org/jira/browse/KAFKA-3702, which may be related here.

Maybe Kafka 2 solves this, but unfortunately we have a long way to go until 
we get to Spark 2.4, Kafka 2+ and the relevant structured streaming builds 
supporting these two.

> Producer is being closed generating ssl exception
> -
>
> Key: KAFKA-5649
> URL: https://issues.apache.org/jira/browse/KAFKA-5649
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.1
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am 
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> And in some cases it throws the exception, leaving the Spark job stuck at 
> that step. The exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>   at 
> 

[GitHub] [kafka] runom opened a new pull request #10130: MINOR: Fix typo in MirrorMaker

2021-02-16 Thread GitBox


runom opened a new pull request #10130:
URL: https://github.com/apache/kafka/pull/10130


   This PR fixes a typo.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #10129: KAFKA-10817; Add clusterId validation to Fetch handling

2021-02-16 Thread GitBox


dajac opened a new pull request #10129:
URL: https://github.com/apache/kafka/pull/10129


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org