[GitHub] [kafka] showuon commented on pull request #12908: MINOR: Prevent NPE in SmokeTestDriver (fix flaky test)

2022-12-02 Thread GitBox


showuon commented on PR #12908:
URL: https://github.com/apache/kafka/pull/12908#issuecomment-1336108293

   Some `MemoryRecordsBuilderTest` tests failed. Re-running Jenkins: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12908/4/
   
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[4] magic=0, bufferOffset=0, compressionType=gzip
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[5] magic=1, bufferOffset=0, compressionType=gzip
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[10] magic=0, bufferOffset=0, compressionType=lz4
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[15] magic=1, bufferOffset=15, compressionType=none
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[19] magic=2, bufferOffset=15, compressionType=gzip
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[20] magic=0, bufferOffset=15, compressionType=snappy
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.record.MemoryRecordsBuilderTest.[22] magic=2, bufferOffset=15, compressionType=snappy
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12940: MINOR: Remove lock contention while adding sensors

2022-12-02 Thread GitBox


showuon commented on PR #12940:
URL: https://github.com/apache/kafka/pull/12940#issuecomment-1336107162

   Re-running Jenkins: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12940/4/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #12946: KAFKA-14427 ZK client support for migrations

2022-12-02 Thread GitBox


mumrah opened a new pull request, #12946:
URL: https://github.com/apache/kafka/pull/12946

   TODO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #12928: KAFKA-14304 Add RPC changes, records, and config from KIP-866

2022-12-02 Thread GitBox


mumrah merged PR #12928:
URL: https://github.com/apache/kafka/pull/12928


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue opened a new pull request, #12945: KAFKA-14365: Refactor Fetcher to allow different implementations

2022-12-02 Thread GitBox


kirktrue opened a new pull request, #12945:
URL: https://github.com/apache/kafka/pull/12945

   This change refactors the existing `Fetcher` by splitting the offset-listing 
logic out into a dedicated class named `OffsetsFinder`. This keeps the 
`Fetcher` implementation more focused and thus easier to refactor (in coming 
commits).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14439) Specify returned errors for various APIs and versions

2022-12-02 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642726#comment-17642726
 ] 

Jason Gustafson edited comment on KAFKA-14439 at 12/2/22 11:52 PM:
---

Yeah, we've had so many compatibility breaks due to error code usage. Putting 
the errors into the spec would also enable better enforcement. One option could 
be something like this:
{code:java}
{
  "name": "ErrorCode", "type": "enum16", "versions": "0+",
  "about": "The error code.",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"},
    {"value": 4, "versions": "1+", "about": "Invalid fetch size"}
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in 
the `values` field. New values can only be added with a version bump.
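
For illustration, a client-side sketch of what a 2-byte, forward-compatible enum might look like (hypothetical names; not generated code from this proposal):
{code:java}
// Hypothetical enum for an "enum16" error-code field.
public enum FetchErrorCode {
    NONE((short) 0),
    OFFSET_OUT_OF_RANGE((short) 1),
    INVALID_FETCH_SIZE((short) 4),
    UNKNOWN((short) -1); // fallback for codes added in newer versions

    private final short code;

    FetchErrorCode(short code) {
        this.code = code;
    }

    public short code() {
        return code;
    }

    // Map a wire value to a constant, tolerating unknown codes so an old
    // client does not break when new values are added with a version bump.
    public static FetchErrorCode fromCode(short code) {
        for (FetchErrorCode e : values()) {
            if (e.code == code)
                return e;
        }
        return UNKNOWN;
    }
}
{code}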


was (Author: hachikuji):
Yeah, we've had so many compatibility breaks due to error code usage. Putting 
the errors into the spec would also enable better enforcement. One option could 
be something like this:
{code:java}
{
  "name": "ErrorCode", "type": "enum16", "versions": "0+",
  "about": "The error code.",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"}
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in 
the `values` field. New values can only be added with a version bump.

> Specify returned errors for various APIs and versions
> -
>
> Key: KAFKA-14439
> URL: https://issues.apache.org/jira/browse/KAFKA-14439
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Major
>
> Kafka is known for supporting various clients and being compatible across 
> different versions. But one thing that is a bit unclear is which errors each 
> response can send. 
> Knowing which errors can come from each version gives client implementers a 
> more defined spec for the errors they need to handle. When new errors are 
> added, it is clearer to the clients that changes need to be made.
> It also helps contributors get a better understanding of how clients are 
> expected to react, and potentially find and prevent gaps like the one found in 
> https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe 
> adding values for the error codes in the schema definitions of APIs that 
> specify the error codes and what versions they are returned on. One idea was 
> creating some enum type to accomplish this. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14439) Specify returned errors for various APIs and versions

2022-12-02 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642735#comment-17642735
 ] 

Ismael Juma commented on KAFKA-14439:
-

I think it would also be useful for the server to indicate whether an error is 
retriable or not. That would make protocol evolution a lot more flexible.
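
For illustration, such a flag could be a per-value field in the schema sketch from the comment above (hypothetical syntax):
{code:java}
{"value": 1, "versions": "1+", "retriable": true,
 "about": "The requested offset was out of range"}
{code}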

> Specify returned errors for various APIs and versions
> -
>
> Key: KAFKA-14439
> URL: https://issues.apache.org/jira/browse/KAFKA-14439
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Major
>
> Kafka is known for supporting various clients and being compatible across 
> different versions. But one thing that is a bit unclear is which errors each 
> response can send. 
> Knowing which errors can come from each version gives client implementers a 
> more defined spec for the errors they need to handle. When new errors are 
> added, it is clearer to the clients that changes need to be made.
> It also helps contributors get a better understanding of how clients are 
> expected to react, and potentially find and prevent gaps like the one found in 
> https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe 
> adding values for the error codes in the schema definitions of APIs that 
> specify the error codes and what versions they are returned on. One idea was 
> creating some enum type to accomplish this. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14438) Stop supporting empty consumer groupId

2022-12-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14438:

Priority: Blocker  (was: Major)

> Stop supporting empty consumer groupId
> --
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Currently, a warning message is logged when an empty consumer groupId is used. 
> In the next major release, we should drop support for the empty ("") consumer 
> groupId.
>  
> cc [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14439) Specify returned errors for various APIs and versions

2022-12-02 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642726#comment-17642726
 ] 

Jason Gustafson edited comment on KAFKA-14439 at 12/2/22 11:44 PM:
---

Yeah, we've had so many compatibility breaks due to error code usage. Putting 
the errors into the spec would also enable better enforcement. One option could 
be something like this:
{code:java}
{
  "name": "ErrorCode", "type": "enum16", "versions": "0+",
  "about": "The error code.",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"}
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in 
the `values` field. New values can only be added with a version bump.


was (Author: hachikuji):
Yeah, we've had so many compatibility breaks due to error code usage. Putting 
the errors into the spec would also enable better enforcement. One option could 
be something like this:
{code:java}
{
  "name": "ErrorCode", "type": "enum16", "versions": "0+",
  "about": "The error code.",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"}
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in 
the `values` field. New values can only be added with a version bump.

> Specify returned errors for various APIs and versions
> -
>
> Key: KAFKA-14439
> URL: https://issues.apache.org/jira/browse/KAFKA-14439
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Major
>
> Kafka is known for supporting various clients and being compatible across 
> different versions. But one thing that is a bit unclear is which errors each 
> response can send. 
> Knowing which errors can come from each version gives client implementers a 
> more defined spec for the errors they need to handle. When new errors are 
> added, it is clearer to the clients that changes need to be made.
> It also helps contributors get a better understanding of how clients are 
> expected to react, and potentially find and prevent gaps like the one found in 
> https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe 
> adding values for the error codes in the schema definitions of APIs that 
> specify the error codes and what versions they are returned on. One idea was 
> creating some enum type to accomplish this. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14439) Specify returned errors for various APIs and versions

2022-12-02 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642726#comment-17642726
 ] 

Jason Gustafson commented on KAFKA-14439:
-

Yeah, we've had so many compatibility breaks due to error code usage. Putting 
the errors into the spec would also enable better enforcement. One option could 
be something like this:
{code:java}
{
  "name": "ErrorCode", "type": "enum16", "versions": "0+",
  "about": "The error code.",
  "values": [
    {"value": 0, "versions": "0+", "about": "The operation completed successfully"},
    {"value": 1, "versions": "1+", "about": "The requested offset was out of range"}
  ]
}
{code}
Here "enum16" indicates a 2-byte enumeration where the values are provided in 
the `values` field. New values can only be added with a version bump.

> Specify returned errors for various APIs and versions
> -
>
> Key: KAFKA-14439
> URL: https://issues.apache.org/jira/browse/KAFKA-14439
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Major
>
> Kafka is known for supporting various clients and being compatible across 
> different versions. But one thing that is a bit unclear is which errors each 
> response can send. 
> Knowing which errors can come from each version gives client implementers a 
> more defined spec for the errors they need to handle. When new errors are 
> added, it is clearer to the clients that changes need to be made.
> It also helps contributors get a better understanding of how clients are 
> expected to react, and potentially find and prevent gaps like the one found in 
> https://issues.apache.org/jira/browse/KAFKA-14417
> I briefly synced offline with [~hachikuji] about this and he suggested maybe 
> adding values for the error codes in the schema definitions of APIs that 
> specify the error codes and what versions they are returned on. One idea was 
> creating some enum type to accomplish this. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14439) Specify returned errors for various APIs and versions

2022-12-02 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14439:
--

 Summary: Specify returned errors for various APIs and versions
 Key: KAFKA-14439
 URL: https://issues.apache.org/jira/browse/KAFKA-14439
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan


Kafka is known for supporting various clients and being compatible across 
different versions. But one thing that is a bit unclear is which errors each 
response can send. 

Knowing which errors can come from each version gives client implementers a 
more defined spec for the errors they need to handle. When new errors are 
added, it is clearer to the clients that changes need to be made.

It also helps contributors get a better understanding of how clients are 
expected to react, and potentially find and prevent gaps like the one found in 
https://issues.apache.org/jira/browse/KAFKA-14417

I briefly synced offline with [~hachikuji] about this and he suggested maybe 
adding values for the error codes in the schema definitions of APIs that 
specify the error codes and what versions they are returned on. One idea was 
creating some enum type to accomplish this. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments

2022-12-02 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant reassigned KAFKA-14437:


Assignee: Andrew Grant

> Enhance StripedReplicaPlacer to account for existing partition assignments
> --
>
> Key: KAFKA-14437
> URL: https://issues.apache.org/jira/browse/KAFKA-14437
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> Currently, in StripedReplicaPlacer we don’t take existing partition 
> assignments into consideration when the place method is called. This means 
> that newly added partitions may get the same assignments as existing 
> partitions. This differs from AdminUtils, which has some logic to try to 
> shift where in the list of brokers we start making assignments for newly 
> added partitions.
> For example, let's say we had the following
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
> {code}
> CreateTopics might return the following assignment for two partitions:
> {code:java}
> P0: 6, 8, 2
> P1: 9, 3, 7
> {code}
> If the user then calls CreatePartitions increasing the partition count to 4, 
> StripedReplicaPlacer does not take into account P0 and P1. It creates a 
> random rack offset and a random broker offset. So it could easily create the 
> same assignment for P3 and P4 that it created for P0 and P1. This is easily 
> reproduced in a unit test.
> My suggestion is to enhance StripedReplicaPlacer to account for existing 
> partition assignments. Intuitively, we’d like to make assignments for added 
> partitions from “where we left off” when we were making the previous 
> assignments. In practice, it's not possible to know exactly what the state was 
> during the previous partition assignments because, for example, broker 
> fencing state may have changed. But I do think we can make a best-effort 
> attempt that is optimized for the common case where most brokers are 
> unfenced. Note: all the changes suggested below will only affect 
> StripedReplicaPlacer when place is called and there are existing partition 
> assignments, which happens when it's servicing CreatePartitions requests. If 
> there are no existing partition assignments, which happens during 
> CreateTopics, the logic is unchanged.
> First, we need to update ClusterDescriber to:
> {code:java}
> public interface ClusterDescriber {
>     /**
>      * Get an iterator through the usable brokers.
>      */
>     Iterator<UsableBroker> usableBrokers();
>     List<List<Integer>> replicasForTopicName(String topicName);
> }
> {code}
> The replicasForTopicName method returns the existing partition assignments, 
> which will enable StripedReplicaPlacer to know about existing partition 
> assignments when they exist.
> When place is called, some initialization is done in both RackList and 
> BrokerList. One thing that is initialized is the offset variable: a 
> variable used in both RackList and BrokerList that determines where in the 
> list of racks or brokers, respectively, we should start from when making 
> the next assignment. Currently, it is initialized to a random value based 
> on the size of the list. 
> I suggest we add some logic during initialization that sets the offset for 
> both RackList and BrokerList to a value based on the previous assignments.
> Consider again the following rack metadata and existing assignments:
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
>  
> P0: 6, 8, 2
> P1: 9, 3, 7  
> {code}
> Let's imagine a user wants to create a new partition, P3. 
> First, we need to determine which rack to start from for P3: this corresponds 
> to the initial offset in RackList. We can look at the leader of P1 (not P0 
> because P1 is the “last” partition we made an assignment for) and see it's on 
> rack 3. So, the next rack we should start from is rack 1. This means 
> we set the offset in RackList to 0, instead of a random value, during 
> initialization. 
> Second, we need to determine which broker to start from {_}per rack{_}: this 
> corresponds to the initial offset in BrokerList. We can look at all the 
> existing partition assignments, P0 and P1 in our example, and _per rack_ 
> infer the last offset started from during previous assignments. For each 
> rack, we do this by iterating through each partition, in reverse order 
> because we care about the most recent starting position, and try to find the 
> first broker in the assignment from the rack. This enables us to know where 
> we last started from when making an assignment for that rack, which can be 
> used to determine where to continue on from within that rack.
> So in our example, for rack 1 we can see the last broker we started from was 
> broker 3 in 

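A minimal sketch of the reverse scan described above, using plain collections and hypothetical names rather than the actual StripedReplicaPlacer internals:
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// For each rack, recover the broker we most recently "started from": scan the
// existing assignments newest-first and record the first replica of each rack
// that we encounter.
static Map<String, Integer> lastStartingBrokerPerRack(
        List<List<Integer>> existingAssignments,  // e.g. [[6, 8, 2], [9, 3, 7]]
        Map<Integer, String> rackOfBroker) {
    Map<String, Integer> lastStart = new HashMap<>();
    for (int p = existingAssignments.size() - 1; p >= 0; p--) {
        for (int broker : existingAssignments.get(p)) {
            // putIfAbsent keeps only the most recent occurrence per rack.
            lastStart.putIfAbsent(rackOfBroker.get(broker), broker);
        }
    }
    return lastStart;  // for the example above: rack1 -> 3, rack2 -> 7, rack3 -> 9
}
{code}
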
[jira] [Created] (KAFKA-14438) Stop supporting empty consumer groupId

2022-12-02 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14438:
--

 Summary: Stop supporting empty consumer groupId
 Key: KAFKA-14438
 URL: https://issues.apache.org/jira/browse/KAFKA-14438
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
 Fix For: 4.0.0


Currently, a warning message is logged when an empty consumer groupId is used. In 
the next major release, we should drop support for the empty ("") consumer 
groupId.

 

cc [~hachikuji] 
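
For client code still relying on the default, the migration is simply to set an explicit, non-empty group id (illustrative snippet, assuming the usual consumer imports):
{code:java}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// An empty ("") group.id currently only logs a warning; after the next major
// release it would be rejected, so always configure a real group id.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}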



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky opened a new pull request, #12944: [KAFKA-14395] add config to configure client supplier

2022-12-02 Thread GitBox


lihaosky opened a new pull request, #12944:
URL: https://github.com/apache/kafka/pull/12944

   ## Description
   This PR is for 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams.
 It adds a new `default.client.supplier` config to configure the 
`KafkaClientSupplier`.
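   
   Usage would presumably look like this (a sketch based on the KIP; `MyKafkaClientSupplier` is a hypothetical class implementing `KafkaClientSupplier`):
   ```java
   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   // New config introduced by this PR (KIP-884).
   props.put("default.client.supplier", MyKafkaClientSupplier.class.getName());
   KafkaStreams streams = new KafkaStreams(topology, props);
   ```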
   
   ## Test
   Unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12897: KAFKA-14379: consumer should refresh preferred read replica on update metadata

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12897:
URL: https://github.com/apache/kafka/pull/12897#discussion_r1038530374


##
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
 for (int i = 0; i < numPartitions; i++) {
 TopicPartition tp = new TopicPartition(topic, i);
 Node leader = nodes.get(i % nodes.size());
-List<Integer> replicaIds = Collections.singletonList(leader.id());
+List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());

Review Comment:
   Leaving the replication factor as implicit seems less than ideal. Perhaps we 
could make it an explicit argument? 
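   
   i.e., something along these lines (a sketch, not the final change):
   ```java
   // Hypothetical: take replicationFactor as an explicit method parameter.
   List<Integer> replicaIds = IntStream.range(0, replicationFactor)
       .mapToObj(r -> nodes.get((i + r) % nodes.size()).id())
       .collect(Collectors.toList());
   ```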



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2022-12-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6643.

Resolution: Won't Fix

> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas at 1, we still 
> have more availability for persistent state stores, as changelog Kafka topics 
> are also replicated depending upon the broker replication policy, but that 
> also means we are using at least 4 times the space (1 master store, 1 replica 
> store, 1 changelog, 1 changelog replica). 
> Now if we have a year's data in persistent stores (RocksDB), we don't want 
> the changelog topics to hold a year's data, as it will put an unnecessary 
> burden on brokers (in terms of space). If we have to scale our Kafka Streams 
> application (having 200-300 TB of data), we have to scale the Kafka brokers 
> as well. We want to reduce this dependency and find ways to just use the 
> changelog topic as a queue, holding just 2 or 3 days of data, and warm up the 
> replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new Kafka topic related to each partition which we need to warm up 
> on the fly (when the node containing that partition crashes). Produce into 
> this topic from another replica/active and build the new replica through this 
> topic.
> 2. Use peer-to-peer file transfer (such as SFTP), as RocksDB can create 
> backups, which can be transferred from the source node to the destination 
> node when a new replica has to be built from scratch.
> 3. Use HDFS as an intermediary instead of a Kafka topic, where we can keep 
> scheduled backups for each partition and use those to build new replicas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14379) consumer should refresh preferred read replica on update metadata

2022-12-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14379:

Priority: Critical  (was: Major)

> consumer should refresh preferred read replica on update metadata
> -
>
> Key: KAFKA-14379
> URL: https://issues.apache.org/jira/browse/KAFKA-14379
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Critical
> Fix For: 3.4.0
>
>
> The consumer (fetcher) refreshes the preferred read replica only on three 
> conditions:
>  # the consumer receives an OFFSET_OUT_OF_RANGE error
>  # the follower does not exist in the client's metadata (i.e., offline)
>  # after metadata.max.age.ms (5 min default)
> For other errors, it will continue to reach out to the possibly unavailable 
> follower, and only after 5 minutes will it refresh the preferred read replica 
> and go back to the leader.
> A specific example is when a partition is reassigned: the consumer will get 
> NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the preferred 
> read replica will not be refreshed as the follower is still online. It will 
> continue to reach out to the old follower until the preferred read replica 
> expires.
> The consumer can instead refresh its preferred read replica whenever it makes 
> a metadata update request, so when the consumer receives e.g. 
> NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without 
> waiting for the expiration.
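
Conceptually the proposed change is small; a sketch with hypothetical method names (not the actual Fetcher code):
{code:java}
// Invoked from the consumer's metadata-update listener.
void onMetadataUpdate(Cluster newCluster) {
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        // Drop the cached preferred read replica so the next fetch re-selects
        // it, instead of waiting for metadata.max.age.ms to elapse or for an
        // OFFSET_OUT_OF_RANGE error.
        subscriptions.clearPreferredReadReplica(tp);
    }
}
{code}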



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14379) consumer should refresh preferred read replica on update metadata

2022-12-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14379:

Fix Version/s: 3.4.0

> consumer should refresh preferred read replica on update metadata
> -
>
> Key: KAFKA-14379
> URL: https://issues.apache.org/jira/browse/KAFKA-14379
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.4.0
>
>
> The consumer (fetcher) refreshes the preferred read replica only on three 
> conditions:
>  # the consumer receives an OFFSET_OUT_OF_RANGE error
>  # the follower does not exist in the client's metadata (i.e., offline)
>  # after metadata.max.age.ms (5 min default)
> For other errors, it will continue to reach out to the possibly unavailable 
> follower, and only after 5 minutes will it refresh the preferred read replica 
> and go back to the leader.
> A specific example is when a partition is reassigned: the consumer will get 
> NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the preferred 
> read replica will not be refreshed as the follower is still online. It will 
> continue to reach out to the old follower until the preferred read replica 
> expires.
> The consumer can instead refresh its preferred read replica whenever it makes 
> a metadata update request, so when the consumer receives e.g. 
> NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without 
> waiting for the expiration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-6542) Tables should trigger joins too, not just streams

2022-12-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6542.

Resolution: Invalid

This will be fixed via versioned stores and "delayed table lookups".

> Tables should trigger joins too, not just streams
> -
>
> Key: KAFKA-6542
> URL: https://issues.apache.org/jira/browse/KAFKA-6542
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>
> At the moment it's quite possible to have a race condition when joining a 
> stream with a table: if the stream event arrives before the matching table 
> event, the join will fail.
> This is also related to bootstrapping KTables (which is what a GKTable does).
> Related to: KAFKA-4113 Allow KTable bootstrap
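
The race is easy to picture with a plain stream-table join (illustrative snippet; the Order/Customer types are hypothetical):
{code:java}
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
KTable<String, Customer> customers = builder.table("customers");

// If an order arrives before the matching customer record has been loaded
// into the table, this (inner) join produces nothing for that order; a later
// table update does not re-trigger the join for the already-seen order.
KStream<String, EnrichedOrder> enriched =
    orders.join(customers, (order, customer) -> new EnrichedOrder(order, customer));
{code}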



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-6509) Add additional tests for validating store restoration completes before Topology is initialized

2022-12-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6509.

Resolution: Not A Problem

> Add additional tests for validating store restoration completes before 
> Topology is initialized
> --
>
> Key: KAFKA-6509
> URL: https://issues.apache.org/jira/browse/KAFKA-6509
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> Right now 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-5245) KStream builder should capture serdes

2022-12-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5245.

Resolution: Fixed

> KStream builder should capture serdes 
> --
>
> Key: KAFKA-5245
> URL: https://issues.apache.org/jira/browse/KAFKA-5245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: needs-kip
>
> Even if one specifies serdes in `builder.stream`, a later call to 
> `groupByKey` may require the serdes again if they differ from the configured 
> application-default serdes. The preferred behavior is that if no serdes are 
> provided to `groupByKey`, it should use whatever was provided in 
> `builder.stream` and not the application defaults.
> From the current docs:
> “When to set explicit serdes: Variants of groupByKey exist to override the 
> configured default serdes of your application, which you must do if the key 
> and/or value types of the resulting KGroupedStream do not match the 
> configured default serdes.”
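
For reference, the pattern in question (a sketch using the current DSL; the String/Long types are arbitrary):
{code:java}
KStream<String, Long> stream =
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()));

// Today the serdes must be repeated here; the request is that groupByKey()
// fall back to the serdes given in builder.stream() rather than the
// application-level defaults.
KGroupedStream<String, Long> grouped =
    stream.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));
{code}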



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-5085) Add test for rebalance exceptions

2022-12-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5085.

Resolution: Abandoned

Might not be necessary any longer given all the changes we have made over the years.

> Add test for rebalance exceptions
> -
>
> Key: KAFKA-5085
> URL: https://issues.apache.org/jira/browse/KAFKA-5085
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> We currently lack a proper test for the case that an exception is thrown 
> during rebalance within the Streams rebalance listener.
> We recently had a bug for which the app hung on an exception because the 
> exception was not handled properly (KAFKA-5073). Writing a test might require 
> some code refactoring to make testing simpler in the first place.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-5493) Optimize calls to flush for tasks and standby tasks

2022-12-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5493:
---
Labels: eos  (was: )

> Optimize calls to flush for tasks and standby tasks
> ---
>
> Key: KAFKA-5493
> URL: https://issues.apache.org/jira/browse/KAFKA-5493
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: eos
>
> With EOS enabled we don't checkpoint on {{commit}}, so there is no need to 
> call {{flush}} when committing _top level tasks_. However, for _standby 
> tasks_ we still checkpoint and thus still need to flush when committing. We 
> need an approach that optimizes top level tasks by avoiding the flush on 
> commit, while still preserving flush-on-commit for standby tasks.
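
In other words, roughly (a pseudocode-level sketch with hypothetical names):
{code:java}
// On commit:
if (!eosEnabled || task.isStandby()) {
    // Standby tasks (and all tasks without EOS) still write checkpoints,
    // so their state stores must be flushed first.
    task.flushState();
    task.writeCheckpoint();
}
// Active tasks under EOS write no checkpoint on commit, so the flush can
// be skipped entirely.
{code}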



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038429334


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;

Review Comment:
   The groupId must not be null for the `FindCoordinator` request; if the 
groupId is null, we don't need a `CoordinatorRequestManager` at all.
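   
   i.e., construction could be gated on the group id being present (sketch):
   ```java
   // Only create the manager when a group id is actually configured.
   Optional<CoordinatorRequestManager> coordinatorManager =
       groupId.map(id -> new CoordinatorRequestManager(
           time, logContext, config, errorHandler, Optional.of(id), rebalanceTimeoutMs));
   ```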



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038427640


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // start logging a warning only after being unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final Optional<String> groupId,
+ final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs,
+  final long requestTimeoutMs,
+  final CoordinatorRequestState 
coordinatorRequestState) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = requestTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+return new NetworkClientDelegate.PollResult(-1, 
Collections.singletonList(request));
+}
+
+return new NetworkClientDelegate.PollResult(
+coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+new ArrayList<>());
+}
+
+private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+coordinatorRequestState.updateLastSend(currentTimeMs);
+FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038427945


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+private final Logger log;
+private final Time time;
+private final long requestTimeoutMs;
+private final ErrorEventHandler errorHandler;
+private final long rebalanceTimeoutMs;
+private final Optional<String> groupId;
+
+private final CoordinatorRequestState coordinatorRequestState;
+private long timeMarkedUnknownMs = -1L; // start logging a warning only after being unable to connect for a while
+private Node coordinator;
+
+
+public CoordinatorRequestManager(final Time time,
+ final LogContext logContext,
+ final ConsumerConfig config,
+ final ErrorEventHandler errorHandler,
+ final Optional<String> groupId,
+ final long rebalanceTimeoutMs) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+this.coordinatorRequestState = new CoordinatorRequestState(config);
+}
+
+CoordinatorRequestManager(final Time time,
+  final LogContext logContext,
+  final ErrorEventHandler errorHandler,
+  final Optional<String> groupId,
+  final long rebalanceTimeoutMs,
+  final long requestTimeoutMs,
+  final CoordinatorRequestState 
coordinatorRequestState) {
+this.time = time;
+this.log = logContext.logger(this.getClass());
+this.errorHandler = errorHandler;
+this.groupId = groupId;
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+this.requestTimeoutMs = requestTimeoutMs;
+this.coordinatorRequestState = coordinatorRequestState;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = 
makeFindCoordinatorRequest(currentTimeMs);
+return new NetworkClientDelegate.PollResult(-1, 
Collections.singletonList(request));
+}
+
+return new NetworkClientDelegate.PollResult(
+coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+new ArrayList<>());
+}
+
+private NetworkClientDelegate.UnsentRequest 
makeFindCoordinatorRequest(final long currentTimeMs) {
+coordinatorRequestState.updateLastSend(currentTimeMs);
+FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423837


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to 
handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+private final KafkaClient client;
+private final Time time;
+private final Logger log;
+private boolean wakeup = false;
+private final Queue<UnsentRequest> unsentRequests;
+
+public NetworkClientDelegate(
+final Time time,
+final LogContext logContext,
+final KafkaClient client) {
+this.time = time;
+this.client = client;
+this.log = logContext.logger(getClass());
+this.unsentRequests = new ArrayDeque<>();
+}
+
+public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+client.wakeup();
+if (!disableWakeup) {
+// trigger wakeups after checking for disconnects so that the 
callbacks will be ready
+// to be fired on the next call to poll()
+maybeTriggerWakeup();
+}
+
+trySend();
+return this.client.poll(timer.timeoutMs(), time.milliseconds());
+}
+
+private void trySend() {
+while (unsentRequests.size() > 0) {
+UnsentRequest unsent = unsentRequests.poll();
+if (unsent.timer.isExpired()) {
+// TODO: expired request should be marked
+unsent.callback.ifPresent(c -> c.onFailure(new 
TimeoutException(
+"Failed to send request after " + 
unsent.timer.timeoutMs() + " " + "ms.")));
+continue;
+}
+
+if (!doSend(unsent)) {
+log.debug("No broker available to send the request: {}", 
unsent);
+unsent.callback.ifPresent(v -> v.onFailure(
+new IllegalThreadStateException("No node available in 
the kafka cluster to send the request")));
+}
+}
+}
+
+static boolean isReady(KafkaClient client, Node node, long currentTime) {
+client.poll(0, currentTime);
+return client.isReady(node, currentTime);
+}
+
+public boolean doSend(UnsentRequest r) {
+long now = time.milliseconds();
+Node node = r.node.orElse(client.leastLoadedNode(now));
+if (node == null) {
+return false;
+}
+ClientRequest request = makeClientRequest(r, node);
+// TODO: Sounds like we need to check disconnections for each node and 
complete the request with
+//  authentication error
+if (isReady(client, node, now)) {
+client.send(request, now);
+}
+return true;
+}
+
+private ClientRequest makeClientRequest(UnsentRequest unsent, Node node) {
+return client.newClientRequest(
+node.idString(),
+unsent.abstractBuilder,
+time.milliseconds(),
+true,
+// TODO: Determine if we want the actual request timeout here 
to be requestTimeoutMs - timeInUnsentQueue
+(int) 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423520


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423837


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423520


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]

[GitHub] [kafka] belugabehr commented on pull request #8066: KAFKA-4090: Validate SSL connection in client

2022-12-02 Thread GitBox


belugabehr commented on PR #8066:
URL: https://github.com/apache/kafka/pull/8066#issuecomment-1335664538

   I documented my work on the ticket as well. It got very little attention from the Kafka Dev Team and seems to be pretty out of date now (merge conflicts). Will need someone internal to the project to take this baton and run with it.
   
   https://issues.apache.org/jira/browse/KAFKA-4090


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038422365


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]
+        trySend();
+        return this.client.poll(timer.timeoutMs(), time.milliseconds());

Review Comment:
   After we call `poll()`, we need to iterate through unsent requests and see if the node has been disconnected. Similar to `ConsumerNetworkClient.checkDisconnects`.
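
For reference, `ConsumerNetworkClient.checkDisconnects` walks its unsent requests, asks the client whether each target node's connection has failed, and completes those requests exceptionally. A self-contained sketch of that pattern, assuming kafka-clients on the classpath; the `Pending` class and `failedNodes` set are illustrative stand-ins, not Kafka APIs:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;

import org.apache.kafka.common.errors.DisconnectException;

public class CheckDisconnectsSketch {
    // Illustrative stand-in for an unsent request; not an actual Kafka class.
    static final class Pending {
        final String nodeId;
        final Consumer<Exception> onFailure;
        Pending(String nodeId, Consumer<Exception> onFailure) {
            this.nodeId = nodeId;
            this.onFailure = onFailure;
        }
    }

    public static void main(String[] args) {
        // Pretend poll() just told us broker-1's connection failed.
        Set<String> failedNodes = new HashSet<>();
        failedNodes.add("broker-1");

        Queue<Pending> unsent = new ArrayDeque<>();
        unsent.add(new Pending("broker-1", e -> System.out.println("failed: " + e.getMessage())));
        unsent.add(new Pending("broker-2", e -> System.out.println("failed: " + e.getMessage())));

        // After poll(): complete requests to disconnected nodes exceptionally, so their
        // callbacks fire now instead of waiting for the request timeout to expire.
        Iterator<Pending> it = unsent.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (failedNodes.contains(p.nodeId)) {
                p.onFailure.accept(new DisconnectException("node " + p.nodeId + " disconnected"));
                it.remove();
            }
        }
    }
}
```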



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038423837


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038418696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]
+            if (!doSend(unsent)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(
+                    new IllegalThreadStateException("No node available in the kafka cluster to send the request")));

Review Comment:
   Use a retriable exception. Maybe `NETWORK_EXCEPTION`? Or a custom exception, `NoNodeAvailableException` or something like that.
   
   An alternative is to add to the end of the queue? We don't need to back off. Or perhaps we can use a separate queue for requests that have no target node.
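
For illustration, a minimal sketch of the custom-retriable route suggested above, so the failure surfaces as retriable rather than as the fatal `IllegalThreadStateException`. The class name follows the suggestion and is hypothetical, not an existing Kafka class:

```java
import org.apache.kafka.common.errors.RetriableException;

// Hypothetical exception floated in the review above; not part of Kafka today.
public class NoNodeAvailableException extends RetriableException {
    public NoNodeAvailableException(String message) {
        super(message);
    }
}

// trySend()'s failure path would then read something like:
//   unsent.callback.ifPresent(cb -> cb.onFailure(
//       new NoNodeAvailableException("No node available in the cluster to send the request")));
```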



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038418696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]
+            if (!doSend(unsent)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(
+                    new IllegalThreadStateException("No node available in the kafka cluster to send the request")));

Review Comment:
   Use a retriable exception. Maybe `NETWORK_EXCEPTION`?
   
   An alternative is to add to the end of the queue? We don't need to back off. Or perhaps we can use a separate queue for requests that have no target node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038418696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -0,0 +1,260 @@
[snip: same NetworkClientDelegate.java diff as quoted above]
+            if (!doSend(unsent)) {
+                log.debug("No broker available to send the request: {}", unsent);
+                unsent.callback.ifPresent(v -> v.onFailure(
+                    new IllegalThreadStateException("No node available in the kafka cluster to send the request")));

Review Comment:
   Use a retriable exception. Maybe `NETWORK_EXCEPTION`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14436) Initialize KRaft with arbitrary epoch

2022-12-02 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang reassigned KAFKA-14436:


Assignee: Alyssa Huang

> Initialize KRaft with arbitrary epoch
> -
>
> Key: KAFKA-14436
> URL: https://issues.apache.org/jira/browse/KAFKA-14436
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: Alyssa Huang
>Priority: Major
>
> For the ZK migration, we need to be able to initialize Raft with an 
> arbitrarily high epoch (within the size limit). This is because during the 
> migration, we want to write the Raft epoch as the controller epoch in ZK. We 
> require that epochs in /controller_epoch are monotonic in order for brokers 
> to behave normally. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038385341


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Oops, I added it to that test. I will revert that change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1038378151


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorRequestManager implements RequestManager {
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private final CoordinatorRequestState coordinatorRequestState;
+    private long timeMarkedUnknownMs = -1L; // start logging a warning only after being unable to connect for a while
+    private Node coordinator;
+
+    public CoordinatorRequestManager(final Time time,
+                                     final LogContext logContext,
+                                     final ConsumerConfig config,
+                                     final ErrorEventHandler errorHandler,
+                                     final Optional<String> groupId,
+                                     final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.coordinatorRequestState = new CoordinatorRequestState(config);
+    }
+
+    CoordinatorRequestManager(final Time time,
+                              final LogContext logContext,
+                              final ErrorEventHandler errorHandler,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs,
+                              final long requestTimeoutMs,
+                              final CoordinatorRequestState coordinatorRequestState) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.coordinatorRequestState = coordinatorRequestState;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+        if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeFindCoordinatorRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(-1, Collections.singletonList(request));
+        }
+
+        return new NetworkClientDelegate.PollResult(
+            coordinatorRequestState.remainingBackoffMs(currentTimeMs),
+            new ArrayList<>());
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long currentTimeMs) {
+        coordinatorRequestState.updateLastSend(currentTimeMs);
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+            .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+
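
The `CoordinatorRequestState` above gates `poll()` on a backoff window; the imported `ExponentialBackoff` utility is the usual way such a schedule is computed. A small self-contained illustration (the parameter values are arbitrary, not the consumer's defaults):

```java
import org.apache.kafka.common.utils.ExponentialBackoff;

public class BackoffSketch {
    public static void main(String[] args) {
        // 100ms base, doubling per attempt, capped at 1s, with 20% jitter.
        ExponentialBackoff backoff = new ExponentialBackoff(100, 2, 1000, 0.2);
        for (long attempt = 0; attempt < 5; attempt++) {
            System.out.printf("attempt %d -> wait ~%d ms%n", attempt, backoff.backoff(attempt));
        }
    }
}
```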

[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038368961


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
[snip: same TransactionManagerTest.java diff as quoted above]
+    @Test
+    public void testCoordinatorNotAvailable() {
Review Comment:
   I suppose we have `testLookupCoordinatorOnNotCoordinatorError`. Is this what you were thinking of?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038367059


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
[snip: same TransactionManagerTest.java diff as quoted above]
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Sorry, I realized this is not a valid error for `FindCoordinator`, so never mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038365461


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I thought about this, but I didn't know how to make the errors from strings.
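
As an aside: JUnit 5 can do the string-to-enum resolution itself. `@EnumSource` matches its `names` against the enum constants of the test method's parameter type, so no manual conversion from strings is needed. A minimal sketch (two error names shown for brevity):

```java
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class RetriableErrorsExample {
    // JUnit resolves each name against the Errors enum and injects the constant directly.
    @ParameterizedTest
    @EnumSource(value = Errors.class, names = {"UNKNOWN_TOPIC_OR_PARTITION", "REQUEST_TIMED_OUT"})
    void retriable(Errors error) {
        System.out.println(error + " -> " + error.exception());
    }
}
```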



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I think another way to do this is like this:
   ```java
   @EnumSource(names = { 
 "UNKNOWN_TOPIC_OR_PARTITION", 
 "REQUEST_TIMED_OUT", 
 "COORDINATOR_LOAD_IN_PROGRESS", 
 "CONCURRENT_TRANSACTIONS" 
   })
   public void testRetriableErrors(Errors error) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I think another way to do this is like this:
   ```java
   @EnumSource(names = { 
 "UNKNOWN_TOPIC_OR_PARTITION", 
 "REQUEST_TIMED_OUT", 
 "COORDINATOR_LOAD_IN_PROGRESS", 
 "CONCURRENT_TRANSACTIONS" 
   })
   ```



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
[snip: same testRetriableErrors diff as quoted above]
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+

Review Comment:
   nit: extra new line



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException
[snip: same testRetriableErrors diff as quoted above]

[GitHub] [kafka] jonathan-albrecht-ibm commented on pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK

2022-12-02 Thread GitBox


jonathan-albrecht-ibm commented on PR #12343:
URL: https://github.com/apache/kafka/pull/12343#issuecomment-1335553986

   Thanks @mimaison!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments

2022-12-02 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14437:
-
Description: 
Currently, in StripedReplicaPlacer we don’t take existing partition assignments 
into consideration when the place method is called. This means for new 
partitions added, they may get the same assignments as existing partitions. 
This differs from AdminUtils, which has some logic to try and shift where in 
the list of brokers we start making assignments from for new partitions added.

For example, let's say we had the following
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignment for two partitions:
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions increasing the partition count to 4, StripedReplicaPlacer does not take into account P0 and P1. It creates a random rack offset and a random broker offset. So it could easily create the same assignments for P2 and P3 that it created for P0 and P1. This is easily reproduced in a unit test.

My suggestion is to enhance StripedReplicaPlacer to account for existing 
partition assignments. Intuitively, we’d like to make assignments for added 
partitions from “where we left off” when we were making the previous 
assignments. In practice, it's not possible to know exactly what the state was 
during the previous partition assignments because, for example, brokers' fencing 
state may have changed. But I do think we can make a best-effort attempt that is 
optimized for the common case where most brokers are unfenced. Note, all the 
changes suggested below will only affect StripedReplicaPlacer when place is 
called and there are existing partition assignments, which happens when it's 
servicing CreatePartitions requests. If there are no existing 
partition assignments, which happens during CreateTopics, the logic is 
unchanged.

First, we need to update ClusterDescriber to:
{code:java}
public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();

    List<List<Integer>> replicasForTopicName(String topicName);
}
{code}
The replicasForTopicName returns the existing partition assignments. This will 
enable StripedReplicaPlacer to know about existing partition assignments when 
they exist.

When place is called, some initialization is done in both RackList and 
BrokerList. One thing that is initialized is the offset variable: a variable 
used in both RackList and BrokerList that determines where in the list of racks 
or brokers, respectively, we should start from when making the next assignment. 
Currently, it is initialized to a random value based on the size of the list.

I suggest we add some logic during initialization that sets the offset for both 
RackList and BrokerList to a value based on the previous assignments.

Consider again the following rack metadata and existing assignments:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
 
P0: 6, 8, 2
P1: 9, 3, 7  
{code}
Let's imagine a user wants to create a new partition, P3. 

First, we need to determine which rack to start from for P3: this corresponds 
to the initial offset in RackList. We can look at the leader of P1 (not P0 
because P1 is the “last” partition we made an assignment for) and see it's on 
rack 3. So, the next rack we should start from should be rack 1. This means we 
set offset in RackList to 0, instead of a random value, during initialization. 

Second, we need to determine which broker to start from {_}per rack{_}: this 
corresponds to the initial offset in BrokerList. We can look at all the 
existing partition assignments, P0 and P1 in our example, and _per rack_ infer 
the last offset started from during previous assignments. For each rack, we do 
this by iterating through each partition, in reverse order because we care 
about the most recent starting position, and try to find the first broker in 
the assignment from the rack. This enables us to know where we last started 
from when making an assignment for that rack, which can be used to determine 
where to continue on from within that rack.

So in our example, for rack 1 we can see the last broker we started from was 
broker 3 in P1, so the next broker we should choose for that rack should be 0, 
which means the initial offset is set to 0 in the BrokerList for rack 1 during 
initialization. For rack 2 we can see the last broker we started with was 
broker 7 in P1, so the next broker should be 4, which means the offset is 0 in 
the BrokerList for rack 2. For rack 3 we can see the last broker we started 
with was broker 9 in P1, so the next broker should be 10, which means the 
offset is 2 in the BrokerList for rack 3.
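
As a quick illustration of the per-rack inference above, here is a hedged 
sketch (the helper name and shape are illustrative, not the draft PR's code):
{code:java}
import java.util.List;

public final class OffsetInference {
    // Sketch only: given existing assignments (oldest partition first) and the
    // ordered broker list of one rack, return the BrokerList offset to start
    // the next assignment from, or 0 if no previous assignment touched the rack.
    public static int nextBrokerOffset(List<List<Integer>> existingAssignments,
                                       List<Integer> rackBrokers) {
        // Iterate partitions in reverse order: the most recent start wins.
        for (int p = existingAssignments.size() - 1; p >= 0; p--) {
            for (int replica : existingAssignments.get(p)) {
                int idx = rackBrokers.indexOf(replica);
                if (idx >= 0) {
                    // The first replica from this rack marks where we started.
                    return (idx + 1) % rackBrokers.size();
                }
            }
        }
        return 0;
    }
}
{code}
Checking it against the example: with rack 3 = [8, 9, 10, 11] and P1 = [9, 3, 7], 
the first replica from rack 3 is broker 9 at index 1, so the sketch returns 
offset 2, matching the result above.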

  was:
Currently, in StripedReplicaPlacer we don’t take existing partition assignments 
into consideration 


[jira] [Commented] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments

2022-12-02 Thread Andrew Grant (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642602#comment-17642602
 ] 

Andrew Grant commented on KAFKA-14437:
--

I created a draft PR, [https://github.com/apache/kafka/pull/12943], that 
illustrates the idea. 

> Enhance StripedReplicaPlacer to account for existing partition assignments
> --
>
> Key: KAFKA-14437
> URL: https://issues.apache.org/jira/browse/KAFKA-14437
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Priority: Major
>
> Currently, in StripedReplicaPlacer we don’t take existing partition 
> assignments into consideration when the place method is called. This means 
> for new partitions added, they may get the same assignments as existing 
> partitions. This differs from AdminUtils, which has some logic to try and 
> shift where in the list of brokers we start making assignments from for new 
> partitions added.
> For example, let's say we had the following:
>  
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
> {code}
> CreateTopics might return the following assignment for two partitions:
>  
> {code:java}
> P0: 6, 8, 2
> P1: 9, 3, 7
> {code}
> If the user then calls CreatePartitions increasing the partition count to 4, 
> StripedReplicaPlacer does not take into account P0 and P1. It creates a 
> random rack offset and a random broker offset. So it could easily create the 
> same assignment for the new partitions P2 and P3 that it created for P0 and P1. This is easily 
> reproduced in a unit test.
>  
> My suggestion is to enhance StripedReplicaPlacer to account for existing 
> partition assignments. Intuitively, we’d like to make assignments for added 
> partitions from “where we left off” when we were making the previous 
> assignments. In practice, it's not possible to know exactly what the state was 
> during the previous partition assignments because, for example, brokers' 
> fencing state may have changed. But I do think we can make a best-effort 
> attempt that is optimized for the common case where most brokers are 
> unfenced. Note, all the changes suggested below will only affect 
> StripedReplicaPlacer when place is called and there are existing partition 
> assignments, which happens when it's servicing CreatePartitions requests. If 
> there are no existing partition assignments, which happens during 
> CreateTopics, the logic is unchanged.
>  
> First, we need to update ClusterDescriber to:
>  
>  
> {code:java}
> public interface ClusterDescriber {
>     /**
>      * Get an iterator through the usable brokers.
>      */
>     Iterator<UsableBroker> usableBrokers();
>     List<List<Integer>> replicasForTopicName(String topicName);
> }
> {code}
>  
>  
> The replicasForTopicName returns the existing partition assignments. This 
> will enable StripedReplicaPlacer to know about existing partition assignments 
> when they exist.
> When place is called, some initialization is done in both RackList and 
> BrokerList. One thing that is initialized is the offset variable - this is a 
> variable used in both RackList and BrokerList that determines where in the 
> list of either racks or brokers respectively we should start from when making 
> the next assignment. Currently, it is initialized to a random value, based 
> off the size of the list. 
> I suggest we add some logic during initialization that sets the offset for 
> both RackList and BrokerList to a value based off the previous assignments.
> Consider again the following rack metadata and existing assignments:
>  
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
>  
> P0: 6, 8, 2
> P1: 9, 3, 7  
> {code}
>  
> Let's imagine a user wants to create a new partition, called P3. 
> First, we need to determine which rack to start from for P3: this corresponds 
> to the initial offset in RackList. We can look at the leader of P1 (not P0 
> because P1 is the “last” partition we made an assignment for) and see it's on 
> rack 3. So, the next rack we should start from should be rack 1. This means 
> we set offset in RackList to 0, instead of a random value, during 
> initialization. 
> Second, we need to determine which broker to start from {_}per rack{_}: this 
> corresponds to the initial offset in BrokerList. We can look at all the 
> existing partition assignments, P0 and P1 in our example, and _per rack_ 
> infer the last offset started from during previous assignments. For each 
> rack, we do this by iterating through each partition, in reverse order 
> because we care about the most recent starting position, and try to find the 
> first broker in the assignment. This enables us to know where we last started 
> from when making an assignment for that rack, which can be used to determine 
> where to continue on from.
> So in our 

[GitHub] [kafka] andymg3 opened a new pull request, #12943: KAFKA-14437: Enhance StripedReplicaPlacer to account for existing partition assignments

2022-12-02 Thread GitBox


andymg3 opened a new pull request, #12943:
URL: https://github.com/apache/kafka/pull/12943

   ### Details
   This is a draft PR showing a potential solution to 
https://issues.apache.org/jira/browse/KAFKA-14437. The idea is to enhance 
StripedReplicaPlacer to use previous partition assignments to set the initial 
RackList and BrokerList state such that we dont create the same assignments for 
new partitions. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14437) Enhance StripedReplicaPlacer to account for existing partition assignments

2022-12-02 Thread Andrew Grant (Jira)
Andrew Grant created KAFKA-14437:


 Summary: Enhance StripedReplicaPlacer to account for existing 
partition assignments
 Key: KAFKA-14437
 URL: https://issues.apache.org/jira/browse/KAFKA-14437
 Project: Kafka
  Issue Type: Improvement
Reporter: Andrew Grant


Currently, in StripedReplicaPlacer we don’t take existing partition assignments 
into consideration when the place method is called. This means for new 
partitions added, they may get the same assignments as existing partitions. 
This differs from AdminUtils, which has some logic to try and shift where in 
the list of brokers we start making assignments from for new partitions added.

For example, let's say we had the following:

 
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignment for two partitions:

 
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions increasing the partition count to 4, 
StripedReplicaPlacer does not take into account P0 and P1. It creates a random 
rack offset and a random broker offset. So it could easily create the same 
assignment for the new partitions P2 and P3 that it created for P0 and P1. This 
is easily reproduced in a unit test.

 

My suggestion is to enhance StripedReplicaPlacer to account for existing 
partition assignments. Intuitively, we’d like to make assignments for added 
partitions from “where we left off” when we were making the previous 
assignments. In practice, it's not possible to know exactly what the state was 
during the previous partition assignments because, for example, brokers' fencing 
state may have changed. But I do think we can make a best-effort attempt that is 
optimized for the common case where most brokers are unfenced. Note, all the 
changes suggested below will only affect StripedReplicaPlacer when place is 
called and there are existing partition assignments, which happens when it's 
servicing CreatePartitions requests. If there are no existing 
partition assignments, which happens during CreateTopics, the logic is 
unchanged.

 

First, we need to update ClusterDescriber to:

 

 
{code:java}
public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();
    List<List<Integer>> replicasForTopicName(String topicName);
}
{code}
 

 

The replicasForTopicName returns the existing partition assignments. This will 
enable StripedReplicaPlacer to know about existing partition assignments when 
they exist.

When place is called, some initialization is done in both RackList and 
BrokerList. One thing that is initialized is the offset variable - this is a 
variable used in both RackList and BrokerList that determines where in the list 
of either racks or brokers respectively we should start from when making the 
next assignment. Currently, it is initialized to a random value, based off the 
size of the list. 

I suggest we add some logic during initialization that sets the offset for both 
RackList and BrokerList to a value based off the previous assignments.

Consider again the following rack metadata and existing assignments:

 
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
 
P0: 6, 8, 2
P1: 9, 3, 7  
{code}
 

Let's imagine a user wants to create a new partition, called P3. 

First, we need to determine which rack to start from for P3: this corresponds 
to the initial offset in RackList. We can look at the leader of P1 (not P0 
because P1 is the “last” partition we made an assignment for) and see it's on 
rack 3. So, the next rack we should start from should be rack 1. This means we 
set offset in RackList to 0, instead of a random value, during initialization. 

Second, we need to determine which broker to start from {_}per rack{_}: this 
corresponds to the initial offset in BrokerList. We can look at all the 
existing partition assignments, P0 and P1 in our example, and _per rack_ infer 
the last offset started from during previous assignments. For each rack, we do 
this by iterating through each partition, in reverse order because we care 
about the most recent starting position, and try to find the first broker in 
the assignment. This enables us to know where we last started from when making 
an assignment for that rack, which can be used to determine where to continue 
on from.

So in our example, for rack 1 we can see the last broker we started from was 
broker 3 in P1: so the next broker we should choose for that rack should be 0 
which means the initial offset is set to 0 in the BrokerList for rack 1 during 
initialization. For rack 2 we can see the last broker we started with was 
broker 7 in P1: so the next broker should be 4 which means the offset is 0 in 
the BrokerList for rack 2. For rack 3 we can see the last broker we started 
with was broker 9 in P1: so the next broker should be 10 

[jira] [Resolved] (KAFKA-14358) Users should not be able to create a regular topic name __cluster_metadata

2022-12-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-14358.

Resolution: Fixed

> Users should not be able to create a regular topic name __cluster_metadata
> --
>
> Key: KAFKA-14358
> URL: https://issues.apache.org/jira/browse/KAFKA-14358
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Blocker
> Fix For: 3.4.0, 3.3.2
>
>
> The following test passes and it should not:
> {code:java}
>  $ git diff                           
> diff --git 
> a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala 
> b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
> index 57834234cc..14b1435d00 100644
> --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
> @@ -102,6 +102,12 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
>      validateTopicExists("partial-none")
>    }
>   
> +  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
> +  @ValueSource(strings = Array("zk", "kraft"))
> +  def testClusterMetadataTopicFails(quorum: String): Unit = {
> +    createTopic("__cluster_metadata", 1, 1)
> +  }
> +
>    @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
>    @ValueSource(strings = Array("zk"))
>    def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {
> {code}
> Result of this test:
> {code:java}
> $ ./gradlew core:test --tests CreateTopicsRequestTest.testClusterMetadataTopicFails
> > Configure project :
> Starting build with version 3.4.0-SNAPSHOT (commit id bc780c7c) using Gradle 
> 7.5.1, Java 1.8 and Scala 2.13.8
> Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
> > Task :core:test
> Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest 
> > testClusterMetadataTopicFails(String) > 
> kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[1] 
> PASSED
> Gradle Test Run :core:test > Gradle Test Executor 8 > CreateTopicsRequestTest 
> > testClusterMetadataTopicFails(String) > 
> kafka.server.CreateTopicsRequestTest.testClusterMetadataTopicFails(String)[2] 
> PASSED
> BUILD SUCCESSFUL in 44s
> 44 actionable tasks: 3 executed, 41 up-to-date
> {code}
> I think that this test should fail in both KRaft and ZK. We want this to fail 
> in ZK so that it can be migrated to KRaft.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac merged pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface

2022-12-02 Thread GitBox


dajac merged PR #12847:
URL: https://github.com/apache/kafka/pull/12847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zigarn commented on a diff in pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)

2022-12-02 Thread GitBox


zigarn commented on code in PR #12175:
URL: https://github.com/apache/kafka/pull/12175#discussion_r1038293918


##
core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala:
##
@@ -488,6 +488,31 @@ class ConsoleConsumerTest {
 
     assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
 
+  @Test
+  def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+    propsStream.write("print.key=false".getBytes())
+    propsStream.close()

Review Comment:
   I'll propose another PR soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zigarn commented on pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)

2022-12-02 Thread GitBox


zigarn commented on PR #12175:
URL: https://github.com/apache/kafka/pull/12175#issuecomment-1335472177

   No problem, thanks for the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)

2022-12-02 Thread GitBox


dajac merged PR #12175:
URL: https://github.com/apache/kafka/pull/12175


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy merged pull request #12896: KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers

2022-12-02 Thread GitBox


omkreddy merged PR #12896:
URL: https://github.com/apache/kafka/pull/12896


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12175: KAFKA-14146: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)

2022-12-02 Thread GitBox


dajac commented on code in PR #12175:
URL: https://github.com/apache/kafka/pull/12175#discussion_r1038279577


##
core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala:
##
@@ -488,6 +488,31 @@ class ConsoleConsumerTest {
 
     assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
 
+  @Test
+  def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+    propsStream.write("print.key=false".getBytes())
+    propsStream.close()

Review Comment:
   I would prefer to have something like 
`TestUtils.tempPropertyFile(Map("parse.key" -> "true", "key.separator" -> "|"))`. 
However, we can do this in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14436) Initialize KRaft with arbitrary epoch

2022-12-02 Thread David Arthur (Jira)
David Arthur created KAFKA-14436:


 Summary: Initialize KRaft with arbitrary epoch
 Key: KAFKA-14436
 URL: https://issues.apache.org/jira/browse/KAFKA-14436
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Arthur


For the ZK migration, we need to be able to initialize Raft with an arbitrarily 
high epoch (within the size limit). This is because during the migration, we 
want to write the Raft epoch as the controller epoch in ZK. We require that 
epochs in /controller_epoch are monotonic in order for brokers to behave 
normally. 
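
A tiny sketch of the monotonicity constraint this implies (illustrative only, 
not the migration code):
{code:java}
public final class ControllerEpochs {
    // The migration may only ever bump /controller_epoch, never reuse or lower it.
    public static int nextControllerEpoch(int raftEpoch, int currentZkEpoch) {
        if (raftEpoch <= currentZkEpoch) {
            throw new IllegalStateException("Raft epoch " + raftEpoch +
                " is not greater than the current ZK controller epoch " + currentZkEpoch);
        }
        return raftEpoch;
    }
}
{code}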



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14429) Move OffsetStorageReader from storage package to source package

2022-12-02 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642565#comment-17642565
 ] 

Chris Egerton commented on KAFKA-14429:
---

Although I agree that the package name isn't ideal and there are better 
options, the cost of a migration to a new package name for a class that's been 
part of the public API for several years now might be too high to improve 
things at this point.

> Move OffsetStorageReader from storage package to source package
> ---
>
> Key: KAFKA-14429
> URL: https://issues.apache.org/jira/browse/KAFKA-14429
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: needs-kip
>
> The OffsetStorageReader is an interface provided to source connectors. This 
> does not fit with the broader context of the `storage` package, which is 
> focused on sink/source-agnostic converters and serialization/deserialization.
> The current interface should be deprecated and extend the relocated 
> interface in a different package.
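
A sketch of the deprecate-and-extend shape this suggests (the 
`org.apache.kafka.connect.source.OffsetStorageReader` target is hypothetical 
until a KIP defines it):
{code:java}
package org.apache.kafka.connect.storage;

/**
 * Sketch: the old location is kept so existing connectors keep compiling;
 * every method is inherited from the relocated (hypothetical) interface.
 */
@Deprecated
public interface OffsetStorageReader
        extends org.apache.kafka.connect.source.OffsetStorageReader {
}
{code}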



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14429) Move OffsetStorageReader from storage package to source package

2022-12-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14429:
--
Labels: needs-kip  (was: )

> Move OffsetStorageReader from storage package to source package
> ---
>
> Key: KAFKA-14429
> URL: https://issues.apache.org/jira/browse/KAFKA-14429
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: needs-kip
>
> The OffsetStorageReader is an interface provided to source connectors. This 
> does not fit with the broader context of the `storage` package, which is 
> focused on sink/source-agnostic converters and serialization/deserialization.
> The current interface should be deprecated and extend the relocated 
> interface in a different package.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14427) Add support for ZK migration multi-ops transaction

2022-12-02 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14427:
-
Summary: Add support for ZK migration multi-ops transaction  (was: Add 
support for ZK migration transactions)

> Add support for ZK migration multi-ops transaction
> --
>
> Key: KAFKA-14427
> URL: https://issues.apache.org/jira/browse/KAFKA-14427
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14433) Clear all yammer metrics when test harnesses clean up

2022-12-02 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14433.
--
Resolution: Fixed

> Clear all yammer metrics when test harnesses clean up
> -
>
> Key: KAFKA-14433
> URL: https://issues.apache.org/jira/browse/KAFKA-14433
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Major
> Attachments: image-2022-12-01-13-53-57-886.png, 
> image-2022-12-01-13-55-35-488.png
>
>
> We should clear all yammer metrics from the yammer singleton when the 
> integration test harnesses clean up. This would avoid memory leaks in tests 
> that have a lot of test cases.
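
A minimal sketch of that cleanup (assuming the plain yammer 2.x registry API; 
Kafka's KafkaYammerMetrics wrapper may expose this differently):
{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.MetricsRegistry;

import java.util.ArrayList;

public final class MetricsCleanup {
    // Drop every metric from the yammer singleton, e.g. from tearDown().
    public static void clearAll() {
        MetricsRegistry registry = Metrics.defaultRegistry();
        // Copy the names first so we do not remove while iterating the live map.
        new ArrayList<>(registry.allMetrics().keySet()).forEach(registry::removeMetric);
    }
}
{code}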



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14433) Clear all yammer metrics when test harnesses clean up

2022-12-02 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-14433:


Assignee: David Arthur

> Clear all yammer metrics when test harnesses clean up
> -
>
> Key: KAFKA-14433
> URL: https://issues.apache.org/jira/browse/KAFKA-14433
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Major
> Attachments: image-2022-12-01-13-53-57-886.png, 
> image-2022-12-01-13-55-35-488.png
>
>
> We should clear all yammer metrics from the yammer singleton when the 
> integration test harnesses clean up. This would avoid memory leaks in tests 
> that have a lot of test cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah merged pull request #12942: KAFKA-14433 Clear Yammer metrics in QuorumTestHarness#tearDown

2022-12-02 Thread GitBox


mumrah merged PR #12942:
URL: https://github.com/apache/kafka/pull/12942


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12914: KAFKA-14352: Rack-aware consumer partition assignment (KIP-881)

2022-12-02 Thread GitBox


dajac commented on code in PR #12914:
URL: https://github.com/apache/kafka/pull/12914#discussion_r1037881208


##
clients/src/main/resources/common/message/ConsumerProtocolAssignment.json:
##
@@ -23,6 +23,7 @@
   // that new versions cannot remove or reorder any of the existing fields.
   //
  // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription.
+  // Version 3 adds rack id to ConsumerProtocolSubscription.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here.



##
clients/src/main/resources/common/message/ConsumerProtocolSubscription.json:
##
@@ -24,6 +24,7 @@
 
  // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned
  // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions.
+  // Version 3 adds rack id to enable rack-aware assignment.
   "validVersions": "0-2",

Review Comment:
   We need to bump the version here as well.



##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +88,145 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> new TopicAssignmentState(e.getValue(), consumersPerTopic.get(e.getKey()))));
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
+        for (Map.Entry<String, TopicAssignmentState> topicEntry : topicAssignmentStates.entrySet()) {

Review Comment:
   I have a general question here. One of the key promises of the range assignor 
is that it co-partitions topics. If you have two topics (foo and bar) with 
three partitions each and two consumers, the first consumer will get foo-0, 
bar-0, foo-1, bar-1 and the second one will get foo-2, bar-2. I am trying to 
understand if we still maintain this property with the rack-aware assignment. 
Do we?
   
   I suppose that we do when all the partitions have all the same racks. I am 
not sure about the case where, for instance, foo would have no rack or only a 
subset of the racks (e.g. brokers are offline).
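
   To make the property concrete, a minimal standalone illustration (not the 
assignor's code) of what co-partitioning means here:
   ```
   import java.util.*;

   public final class CoPartitioningDemo {
       public static void main(String[] args) {
           List<String> consumers = Arrays.asList("c0", "c1");
           Map<String, List<String>> assignment = new LinkedHashMap<>();
           consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
           // Pure range assignment, applied topic by topic.
           for (String topic : Arrays.asList("foo", "bar")) {
               int numPartitions = 3;
               int quota = numPartitions / consumers.size();
               int extra = numPartitions % consumers.size();
               int partition = 0;
               for (int i = 0; i < consumers.size(); i++) {
                   int count = quota + (i < extra ? 1 : 0);
                   for (int j = 0; j < count; j++)
                       assignment.get(consumers.get(i)).add(topic + "-" + partition++);
               }
           }
           // Prints {c0=[foo-0, foo-1, bar-0, bar-1], c1=[foo-2, bar-2]}:
           // each consumer holds the same partition numbers for both topics.
           System.out.println(assignment);
       }
   }
   ```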



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -268,6 +304,47 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, List<PartitionInfo>> partitionsPerTopic,
 
         Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
         Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
+        unassignedPartitions = rackInfo.sortPartitionsByRackConsumers(unassignedPartitions);
+
+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota
+        int nextUnfilledConsumerIndex = 0;
+        Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+        while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {
+            TopicPartition unassignedPartition = unassignedIter.next();
+            String consumer = null;
+            int nextIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithUnderMinQuotaPartitions, nextUnfilledConsumerIndex);
+            if (nextIndex >= 0) {
+                consumer = unfilledMembersWithUnderMinQuotaPartitions.get(nextIndex);
+                int assignmentCount = assignment.get(consumer).size() + 1;
+                if (assignmentCount >= minQuota) {
+                    unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
+                    if (assignmentCount < maxQuota)
+                        unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+                } else
+                    nextIndex++;
Review 

[GitHub] [kafka] pprovenzano commented on pull request #12896: KAFKA-14398: Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers

2022-12-02 Thread GitBox


pprovenzano commented on PR #12896:
URL: https://github.com/apache/kafka/pull/12896#issuecomment-1335323936

   I have validated that the failing tests work by manually running them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan updated KAFKA-14435:
--
Affects Version/s: 3.2.3
   3.2.2
   3.2.1
   3.2.0

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.0, 3.2.1, 3.2.2, 3.2.3
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't be allowing 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
>     StandardAuthorizer authorizer = new StandardAuthorizer();
>     HashMap<String, Object> configs = new HashMap<>();
>     configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
>     configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
>     authorizer.configure(configs);
>     authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
>     authorizer.completeInitialLoad();
>
>     // Allow User:Alice to read topic "foobar"
>     List<StandardAclWithId> acls = asList(
>         withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
>     );
>     acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
>
>     // User:Bob shouldn't be allowed to read topic "foobar"
>     assertEquals(singletonList(DENIED),
>         authorizer.authorize(new MockAuthorizableRequestContext.Builder().
>             setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
>             singletonList(newAction(READ, TOPIC, "foobar"))));
> }
> {code}
>  
> In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497
 ] 

Purshotam Chauhan edited comment on KAFKA-14435 at 12/2/22 2:12 PM:


This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the `if` block 
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].


was (Author: JIRAUSER298490):
This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the `if` clause 
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].
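
A minimal sketch of the flag-based fix described above. `MatchingAclBuilder` and `noResourceAcls` are the names from the comment; the fields and decision logic shown here are illustrative assumptions, not the actual Kafka patch:

{code:java}
// Hypothetical shape of the fix: remember whether ANY ACL matched the resource,
// so the "allow everyone if no acl found" fallback only fires when the resource
// truly has no ACLs at all.
static class MatchingAclBuilder {
    private StandardAcl denyAcl;
    private StandardAcl allowAcl;
    private boolean noResourceAcls = true; // the proposed flag

    void foundResourceAcl() {
        // Called from the lookup loop whenever an ACL exists for the resource,
        // even if it does not match the requesting principal.
        noResourceAcls = false;
    }

    AuthorizationResult build(boolean allowEveryoneIfNoAclFound) {
        if (denyAcl != null) return AuthorizationResult.DENIED;
        if (allowAcl != null) return AuthorizationResult.ALLOWED;
        // Fall back to ALLOWED only when the resource has no ACLs at all.
        return (allowEveryoneIfNoAclFound && noResourceAcls)
                ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED;
    }
}
{code}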

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't be allowing 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
> StandardAuthorizer authorizer = new StandardAuthorizer();
> HashMap<String, Object> configs = new HashMap<>();
> configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
> configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
> authorizer.configure(configs);
> authorizer.start(new 
> AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
> authorizer.completeInitialLoad();
> // Allow User:Alice to read topic "foobar"
> List<StandardAclWithId> acls = asList(
> withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", 
> WILDCARD, READ, ALLOW))
> );
> acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
> // User:Bob shouldn't be allowed to read topic "foobar"
> assertEquals(singletonList(DENIED),
> authorizer.authorize(new MockAuthorizableRequestContext.Builder().
> setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
> singletonList(newAction(READ, TOPIC, "foobar";
> }
>  {code}
>  
> In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan reassigned KAFKA-14435:
-

Assignee: Purshotam Chauhan

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't be allowing 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
> StandardAuthorizer authorizer = new StandardAuthorizer();
> HashMap<String, Object> configs = new HashMap<>();
> configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
> configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
> authorizer.configure(configs);
> authorizer.start(new 
> AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
> authorizer.completeInitialLoad();
> // Allow User:Alice to read topic "foobar"
> List<StandardAclWithId> acls = asList(
> withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", 
> WILDCARD, READ, ALLOW))
> );
> acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
> // User:Bob shouldn't be allowed to read topic "foobar"
> assertEquals(singletonList(DENIED),
> authorizer.authorize(new MockAuthorizableRequestContext.Builder().
> setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
> singletonList(newAction(READ, TOPIC, "foobar";
> }
>  {code}
>  
> In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lucasbru commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

2022-12-02 Thread GitBox


lucasbru commented on PR #12935:
URL: https://github.com/apache/kafka/pull/12935#issuecomment-1335257553

   @cadonna Could you have a look? I ran this for a few hours in the soak test 
and the native memory is stable. So I think this was the main leak.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12940: MINOR: Remove lock contention while adding sensors

2022-12-02 Thread GitBox


divijvaidya commented on PR #12940:
URL: https://github.com/apache/kafka/pull/12940#issuecomment-1335252064

   @mimaison please take a look when you get a chance! Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12940: MINOR: Remove lock contention while adding sensors

2022-12-02 Thread GitBox


divijvaidya commented on PR #12940:
URL: https://github.com/apache/kafka/pull/12940#issuecomment-1335251627

   Rebased from trunk. Test failures are unrelated.
   ```
   Build / JDK 11 and Scala 2.13 / testFailureToFenceEpoch(String).quorum=kraft 
– kafka.api.TransactionsTest
   
   Build / JDK 11 and Scala 2.13 / [1] true – 
org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest
   
   Build / JDK 17 and Scala 2.13 / testSecondaryRefreshAfterElapsedDelay() – 
org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwksTest
   
   Build / JDK 17 and Scala 2.13 / [3] Type=Raft-CoReside, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.3-IV3, 
Security=PLAINTEXT – kafka.admin.MetadataQuorumCommandTest
   
   Build / JDK 17 and Scala 2.13 / 
testNoConsumeWithoutDescribeAclViaSubscribe() – 
kafka.api.SaslGssapiSslEndToEndAuthorizationTest
   
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   
   Build / JDK 8 and Scala 2.12 / 
testWakeupAfterSyncGroupReceivedExternalCompletion() – 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest
   
   Build / JDK 8 and Scala 2.12 / 
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once] – 
org.apache.kafka.streams.integration.EosIntegrationTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2022-12-02 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642502#comment-17642502
 ] 

Lucas Brutschy commented on KAFKA-12679:


I referenced this ticket in the PR

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store directory while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git-formatted patch to resolve the issue. Simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
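
The attached patch is not reproduced in this ticket, but the idea reads roughly like the sketch below; the method name, backoff values, and retry shape are assumptions, not the actual Streams code:

{code:java}
// Hypothetical backoff around the directory-lock attempt, instead of retrying
// on every runLoop pass with no pause.
void initializeWithBackoff(Task task) throws InterruptedException {
    long backoffMs = 100;            // assumed initial backoff
    final long maxBackoffMs = 1_000; // assumed cap
    while (true) {
        try {
            task.initializeIfNeeded(); // tries to lock the state directory
            return;                    // lock acquired, restoration can proceed
        } catch (LockException e) {
            // The previous owner still holds the directory: sleep, then retry.
            Thread.sleep(backoffMs);
            backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
        }
    }
}
{code}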



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497
 ] 

Purshotam Chauhan edited comment on KAFKA-14435 at 12/2/22 1:21 PM:


This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the `if` clause 
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].


was (Author: JIRAUSER298490):
This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the `if` clause [here.|#L523].]

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't be allowing 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
> StandardAuthorizer authorizer = new StandardAuthorizer();
> HashMap<String, Object> configs = new HashMap<>();
> configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
> configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
> authorizer.configure(configs);
> authorizer.start(new 
> AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
> authorizer.completeInitialLoad();
> // Allow User:Alice to read topic "foobar"
> List<StandardAclWithId> acls = asList(
> withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", 
> WILDCARD, READ, ALLOW))
> );
> acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
> // User:Bob shouldn't be allowed to read topic "foobar"
> assertEquals(singletonList(DENIED),
> authorizer.authorize(new MockAuthorizableRequestContext.Builder().
> setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
> singletonList(newAction(READ, TOPIC, "foobar";
> }
>  {code}
>  
> In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497
 ] 

Purshotam Chauhan edited comment on KAFKA-14435 at 12/2/22 1:20 PM:


This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the `if` clause [here.|#L523].]


was (Author: JIRAUSER298490):
This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the if clause 
[here|[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].]

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't be allowing 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
> StandardAuthorizer authorizer = new StandardAuthorizer();
> HashMap<String, Object> configs = new HashMap<>();
> configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
> configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
> authorizer.configure(configs);
> authorizer.start(new 
> AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
> authorizer.completeInitialLoad();
> // Allow User:Alice to read topic "foobar"
> List<StandardAclWithId> acls = asList(
> withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", 
> WILDCARD, READ, ALLOW))
> );
> acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
> // User:Bob shouldn't be allowed to read topic "foobar"
> assertEquals(singletonList(DENIED),
> authorizer.authorize(new MockAuthorizableRequestContext.Builder().
> setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
> singletonList(newAction(READ, TOPIC, "foobar";
> }
>  {code}
>  
> In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642497#comment-17642497
 ] 

Purshotam Chauhan commented on KAFKA-14435:
---

This can be fixed by adding a flag `noResourceAcls` in `MatchingAclBuilder` 
class. We can set this flag inside the if clause 
[here|[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L523].]

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't be allowing 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
> StandardAuthorizer authorizer = new StandardAuthorizer();
> HashMap<String, Object> configs = new HashMap<>();
> configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
> configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
> authorizer.configure(configs);
> authorizer.start(new 
> AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
> authorizer.completeInitialLoad();
> // Allow User:Alice to read topic "foobar"
> List<StandardAclWithId> acls = asList(
> withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", 
> WILDCARD, READ, ALLOW))
> );
> acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
> // User:Bob shouldn't be allowed to read topic "foobar"
> assertEquals(singletonList(DENIED),
> authorizer.authorize(new MockAuthorizableRequestContext.Builder().
> setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
> singletonList(newAction(READ, TOPIC, "foobar";
> }
>  {code}
>  
> In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2022-12-02 Thread Purshotam Chauhan (Jira)
Purshotam Chauhan created KAFKA-14435:
-

 Summary: Kraft: StandardAuthorizer allowing a non-authorized user 
when `allow.everyone.if.no.acl.found` is enabled
 Key: KAFKA-14435
 URL: https://issues.apache.org/jira/browse/KAFKA-14435
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Purshotam Chauhan


When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
everyone only if there is no ACL present for a particular resource. But if 
there are ACLs present for the resource, then it shouldn't be allowing everyone.

StandardAuthorizer is allowing the principals for which no ACLs are defined 
even when the resource has other ACLs.

 

This behavior can be validated with the following test case:

 
{code:java}
@Test
public void testAllowEveryoneConfig() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
authorizer.configure(configs);
authorizer.start(new 
AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
authorizer.completeInitialLoad();


// Allow User:Alice to read topic "foobar"
List<StandardAclWithId> acls = asList(
withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", 
WILDCARD, READ, ALLOW))
);
acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));

// User:Bob shouldn't be allowed to read topic "foobar"
assertEquals(singletonList(DENIED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
singletonList(newAction(READ, TOPIC, "foobar";

}
 {code}
 

In the above test, `User:Bob` should be DENIED but the above test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit

2022-12-02 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-14278:
---
Component/s: producer 
 streams

> Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
> ---
>
> Key: KAFKA-14278
> URL: https://issues.apache.org/jira/browse/KAFKA-14278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer , streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit

2022-12-02 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14278:
--

Assignee: Lucas Brutschy

> Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
> ---
>
> Key: KAFKA-14278
> URL: https://issues.apache.org/jira/browse/KAFKA-14278
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14309) Kafka Streams upgrade tests do not cover for FK-joins

2022-12-02 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-14309:
---
Component/s: streams

> Kafka Streams upgrade tests do not cover for FK-joins
> -
>
> Key: KAFKA-14309
> URL: https://issues.apache.org/jira/browse/KAFKA-14309
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> The current streams upgrade system test for FK joins inserts the production 
> of foreign key data and an actual foreign key join in every version of 
> SmokeTestDriver except for the latest. The effect was that FK join upgrades 
> are not tested at all, since no FK join code is executed after the bounce in 
> the system test.
> We should enable the FK-join code in the system test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #12940: MINOR: Remove lock contention while adding sensors

2022-12-02 Thread GitBox


divijvaidya commented on PR #12940:
URL: https://github.com/apache/kafka/pull/12940#issuecomment-1335028608

   The build is failing with:
   ```
   [2022-12-01T20:35:07.343Z] FAILURE: Build completed with 2 failures.
   
   [2022-12-01T20:35:07.343Z] 
   
   [2022-12-01T20:35:07.343Z] 1: Task failed with an exception.
   
   [2022-12-01T20:35:07.343Z] ---
   
   [2022-12-01T20:35:07.343Z] * What went wrong:
   
   [2022-12-01T20:35:07.343Z] Execution failed for task 
':connect:file:compileJava'.
   
   [2022-12-01T20:35:07.343Z] > Compilation failed; see the compiler error 
output for details.
   
   [2022-12-01T20:35:07.343Z] 
   
   [2022-12-01T20:35:07.343Z] * Try:
   
   [2022-12-01T20:35:07.343Z] > Run with --stacktrace option to get the stack 
trace.
   
   [2022-12-01T20:35:07.343Z] > Run with --info or --debug option to get more 
log output.
   
   [2022-12-01T20:35:07.343Z] > Run with --scan to get full insights.
   
   [2022-12-01T20:35:07.343Z] 
==
   
   [2022-12-01T20:35:07.343Z] 
   
   [2022-12-01T20:35:07.343Z] 2: Task failed with an exception.
   
   [2022-12-01T20:35:07.343Z] ---
   
   [2022-12-01T20:35:07.343Z] * What went wrong:
   
   [2022-12-01T20:35:07.343Z] Unable to make progress running work. There are 
items queued for execution but none of them can be started
   
   [2022-12-01T20:35:07.343Z] 
   
   [2022-12-01T20:35:07.343Z] * Try:
   
   [2022-12-01T20:35:07.343Z] > Run with --stacktrace option to get the stack 
trace.
   
   [2022-12-01T20:35:07.343Z] > Run with --info or --debug option to get more 
log output.
   
   [2022-12-01T20:35:07.343Z] > Run with --scan to get full insights.
   
   [2022-12-01T20:35:07.343Z] 
==
   ```
   
   But strangely it passes locally for me:
   ```
   diviv@147dda742970:~/oss/kafka|minor-lock-contention ⇒  ./gradlew 
:connect:file:compileJava
   
   > Configure project :
   Starting build with version 3.4.0-SNAPSHOT (commit id 15aa84a7) using Gradle 
7.6, Java 17 and Scala 2.13.8
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   
   BUILD SUCCESSFUL in 15s
   6 actionable tasks: 5 executed, 1 up-to-date
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12933: MINOR: Optimize metric recording when quota check not required

2022-12-02 Thread GitBox


divijvaidya commented on PR #12933:
URL: https://github.com/apache/kafka/pull/12933#issuecomment-1335021913

   @mimaison please take a look when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14374) Kafka streams losing messages in State Store during first launch of app

2022-12-02 Thread Youcef Sebiat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642374#comment-17642374
 ] 

Youcef Sebiat commented on KAFKA-14374:
---

Thanks for the response.

1.
{quote}Did you check the content of RocksDB (using interactive queries) to see 
if data is missing there? If it's missing in RocksDB too, flushing to the 
changelog should not be the issue, as the data was never put into the store to 
begin with.
{quote}
We did not check this yet. We will have to have a look at it. 

2.
{quote}I also see in the topology visualisation you added to the ticket, that 
there is two filters before repartitioning – just wondering what they do and if 
they drop records, how can the content of the input topic be the same as the 
content of the result topic?
{quote}
The first filter is to drop tombstone records and the second is to flatMap 
the new key. In this specific case, there are no tombstones (we relaunched our 
CDC to make a snapshot of the table in the DB, so everything is in create mode) 
and the key generator is the identity with a cast to int.

3. 
{quote}Last, after data from the repartition topic was processed downstream, we 
would issue a "purge" request and delete records. How do you ensure that the 
repartition topic is really complete?
{quote}
We launched kafka-console-consumer before launching the app and wrote the 
results to a file. We made sure that we have the exact same number of events as 
the repartition topic. 

4.
{quote}Are there any deletes happening? Note that the repartition topic is 
configured with "log retention", while the changelog topic is configured with 
"log compaction" – thus, the repartition topic could contain two record 
`` and `` (ie k1 exists physically but is actually logically 
deleted), while the changelog topic might have been compacted already and the 
key k1 was purged from it.
{quote}
There are no deletes happening as we paused CDC connector that feeds the input 
topic.

 

What concerns us is that we are executing the exact same topology, on the exact 
same input, and that the state stores differ depending on whether we run in 
multi-thread or single-thread mode. What is more concerning is that it is 
specifically the partitions of the group leader in multi-thread mode that are 
consistently losing messages.
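
For reference, a minimal sketch of the sub-topology as described in this thread (a tombstone filter followed by an identity key cast and a repartition); topic names, serdes, and the exact operators are assumptions, not the real application:

{code:java}
// Hypothetical reconstruction: string-keyed CDC topic -> int-keyed state store.
// If the pipeline is correct, #table2 == #repartition-topic == #state-store.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("table2", Consumed.with(Serdes.String(), Serdes.String()))
       .filter((key, value) -> value != null)            // drop tombstones
       .selectKey((key, value) -> Integer.parseInt(key)) // identity key, cast to int
       .repartition(Repartitioned.with(Serdes.Integer(), Serdes.String()))
       .toTable(Materialized.as("state-store"));
{code}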

> Kafka streams losing messages in State Store during first launch of app
> ---
>
> Key: KAFKA-14374
> URL: https://issues.apache.org/jira/browse/KAFKA-14374
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 3.2.0
>Reporter: Youcef Sebiat
>Priority: Major
> Attachments: Screenshot 2022-11-09 at 14.56.00.png
>
>
> We have been using Kafka Streams to implement a CDC-based app. Attached is 
> the sub-topology of interest.
> `table2` topic is created by Debezium, which is connected to a SQL DB. It 
> conversion of the key from `string` to `int`. This means that we should 
> expect that #table2=#repartition-topic=#state-store; which actually is not 
> verified. What we end up with is the following #table2=#repartition-topic, 
> but  #repartition-topic>#state-store. We actually lose messages and thus 
> corrupt the state store, which makes the app live in incorrect state. (Please 
> note that there is no insertion in `table2` as we paused the connector to 
> verify the cardinality.)
> The above happens only during the first launch, i.e. the app has never been 
> launched before, so internal topics do not exist yet. Restarts of 
> pre-existing apps do not yield any problems.
> We have:
> 1. Broker on Kafka 3.2.
> 2. Client app on 2.8|3.2 (we tried both and we faced the same issue).
> 2. All parameters are by default except `CACHE_MAX_BYTES_BUFFERING_CONFIG` 
> set to `0` and `NUM_STREAM_THREADS_CONFIG` set to `>1`.
>  
> *What actually worked*
> 1. Use a single thread at first launch: using one thread solves the problem. The 
> #table2=#repartition-topic=#state-store is verified.
> 2. Pre-creating kafka internal topics: we noticed that whenever there is a 
> rebalance during the first launch of the Kafka Streams app, the state stores 
> ended up missing values. This also happens when you launch multiple pods in 
> K8s for example. When we read through the logs, we noticed that there is a 
> rebalance that is triggered when we first launch the app. This comes from the 
> fact that the internal topics get created and assigned, thus the rebalance. 
> So by creating the internal topics before, we avoid the rebalance and we end 
> up by #table2=#repartition-topic=#state-store.
> *What we noticed from the logs*
> In multi-thread mode, we noticed that it is the partitions that are assigned 
> to the thread 

[jira] [Resolved] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value

2022-12-02 Thread Victor van den Hoven (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor van den Hoven resolved KAFKA-14400.
--
Resolution: Not A Bug

It behaves differently because stream-stream left-join semantics have changed.

> KStream - KStream - LeftJoin() does not call ValueJoiner with null value 
> -
>
> Key: KAFKA-14400
> URL: https://issues.apache.org/jira/browse/KAFKA-14400
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1, 3.3.1
> Environment: Windows PC 
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: Afbeelding 2.png, SimpleStreamTopology.java, 
> SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.1.1:
> When using +JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1))+
> the  KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, 
> JoinWindows windows) does not seem to call the _joiner_ with null value when 
> join predicate is not satisfied (not expected).
>  
> When using deprecated +JoinWindows.of(Duration.ofMillis(1));+
> the  KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, 
> JoinWindows windows) does
> call the _joiner_ with null value when join predicate is not satisfied (as 
> expected and documented).
>  
> Attached you can find two files with a TopologyTestDriver unit test to 
> reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value

2022-12-02 Thread Victor van den Hoven (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642366#comment-17642366
 ] 

Victor van den Hoven commented on KAFKA-14400:
--

Thank you for answering my questions.

:)

> KStream - KStream - LeftJoin() does not call ValueJoiner with null value 
> -
>
> Key: KAFKA-14400
> URL: https://issues.apache.org/jira/browse/KAFKA-14400
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1, 3.3.1
> Environment: Windows PC 
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: Afbeelding 2.png, SimpleStreamTopology.java, 
> SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.1.1:
> When using +JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1))+
> the  KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, 
> JoinWindows windows) does not seem to call the _joiner_ with null value when 
> join predicate is not satisfied (not expected).
>  
> When using deprecated +JoinWindows.of(Duration.ofMillis(1));+
> the  KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, 
> JoinWindows windows) does
> call the _joiner_ with null value when join predicate is not satisfied (as 
> expected and documented).
>  
> Attached you can find two files with a TopologyTestDriver unit test to 
> reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14400) KStream - KStream - LeftJoin() does not call ValueJoiner with null value

2022-12-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642353#comment-17642353
 ] 

Matthias J. Sax commented on KAFKA-14400:
-

{quote}Am I correct that when one +LAST+ input record is written to the 
left-input stream not within any window and when no more input records are 
written to any of the input streams, then this LAST input record will never be 
emitted?
{quote}
That's correct. In general, this is not an issue though, as in a stream 
processing world there is no such thing as a "last input record". It can be 
annoying when writing tests, but for this case you can always add a "dummy" 
record as the very last record to flush out all records you actually want to test.
{quote}In the kstream-kstream-join table of the "Improved Left/Outer 
Stream-Stream Join" on the wiki page that was referred to:  Why is "d,ts=14" 
not also joined with "A,ts=3" as the window size is 15?  And why is "D,ts=15" 
not also joined with "A,ts=3"?
{quote}
Seems to be a copy-paste error – good catch! I fixed it.
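
In a TopologyTestDriver test, the dummy-record trick mentioned above looks roughly like the fragment below; the topic name and serializers are assumptions:

{code:java}
// Hypothetical test fragment: a late record advances stream time past the
// join window plus grace, flushing the pending left-join result (null right side).
TestInputTopic<String, String> left = driver.createInputTopic(
        "left-topic", new StringSerializer(), new StringSerializer());
left.pipeInput("k1", "v1", 10L);       // record under test; no right-side match
left.pipeInput("dummy", "x", 10_000L); // dummy record far in the future
// The left-join output for k1 with a null right value is now emitted.
{code}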

> KStream - KStream - LeftJoin() does not call ValueJoiner with null value 
> -
>
> Key: KAFKA-14400
> URL: https://issues.apache.org/jira/browse/KAFKA-14400
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1, 3.3.1
> Environment: Windows PC 
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: Afbeelding 2.png, SimpleStreamTopology.java, 
> SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.1.1:
> When using +JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1))+
> the  KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, 
> JoinWindows windows) does not seem to call the _joiner_ with null value when 
> join predicate is not satisfied (not expected).
>  
> When using deprecated +JoinWindows.of(Duration.ofMillis(1));+
> the  KStream {*}leftJoin{*}(KStream otherStream, ValueJoiner {_}joiner{_}, 
> JoinWindows windows) does
> call the _joiner_ with null value when join predicate is not satisfied (as 
> expected and documented).
>  
> Attached you can find two files with a TopologyTestDriver unit test to 
> reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe merged pull request #12837: MINOR: extract jointly owned parts of BrokerServer and ControllerServer

2022-12-02 Thread GitBox


cmccabe merged PR #12837:
URL: https://github.com/apache/kafka/pull/12837


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14184) Kafka streams application crashes due to "UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks."

2022-12-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642351#comment-17642351
 ] 

Matthias J. Sax commented on KAFKA-14184:
-

Thanks for the update. – It's hard to say what might have changed between 2.7 
and 3.2 – there are a lot of versions in between.

Given that it's very unlikely that we would have a bug-fix release for older 
versions anyway, it might not be worth spending time to figure it out, given 
that it seems to work as expected in 3.2.2.

> Kafka streams application crashes due to "UnsupportedOperationException: this 
> should not happen: timestamp() is not supported in standby tasks."
> 
>
> Key: KAFKA-14184
> URL: https://issues.apache.org/jira/browse/KAFKA-14184
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Suresh Rukmangathan
>Priority: Critical
>
> Kafka streams application is crashing with the following stack trace; 3 
> frames from the app (process/state-store related functions) have been removed.
>  
> {code:java}
> java.lang.UnsupportedOperationException: this should not happen: timestamp() 
> is not supported in standby tasks.\n\n\tat 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)\n\n\tat
>  
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)\n\n\tat
>  
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)\n\n\tat
>  
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)\n\n\tat
>  
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)\n\n\tat
>  
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120)\n\n\tat
>  // app-calls to process & save to state store - 3 frames 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)\n\n\tat
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)\n"
> {code}
>  
> Key Kafka streams application configuration details are as below:-
> {code:java}
> {replication.factor=1, num.standby.replicas=1, topology.optimization=all, 
> max.request.size=1048576, auto.offset.reset=earliest}{code}
>  
> If Kafka streams replication factor = 1 and standby replicas=1, is that an 
> issue? Do we expect that the replication factor should be at least n+1 if 
> standby replicas=1, or is there no relationship?
>  
> Couple of more data points are:-
>  # Crash stopped once I set the standby replicas to 0.
>  # Crash also stopped once I reduced the number of instances to one (only one 
> pod - one pod has only one instance of the application running).
>  
> So, is there something that is