[jira] [Commented] (KAFKA-9877) ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed (kafka.log.LogManager)

2021-07-06 Thread Kiran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376281#comment-17376281
 ] 

Kiran commented on KAFKA-9877:
--

I am seeing the same issue in Kafka 1.0 as well.

 

org.apache.kafka.common.errors.KafkaStorageException: Error while deleting segments for testtopic-0 in dir /kafka-logs
Caused by: java.io.IOException: Delete of log .log.deleted failed.
 at kafka.log.LogSegment.delete(LogSegment.scala:496)
 at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
 at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
 at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
 at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
 at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
 at kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
 at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 

I have compaction enabled for the topic with the below config:

segment.ms=100ms

delete.retention.ms=100ms
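
For reference, here is a minimal sketch (names and values are illustrative, not from this cluster) of applying such topic configs with the Java Admin client. Note that both configs take a plain millisecond value, so the literal "100ms" above should be rejected by config validation; "100" is used below. Also note that a 100 ms segment.ms rolls segments extremely aggressively, which multiplies the number of files the cleaner must rename and delete.

{code:java}
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch only: sets segment.ms and delete.retention.ms on a topic named "testtopic".
public class AlterTopicConfigsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "testtopic");
            Collection<AlterConfigOp> ops = Arrays.asList(
                new AlterConfigOp(new ConfigEntry("segment.ms", "100"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("delete.retention.ms", "100"), AlterConfigOp.OpType.SET));
            // Errors (e.g. invalid values) surface when the future is resolved.
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
{code}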

 

Also, I see a lot of the below errors:

ERROR Error while processing data for partition testtopic1-18 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.KafkaStorageException: Replica 3 is in an offline log directory for partition testtopic-10

 

> ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed 
> (kafka.log.LogManager)
> 
>
> Key: KAFKA-9877
> URL: https://issues.apache.org/jira/browse/KAFKA-9877
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.1.1
> Environment: Redhat
>Reporter: Hawking Du
>Priority: Major
> Attachments: server-125.log
>
>
> This confusing problem has been bothering me for a long time.
> The Kafka server often stops unexpectedly, apparently caused by the log cleaning process. Here
> are some logs from the server. Can anyone give me some ideas for fixing it?
> {code:java}
> [2020-04-04 02:07:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:07:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:17:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:27:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:30:22,272] INFO [ProducerStateManager partition=__consumer_offsets-35] Writing producer snapshot at offset 741037 (kafka.log.ProducerStateManager)
> [2020-04-04 02:30:22,274] INFO [Log partition=__consumer_offsets-35, dir=/tmp/kafka-logs] Rolled new log segment at offset 741037 in 3 ms. (kafka.log.Log)
> [2020-04-04 02:30:26,289] ERROR Failed to clean up log for __consumer_offsets-35 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: /tmp/kafka-logs/__consumer_offsets-35/.log
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
> at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:815)
> at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:224)
> at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:508)
> at kafka.log.Log.asyncDeleteSegment(Log.scala:1962)
> at kafka.log.Log.$anonfun$replaceSegments$6(Log.scala:2025)
> at kafka.log.Log.$anonfun$replaceSegments$6$adapted(Log.scala:2020) at
> 

[GitHub] [kafka] ableegoldman commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


ableegoldman commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875272278


   Merged to trunk. Thanks @showuon !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


ableegoldman merged pull request #10903:
URL: https://github.com/apache/kafka/pull/10903


   






[GitHub] [kafka] ableegoldman opened a new pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group

2021-07-06 Thread GitBox


ableegoldman opened a new pull request #10986:
URL: https://github.com/apache/kafka/pull/10986


   The `#onJoinPrepare` callback is not always invoked before a member 
(re)joins the group, but only once when it first enters the rebalance. This 
means that any updates or events that occur during the join phase can be lost 
in the internal state: for example, clearing the SubscriptionState (and thus 
the "ownedPartitions" that are used for cooperative rebalancing) after losing 
its memberId during a rebalance.
   
   We should reset the `needsJoinPrepare` flag inside the resetStateAndRejoin() 
method.
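
   A minimal sketch of the idea, assuming a coordinator class with a `needsJoinPrepare` flag gating `#onJoinPrepare` (the exact shape of the patch may differ):

   ```java
   // Sketch only, not the actual patch.
   class CoordinatorSketch {
       // Gates onJoinPrepare() so it runs once per rebalance.
       private boolean needsJoinPrepare = true;

       void onJoinPrepare() {
           needsJoinPrepare = false;
           // ... revoke partitions / clear state before (re)joining ...
       }

       void resetStateAndRejoin() {
           // ... existing logic: reset generation, request rejoin, etc. ...
           // Resetting the flag here ensures onJoinPrepare() runs again on the next
           // (re)join, so events during the join phase (e.g. clearing SubscriptionState
           // and ownedPartitions after losing the memberId) are not lost.
           needsJoinPrepare = true;
       }
   }
   ```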






[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665031448



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
 // this consumer is potential maxQuota candidate since we're still under the number of expected members
 // with more than the minQuota partitions. Note, if the number of expected members with more than
-// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
 if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment:
   This is part of fix #3 -- basically we have to handle these members
separately: once they get up to `minQuota`, they are only "potentially"
unfilled. Once the last member allowed reaches `maxQuota`, all of these
`minQuota` members are suddenly considered filled. cc @showuon 








[jira] [Updated] (KAFKA-13039) kafka 0.10.1 gradle build failed

2021-07-06 Thread hantaoluo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hantaoluo updated KAFKA-13039:
--
Description: 
When I use Gradle 3.0 or 3.5 to build Kafka, it fails.
  
 * What went wrong:
 A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
 > Could not find method scoverage() for arguments 
 > [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on 
 > project ':core' of type org.gradle.api.Project.

  was:
when i use gradle 3.0 or 3.5 build kafka  it failed
 

* What went wrong:
A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
> Could not find method scoverage() for arguments 
> [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on 
> project ':core' of type org.gradle.api.Project.


> kafka 0.10.1 gradle build failed
> 
>
> Key: KAFKA-13039
> URL: https://issues.apache.org/jira/browse/KAFKA-13039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: windows10 gradle3.5 or 3.7
>Reporter: hantaoluo
>Priority: Major
>
> When I use Gradle 3.0 or 3.5 to build Kafka, it fails.
>   
>  * What went wrong:
>  A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
>  > Could not find method scoverage() for arguments 
> [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on 
> project ':core' of type org.gradle.api.Project.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13039) kafka 0.10.1 gradle build failed

2021-07-06 Thread hantaoluo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hantaoluo updated KAFKA-13039:
--
Priority: Major  (was: Minor)

> kafka 0.10.1 gradle build failed
> 
>
> Key: KAFKA-13039
> URL: https://issues.apache.org/jira/browse/KAFKA-13039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: windows10 gradle3.5 or 3.7
>Reporter: hantaoluo
>Priority: Major
>
> When I use Gradle 3.0 or 3.5 to build Kafka, it fails.
>  
> * What went wrong:
> A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
> > Could not find method scoverage() for arguments 
> > [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on 
> > project ':core' of type org.gradle.api.Project.





[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665030959



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -186,16 +212,25 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 
 consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
 List<TopicPartition> assignedPartitions = new ArrayList<>();
-// Reassign previously owned partitions to the expected number
+// Reassign previously owned partitions, up to the expected number of partitions per consumer
 for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
 List<TopicPartition> consumerAssignment = assignment.get(consumer);
 
+for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+if (ownedPartitions.contains(doublyClaimedPartition)) {
+log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple"

Review comment:
   Strictly speaking, this should never happen: even if we do get these
"impossible" doubly-claimed partitions, we're also removing them from the
`ownedPartitions` above (that's fix #2). But I put in a safeguard just in case;
it shouldn't hurt. (Performance-wise, we should generally not even enter this
loop, since `partitionsWithMultiplePreviousOwners` should almost always be empty.)








[GitHub] [kafka] wuYin commented on pull request #10977: MINOR: Reuse hasDefault instead of comparing with NO_DEFAULT_VALUE directly

2021-07-06 Thread GitBox


wuYin commented on pull request #10977:
URL: https://github.com/apache/kafka/pull/10977#issuecomment-875267298


   @showuon 
   Thank you for the patient explanation; I added this.
   Now `NO_DEFAULT_VALUE` is no longer used for direct comparison.






[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665030173



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 for (final TopicPartition tp : memberData.partitions) {
 // filter out any topics that no longer exist or aren't part of the current subscription
 if (allTopics.contains(tp.topic())) {
-ownedPartitions.add(tp);
+
+if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+allPreviousPartitionsToOwner.put(tp, consumer);
+ownedPartitions.add(tp);
+} else {
+String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment:
   This is fix #2 -- if we somehow still get multiple consumers claiming a
partition in the same generation, we have to consider both claims invalid and
remove the partition from their `ownedPartitions`.








[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b

2021-07-06 Thread GitBox


ableegoldman commented on a change in pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#discussion_r665029950



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // If the current member's generation is higher, all the previously owned partitions are invalid
 if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
 membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+allPreviousPartitionsToOwner.clear();
+partitionsWithMultiplePreviousOwners.clear();
+for (String droppedOutConsumer : membersWithOldGeneration) {
+consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+}

Review comment:
   This part I just moved here to keep things up to date as we go; before, we
were clearing them after the loop.








[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-06 Thread GitBox


ableegoldman commented on pull request #10985:
URL: https://github.com/apache/kafka/pull/10985#issuecomment-875265727


   ready for review @dajac @guozhangwang @hachikuji  @showuon 






[GitHub] [kafka] ableegoldman opened a new pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug

2021-07-06 Thread GitBox


ableegoldman opened a new pull request #10985:
URL: https://github.com/apache/kafka/pull/10985


   The primary goal of this PR is to address the problem we've seen in the wild
in which the ConsumerCoordinator fails to update its SubscriptionState and
ultimately feeds invalid `ownedPartitions` data as input to the assignor.
Previously the assignor would detect that something was wrong and just throw an
exception; now we make several efforts to detect this earlier in the assignment
process, fix it if possible, and work around it if not.
   
   Specifically, this PR does a few things:
   1) Bring the `generation` field back to the CooperativeStickyAssignor so we 
don't need to rely so heavily on the ConsumerCoordinator properly updating its 
SubscriptionState after eg falling out of the group. The plain StickyAssignor 
always used the generation since it had to, so we just make sure the 
CooperativeStickyAssignor has this tool as well
   2) In case of unforeseen problems or further bugs that slip past the 
`generation` field safety net, the assignor will now explicitly look out for 
partitions that are being claimed by multiple consumers as owned in the same 
generation. Such a case should never occur, but if it does, we have to 
invalidate this partition from the `ownedPartitions` of both consumers, since 
we can't tell who, if anyone, has the valid claim to this partition.
   3) Fix a subtle bug that I discovered while writing tests for the above two
fixes: in the constrained algorithm, we compute the exact number of partitions
each consumer should end up with, and keep track of the "unfilled" members who
must -- or _might_ -- require more partitions to hit their quota (see the sketch
after this list). The problem was that members at the `minQuota` were being
considered "unfilled" even after we had already hit the maximum number of
consumers allowed to go up to the `maxQuota`, meaning those `minQuota` members
could not, and should not, accept any more partitions beyond that. I believe this
was introduced in [#10509](https://github.com/apache/kafka/pull/10509), so it
shouldn't be in any released versions and does not need to be backported.
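
   A small self-contained illustration of the quota bookkeeping behind fix #3
(variable names are mine, not necessarily those in `AbstractStickyAssignor`):

   ```java
   // Constrained-assignor quota math with illustrative numbers.
   public class QuotaMathSketch {
       public static void main(String[] args) {
           int numPartitions = 7;
           int numConsumers = 3;
           int minQuota = numPartitions / numConsumers;       // 2: everyone gets at least this many
           int remainder = numPartitions % numConsumers;      // 1: only this many members may exceed minQuota
           int maxQuota = remainder == 0 ? minQuota : minQuota + 1;  // 3
           // A member holding minQuota partitions is only *potentially* unfilled:
           // once `remainder` members have reached maxQuota, every remaining minQuota
           // member must be treated as filled and receive no further partitions.
           System.out.printf("minQuota=%d, maxQuota=%d, membersAllowedOverMinQuota=%d%n",
               minQuota, maxQuota, remainder);
       }
   }
   ```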






[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


showuon commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875264149


   @ableegoldman , failed tests are unrelated. Thank you.
   
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   ```






[GitHub] [kafka] showuon commented on a change in pull request #10983: mention IdentityReplicationPolicy in ops docs

2021-07-06 Thread GitBox


showuon commented on a change in pull request #10983:
URL: https://github.com/apache/kafka/pull/10983#discussion_r665019951



##
File path: docs/ops.html
##
@@ -845,6 +845,11 @@
 <a href="https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java">IdentityReplicationPolicy</a>
 (since version 3.0.0), which does not rename topics. This is useful for simple one-way replication topologies, where topic renaming is not strictly necessary; however, be careful not to introduce cycles when using <code>IdentityReplicationPolicy</code>, since this can result in the same records being replicated in an infinite loop.

Review comment:
   I'm not sure if we really need to put the source code link here. 








[jira] [Comment Edited] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376189#comment-17376189
 ] 

Luke Chen edited comment on KAFKA-13008 at 7/7/21, 3:30 AM:


Actually, I'm not very familiar with the detailed incremental session either. But from
[KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability],
we can see the old session will be evicted only when matching one of the following
three conditions:

!image-2021-07-07-11-19-55-630.png|width=762,height=161!

And because in step 5 none of the above three conditions match, the new session
won't evict the old session. Also, during that time, the old session already
contains the "up-to-date" partition info of partition A-1 (because partition
A-1 was assigned to this session), so no partition A-1 update will be received.

 

This is my understanding. Please correct me if I'm wrong. Thank you.


was (Author: showuon):
Actually, I'm not very familiar with the detailed incremental session either. But from
[KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability],
we can see the old session will be evicted only when matching one of the following
three conditions:

!image-2021-07-07-11-19-55-630.png|width=762,height=161!

And because in step 5 none of the above three conditions match, the new session
won't evict the old session. Also, during that time, the old session already
contains the "up-to-date" partition info of partition A-1, so no partition
A-1 update will be received.

 

This is my understanding. Please correct me if I'm wrong. Thank you.

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Major
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag.
> It's a good improvement for timestamp sync. But I found it can cause the
> stream to stop processing data for a long time while waiting for the
> partition metadata.
>  
> I've been investigating this case for a while, and figured out the issue
> will happen in the below situation (or a similar one):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch 

[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376189#comment-17376189
 ] 

Luke Chen commented on KAFKA-13008:
---

Actually, I'm not very familiar with the detailed incremental session either. But from
[KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability],
we can see the old session will be evicted only when matching one of the following
three conditions:

!image-2021-07-07-11-19-55-630.png|width=762,height=161!

And because in step 5 none of the above three conditions match, the new session
won't evict the old session. Also, during that time, the old session already
contains the "up-to-date" partition info of partition A-1, so no partition
A-1 update will be received.

 

This is my understanding. Please correct me if I'm wrong. Thank you.

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Major
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag.
> It's a good improvement for timestamp sync. But I found it can cause the
> stream to stop processing data for a long time while waiting for the
> partition metadata.
>  
> I've been investigating this case for a while, and figured out the issue
> will happen in the below situation (or a similar one):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> partition lag, like what we did in previous KIP-353
>  # Introduce a waiting time config when no partition lag, or partition lag 
> keeps > 0 (need KIP)
> [~vvcephei] [~guozhang] , any suggestions?
>  
> cc [~ableegoldman]  [~mjsax] , this is the root cause that in 
> [https://github.com/apache/kafka/pull/10736,] we discussed and 

[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-06 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13008:
--
Attachment: image-2021-07-07-11-19-55-630.png

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Major
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag.
> It's a good improvement for timestamp sync. But I found it can cause the
> stream to stop processing data for a long time while waiting for the
> partition metadata.
>  
> I've been investigating this case for a while, and figured out the issue
> will happen in the below situation (or a similar one):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the  position of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, the stream1-thread1 won't process any data until we got input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason why the stream1-thread1 won't process any data is because we can't 
> get the lag of partition A-1. And why we can't get the lag? It's because
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> partition lag, like what we did in previous KIP-353
>  # Introduce a waiting time config when no partition lag, or partition lag 
> keeps > 0 (need KIP)
> [~vvcephei] [~guozhang] , any suggestions?
>  
> cc [~ableegoldman] [~mjsax], this is the root cause of what we discussed in
> [https://github.com/apache/kafka/pull/10736,] where we thought
> there was a data loss situation. FYI.
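
As a concrete illustration of the KIP-695 mechanism described above, this is roughly how the cached lag is consulted (a hedged sketch; "topicA" and partition 1 follow the example in the description):

{code:java}
import java.util.OptionalLong;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: checks the consumer's cached lag, as introduced by KIP-695.
class LagCheckSketch {
    static boolean canResolveLag(Consumer<?, ?> consumer) {
        TopicPartition a1 = new TopicPartition("topicA", 1);
        OptionalLong lag = consumer.currentLag(a1);
        // Empty right after the step-5 rebalance: the cache was cleared, and the
        // incremental fetch session returns nothing new for the slow partition,
        // so the stream keeps idling instead of processing buffered A-0 records.
        return lag.isPresent();
    }
}
{code}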





[GitHub] [kafka] ccding opened a new pull request #10984: MINOR: fix typoe in LogCleanerTest.scala

2021-07-06 Thread GitBox


ccding opened a new pull request #10984:
URL: https://github.com/apache/kafka/pull/10984


   `are rename to` -> `are renamed to`






[GitHub] [kafka] kamalcph commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-07-06 Thread GitBox


kamalcph commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-875230369


   > Hi @kamalcph , what's the status of this PR? I noticed there are some 
unresolved code review comments, and there are some conflicts.
   
   I'll address the pending review comments and resolve the conflicts. 






[GitHub] [kafka] showuon commented on pull request #10977: MINOR: Reuse hasDefault instead of comparing with NO_DEFAULT_VALUE directly

2021-07-06 Thread GitBox


showuon commented on pull request #10977:
URL: https://github.com/apache/kafka/pull/10977#issuecomment-875220116


   @wuYin 
   
   > This small change is mainly to unify the way to determine if the user has 
set the default value
   
   Yes, I know, so I think this line should also be updated.
   
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L1146
   
   ```
   NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : parseType(name, 
defaultValue, type);
   ```
   It is also trying to determine if the default value is set, so we can
update it to:
   ```
   hasDefault() ?  parseType(name, defaultValue, type) : NO_DEFAULT_VALUE;
   ```
   
   What do you think?






[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


showuon commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875214820


   @ableegoldman , I've added a test in `ConsumerConfigTest` to verify the 
default partition assignor. We cannot verify the assignor via `KafkaConsumer` 
or `ConsumerCoordinator` unless we add/update methods for testing. I think 
that's unnecessary. Let me know if you have other opinions. Thank you. 






[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-07-06 Thread GitBox


dengziming commented on pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#issuecomment-875209647


   Replace deprecated methods and rebase on trunk.






[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-06 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r664983511



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -51,6 +58,7 @@
  */
 public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
 private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+private static final long INITIALIZATION_RETRY_INTERVAL_MS = 3L;

Review comment:
   I prefer adding L for longs, which I missed at the other declaration. AFAIK,
omitting it does not cause any issues, as the int literal is automatically
converted to a `long` via a widening conversion; the compiler rejects int
literals that are too large to begin with, so nothing can be truncated. Thanks
for catching it, I will make it consistent by adding it.
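
   A tiny self-contained illustration (values made up) of why both forms compile:

   ```java
   // An int literal is widened to long implicitly, so the L suffix is about clarity,
   // not correctness; the compiler already rejects literals that don't fit in an int.
   class LongLiteralSketch {
       static final long WITHOUT_SUFFIX = 30000;   // int literal, widened to long
       static final long WITH_SUFFIX = 30000L;     // explicit long literal
       // static final long TOO_BIG = 3000000000;  // error: integer number too large
   }
   ```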
   








[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-06 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r664980913



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -220,16 +227,16 @@ public boolean isPartitionAssigned(int partition) {
 return assignedMetaPartitions.contains(partition);
 }
 
-private void ensureNotClosed() {
-if (closing) {
-throw new IllegalStateException("This instance is already closed");
-}
-}
-
 public void close() {
 if (!closing) {
-closing = true;
-consumer.wakeup();
+synchronized (assignPartitionsLock) {
+// Closing should be updated only after acquiring the lock to 
avoid race in
+// maybeWaitForPartitionsAssignment() where it waits on 
assignPartitionsLock. It should not wait
+// if the closing is already set.
+closing = true;

Review comment:
   `close()` will not be called concurrently here. 








[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-06 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r664976317



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns a consumer for the
+ * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are communicated through the same API. We will remove the partitions that are deleted from
+ * this broker, which are received through {@link #removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+private static final long POLL_INTERVAL_MS = 100;
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaConsumer<byte[], byte[]> consumer;
+private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+// It indicates whether the closing process has been started or not. If it is set as true,
+// consumer will stop consuming messages and it will not allow partition assignments to be updated.
+private volatile boolean closing = false;
+// It indicates whether the consumer needs to assign the partitions or not. This is set when it is
+// determined that the consumer needs to be assigned with the updated partitions.
+private volatile boolean assignPartitions = false;
+
+private final Object assignPartitionsLock = new Object();
+
+// Remote log metadata topic partitions that consumer is assigned to.
+private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+// User topic partitions that this broker is a leader/follower for.
+private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+// Map of remote log metadata 

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-06 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r664970217



##
File path: 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+
+/**
+ * This interface defines the lifecycle methods for {@code 
RemoteLogSegmentMetadata}. {@link RemoteLogSegmentLifecycleTest} tests
+ * different implementations of this interface. This is responsible for 
managing all the segments for a given {@code topicIdPartition}
+ * registered with {@link #initialize(TopicIdPartition)}.
+ */
+public interface RemoteLogSegmentLifecycleManager extends Closeable {

Review comment:
   `RemoteLogSegmentLifecycleManager` is already under tests.








[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


showuon commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875185428


   Makes sense! Will add it later. Thanks for the suggestion :)






[GitHub] [kafka] ableegoldman commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


ableegoldman commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875181165


   > So, in this PR, I've added 1 test to test it: 
testMultiConsumerDefaultAssignorAndVerifyAssignment
   
   Thanks Luke, but I actually meant something simpler (and stupider) that
directly verifies the chosen assignor is the RangeAssignor, rather than
inferring it from the assignment that's produced -- something in
ConsumerConfigTest or KafkaConsumerTest. Does that make sense? I just think we
want both tests: a more complicated one that validates what we actually care
about (the resulting assignment), and a plain dumb one that we can be 100% sure
will break if/when the default assignor is changed, no matter what it gets
changed to (i.e. even if the new assignor happens to produce the same
assignment as the RangeAssignor in some cases).
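
   Something along these lines, as a hedged sketch (the test name, class, and
expected default list are assumptions based on this PR):

   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import java.util.Arrays;
   import java.util.Properties;

   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
   import org.apache.kafka.clients.consumer.RangeAssignor;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.junit.jupiter.api.Test;

   public class DefaultAssignorSketchTest {
       @Test
       public void testDefaultPartitionAssignor() {
           Properties props = new Properties();
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           // Breaks loudly if the default assignor list ever changes, regardless of
           // whether the new assignor would produce the same assignment.
           assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
               new ConsumerConfig(props).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
       }
   }
   ```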






[GitHub] [kafka] rondagostino commented on pull request #10982: MINOR: Update dropwizard metrics to 4.1.12.1

2021-07-06 Thread GitBox


rondagostino commented on pull request #10982:
URL: https://github.com/apache/kafka/pull/10982#issuecomment-875181066


   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:   2021-07-06--001
   run time: 177 minutes 4.952 seconds
   tests run:876
   passed:   667
   failed:   12
   ignored:  197
   

   ```
   
   
https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-07-06--001.1625614045--rondagostino--minor_dropwizard_metrics_4.1.12.1--362a8b250/report.html
   
   Failures appear unrelated.






[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2021-07-06 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376128#comment-17376128
 ] 

Kirk True edited comment on KAFKA-12879 at 7/7/21, 12:24 AM:
-

Almost all of the {{Admin}} client's public API methods are meant to be 
consumed asynchronously. Most of the exceptions that can occur are thrown when 
the {{Future#get}} method is invoked, not when the {{Admin}} API call itself is 
invoked.

However, there are some exceptions (pun intended):
 # Creation of the client propagates errors if any occur during configuration
 # {{close}} throws an error if the given {{timeout}} is negative
 # {{deleteTopics}} throws an {{IllegalArgumentException}} if the 
{{TopicCollection}} parameter is not of an expected sub-class
 # {{updateFeatures}} throws an {{IllegalArgumentException}} if the
{{featureUpdates}} map is empty or contains a blank feature name

The above are just those that are thrown directly in the {{KafkaAdminClient}} 
itself.

Of the above list, item four stands out as being extra "picky" compared to,
say, {{describeTransactions}}, which doesn't check the collection it's given
for emptiness.


was (Author: kirktrue):
Almost all of the {{Admin}} client's public API methods are meant to be 
consumed asynchronously. Most of the exceptions that can occur are thrown when 
the {{Future#get}} method is invoked.

However, there are some exceptions (pun intended):
 # Creation of the client propagates errors if any occur during configuration
 # {{close}} throws an error if the given {{timeout}} is negative
 # {{deleteTopics}} throws an {{IllegalArgumentException}} if the 
{{TopicCollection}} parameter is not of an expected sub-class
 # {{updateFeatures}} throws an {{IllegalArgumentException}} if the
{{featureUpdates}} map is empty or contains a blank feature name

The above are just those that are thrown directly in the {{KafkaAdminClient}} 
itself.

Of the above list, item four stands out as extra "picky" compared to, say, 
{{describeTransactions}}, which doesn't check the collection it's given for 
emptiness.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Kirk True
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore, as well as the intended effect on {{listOffsets()}}, it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.
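
A hedged sketch of the {{retries=1}} workaround described above (the bootstrap 
address and partition are assumptions; note the ticket's caveat that this only 
works with a dedicated {{Admin}} instance):

{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class ListOffsetsWorkaround {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.RETRIES_CONFIG, 1); // dedicated client just for this call
        TopicPartition tp = new TopicPartition("maybe-missing-topic", 0);
        try (Admin admin = Admin.create(props)) {
            try {
                admin.listOffsets(Map.of(tp, OffsetSpec.latest())).all().get();
            } catch (ExecutionException e) {
                // Per the ticket, the UnknownTopicOrPartitionException now hides
                // as the cause of the TimeoutException.
                Throwable cause = e.getCause();
                if (cause instanceof TimeoutException
                        && cause.getCause() instanceof UnknownTopicOrPartitionException) {
                    System.err.println("Topic does not exist: " + tp.topic());
                }
            }
        }
    }
}
{code}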



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2021-07-06 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376128#comment-17376128
 ] 

Kirk True commented on KAFKA-12879:
---

Almost all of the {{Admin}} client's public API methods are meant to be 
consumed asynchronously. Most of the exceptions that can occur are thrown when 
the {{Future#get}} method is invoked.

However, there are some exceptions (pun intended):
 # Creation of the client propagates errors if any occur during configuration
 # {{close}} throws an error if the given {{timeout}} is negative
 # {{deleteTopics}} throws an {{IllegalArgumentException}} if the 
{{TopicCollection}} parameter is not of an expected sub-class
 # {{updateFeatures}} throws an {{IllegalArgumentException}} if the 
{{featureUpdates}} map is empty or contains a blank feature name

The above are just those that are thrown directly in the {{KafkaAdminClient}} 
itself.

Of the above list, item four stands out as extra "picky" compared to, say, 
{{describeTransactions}}, which doesn't check the collection it's given for 
emptiness.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Kirk True
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore, as well as the intended effect on {{listOffsets()}}, it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


showuon commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875165545


   @ableegoldman, no, there are no tests verifying that the RangeAssignor will 
be chosen by default when creating a Consumer. So, in this PR, I've added one 
test for it: `testMultiConsumerDefaultAssignorAndVerifyAssignment`. Thank 
you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0

2021-07-06 Thread GitBox


showuon commented on pull request #10903:
URL: https://github.com/apache/kafka/pull/10903#issuecomment-875157335


   @ableegoldman, as discussed, we agreed this should go into v3.0. Please 
take a look when you have a chance. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-07-06 Thread GitBox


cmccabe merged pull request #10931:
URL: https://github.com/apache/kafka/pull/10931


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes

2021-07-06 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista updated KAFKA-13040:
-
Description: 
Raised for KIP-760 (linked).

Kafka brokers in production often crash with "Too many open files" or "Out of 
memory" errors because some Kafka topics have a large number of segment files 
as a result of a small {{segment.ms}} or {{segment.bytes}}. These two 
configurations can be set by any user who is authorized to create topics or 
modify topic configuration.

To prevent these two configurations from crashing a Kafka broker, they should 
have a sufficiently large minimum value.

  was:
Kafka brokers in production often crash with "Too many open files" or "Out of 
memory" errors because some Kafka topics have a large number of segment files 
as a result of a small {{segment.ms}} or {{segment.bytes}}. These two 
configurations can be set by any user who is authorized to create topics or 
modify topic configuration.

To prevent these two configurations from crashing a Kafka broker, they should 
have a sufficiently large minimum value.


> Increase minimum value of segment.ms and segment.bytes
> --
>
> Key: KAFKA-13040
> URL: https://issues.apache.org/jira/browse/KAFKA-13040
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Badai Aqrandista
>Priority: Minor
>
> Raised for KIP-760 (linked).
> Kafka brokers in production often crash with "Too many open files" 
> or "Out of memory" errors because some Kafka topics have a large number of 
> segment files as a result of a small {{segment.ms}} or {{segment.bytes}}. These 
> two configurations can be set by any user who is authorized to create topics or 
> modify topic configuration.
> To prevent these two configurations from crashing a Kafka broker, they 
> should have a sufficiently large minimum value.
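
For illustration, a hedged sketch of how easily a topic can be given such 
pathological values today (the topic name and bootstrap address are 
assumptions):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TinySegmentsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Values this small are exactly what the proposed minimum would reject.
            admin.incrementalAlterConfigs(Map.of(topic, List.of(
                new AlterConfigOp(new ConfigEntry("segment.ms", "100"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("segment.bytes", "1000"), AlterConfigOp.OpType.SET)
            ))).all().get();
        }
    }
}
{code}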



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes

2021-07-06 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista reassigned KAFKA-13040:


Assignee: Badai Aqrandista

> Increase minimum value of segment.ms and segment.bytes
> --
>
> Key: KAFKA-13040
> URL: https://issues.apache.org/jira/browse/KAFKA-13040
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Badai Aqrandista
>Priority: Minor
>
> Kafka brokers in production often crash with "Too many open files" 
> or "Out of memory" errors because some Kafka topics have a large number of 
> segment files as a result of a small {{segment.ms}} or {{segment.bytes}}. These 
> two configurations can be set by any user who is authorized to create topics or 
> modify topic configuration.
> To prevent these two configurations from crashing a Kafka broker, they 
> should have a sufficiently large minimum value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] skaundinya15 edited a comment on pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 edited a comment on pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#issuecomment-875128420


   > @skaundinya15 Thanks for the updates. Left comments about indexing of 
`groups` (starts from 0) in the tests with the change to collection in the last 
commit. Apart from that LGTM if tests pass.
   
   @rajinisivaram Just pushed changes for this to fix the failing tests - 
thanks for pointing it out
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13041) Support debugging system tests with ducker-ak

2021-07-06 Thread Stanislav Vodetskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Vodetskyi updated KAFKA-13041:

Summary: Support debugging system tests with ducker-ak  (was: Support 
debugging system tests)

> Support debugging system tests with ducker-ak
> -
>
> Key: KAFKA-13041
> URL: https://issues.apache.org/jira/browse/KAFKA-13041
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Stanislav Vodetskyi
>Priority: Major
>
> Currently when you're using ducker-ak to run system tests locally



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13041) Support debugging system tests with ducker-ak

2021-07-06 Thread Stanislav Vodetskyi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376104#comment-17376104
 ] 

Stanislav Vodetskyi commented on KAFKA-13041:
-

https://github.com/apache/kafka/pull/10915

> Support debugging system tests with ducker-ak
> -
>
> Key: KAFKA-13041
> URL: https://issues.apache.org/jira/browse/KAFKA-13041
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Stanislav Vodetskyi
>Priority: Major
>
> Currently when you're using ducker-ak to run system tests locally, your only 
> debug option is to add print/log messages.
> It should be possible to connect to a ducker-ak test with a remote debugger.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13041) Support debugging system tests with ducker-ak

2021-07-06 Thread Stanislav Vodetskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Vodetskyi updated KAFKA-13041:

Description: 
Currently when you're using ducker-ak to run system tests locally, your only 
debug option is to add print/log messages.
It should be possible to connect to a ducker-ak test with a remote debugger.

  was:Currently when you're using ducker-ak to run system tests locally


> Support debugging system tests with ducker-ak
> -
>
> Key: KAFKA-13041
> URL: https://issues.apache.org/jira/browse/KAFKA-13041
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Stanislav Vodetskyi
>Priority: Major
>
> Currently when you're using ducker-ak to run system tests locally, your only 
> debug option is to add print/log messages.
> It should be possible to connect to a ducker-ak test with a remote debugger.
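
For context, JVM remote debugging generally hinges on the standard JDWP agent 
flag below; how ducker-ak would plumb this through to the test JVMs is exactly 
what this ticket is about, so treat the flag as background rather than the 
proposed mechanism:

{code}
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
{code}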



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stan-confluent commented on pull request #10915: Enable connecting VS Code remote debugger

2021-07-06 Thread GitBox


stan-confluent commented on pull request #10915:
URL: https://github.com/apache/kafka/pull/10915#issuecomment-875132069


   @omkreddy https://issues.apache.org/jira/browse/KAFKA-13041


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13041) Support debugging system tests

2021-07-06 Thread Stanislav Vodetskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Vodetskyi updated KAFKA-13041:

Description: Currently when you're using ducker-ak to run system tests 
locally

> Support debugging system tests
> --
>
> Key: KAFKA-13041
> URL: https://issues.apache.org/jira/browse/KAFKA-13041
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Stanislav Vodetskyi
>Priority: Major
>
> Currently when you're using ducker-ak to run system tests locally



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13041) Support debugging system tests

2021-07-06 Thread Stanislav Vodetskyi (Jira)
Stanislav Vodetskyi created KAFKA-13041:
---

 Summary: Support debugging system tests
 Key: KAFKA-13041
 URL: https://issues.apache.org/jira/browse/KAFKA-13041
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Stanislav Vodetskyi






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] skaundinya15 commented on pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#issuecomment-875128420


   > @skaundinya15 Thanks for the updates. Left comments about indexing of 
`groups` (starts from 0) in the tests with the change to collection in the last 
commit. Apart from that LGTM if tests pass.
   Just pushed changes for this to fix the failing tests - thanks for pointing 
it out
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664918688



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1358,17 +1367,222 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
 // note there's only one broker, so no need to lookup the group coordinator
 
 // without describe permission on the topic, we shouldn't be able to fetch 
offsets
-val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, 
false, null, false).build()
+val offsetFetchRequest = createOffsetFetchRequestAllPartitions
 var offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-assertEquals(Errors.NONE, offsetFetchResponse.error)
-assertTrue(offsetFetchResponse.responseData.isEmpty)
+assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty)
 
 // now add describe permission on the topic and verify that the offset can 
be fetched
 addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WildcardHost, DESCRIBE, ALLOW)), topicResource)
 offsetFetchResponse = 
connectAndReceive[OffsetFetchResponse](offsetFetchRequest)
-assertEquals(Errors.NONE, offsetFetchResponse.error)
-assertTrue(offsetFetchResponse.responseData.containsKey(tp))
-assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
+assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group))
+assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp))
+assertEquals(offset, 
offsetFetchResponse.partitionDataMap(group).get(tp).offset)
+  }
+
+  @Test
+  def testOffsetFetchMultipleGroupsAuthorization(): Unit = {
+val groups = (0 until 5).map(i => s"group$i")
+val groupResources = groups.map(group => new ResourcePattern(GROUP, group, 
LITERAL))
+
+val topic1 = "topic1"
+val topic1List = singletonList(new TopicPartition(topic1, 0))
+val topicOneResource = new ResourcePattern(TOPIC, topic1, LITERAL)
+val topic2 = "topic2"
+val topic1And2List = util.Arrays.asList(
+  new TopicPartition(topic1, 0),
+  new TopicPartition(topic2, 0),
+  new TopicPartition(topic2, 1))
+val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL)
+val topic3 = "topic3"
+val allTopicsList = util.Arrays.asList(
+  new TopicPartition(topic1, 0),
+  new TopicPartition(topic2, 0),
+  new TopicPartition(topic2, 1),
+  new TopicPartition(topic3, 0),
+  new TopicPartition(topic3, 1),
+  new TopicPartition(topic3, 2))
+val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL)
+
+// create group to partition map to build batched offsetFetch request
+val groupToPartitionMap = new util.HashMap[String, 
util.List[TopicPartition]]()
+groupToPartitionMap.put(groups(1), topic1List)

Review comment:
   groups(0) to groups(4) 

##
File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
##
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
+import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+import java.util
+import java.util.Collections.singletonList
+import scala.jdk.CollectionConverters._
+import java.util.{Optional, Properties}
+
+class OffsetFetchRequestTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 1
+
+  val brokerId: Integer = 0
+  val offset = 15L
+  val leaderEpoch: Optional[Integer] = Optional.of(3)
+  val metadata = "metadata"
+  val topic = "topic"
+  val groupId = "groupId"
+  val groups: Seq[String] = (0 until 5).map(i => 

[GitHub] [kafka] junrao commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-06 Thread GitBox


junrao commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r664866782



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It
+ * provides APIs to add or remove the topic partitions whose metadata should be
+ * consumed by this instance, using {@link #addAssignmentsForPartitions(Set)} and
+ * {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * 
+ * When a broker is started, the controller sends the topic partitions that this
+ * broker is a leader or follower for, along with the partitions to be deleted.
+ * This class receives those notifications via {@link #addAssignmentsForPartitions(Set)}
+ * and {@link #removeAssignmentsForPartitions(Set)}, and assigns a consumer to the
+ * respective remote log metadata partitions using
+ * {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same APIs. Partitions deleted
+ * from this broker, received through {@link #removeAssignmentsForPartitions(Set)},
+ * are removed from the assignment as well.
+ * 
+ * After receiving these events, it invokes
+ * {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of
+ * {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+private static final long POLL_INTERVAL_MS = 100;
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaConsumer consumer;
+private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+// It indicates whether the closing process has been started or not. If it 
is set as true,
+// consumer will stop consuming messages and it will not allow partition 
assignments to be updated.
+private volatile boolean closing = false;
+// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
+// determined that the consumer needs to be assigned with the updated 
partitions.
+private volatile boolean assignPartitions = false;
+
+private final Object assignPartitionsLock = new Object();
+
+// Remote log metadata topic partitions that consumer is assigned to.
+private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+
+// User topic partitions that this broker is a leader/follower for.
+private Set assignedTopicPartitions = 
Collections.emptySet();
+
+// Map of remote log metadata 

[jira] [Commented] (KAFKA-13037) "Thread state is already PENDING_SHUTDOWN" log spam

2021-07-06 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376066#comment-17376066
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13037:


Hey [~gray.john], would you be interested in submitting a PR for this? I 
completely agree that logging this at INFO on every iteration is wildly 
inappropriate; I just didn't push to change it at the time since I figured 
someone would file a ticket if it was really bothering people. And here we are :) 

> "Thread state is already PENDING_SHUTDOWN" log spam
> ---
>
> Key: KAFKA-13037
> URL: https://issues.apache.org/jira/browse/KAFKA-13037
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: John Gray
>Priority: Major
>
> KAFKA-12462 introduced a 
> [change|https://github.com/apache/kafka/commit/4fe4cdc4a61cbac8e070a8b5514403235194015b#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R722]
>  that increased this "Thread state is already {}" logger to info instead of 
> debug. We are running into a problem with our streams apps: when they hit an 
> unrecoverable exception that shuts down the streams thread, this log gets 
> printed about 50,000 times per second per thread. I am guessing it is once 
> per record we have queued up when the exception happens? We have temporarily 
> raised the StreamThread logger to WARN instead of INFO to avoid the spam, but 
> we do miss the other good logs we get on INFO in that class. Could this log 
> be reverted back to debug? Thank you! 
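
For anyone hit by this in the meantime, the temporary mitigation the reporter 
describes is a logger override along these lines (assuming a log4j.properties 
setup):

{code}
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN
{code}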



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664895401



##
File path: clients/src/main/resources/common/message/OffsetFetchRequest.json
##
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a 
time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": 
"groupId",
   "about": "The group to fetch offsets for." },
-{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", 
"nullableVersions": "2+",
+{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": 
"0-7", "nullableVersions": "2-7",
   "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
 "about": "The topic name."},
-  { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+  { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
 "about": "The partition indexes we would like to fetch offsets for." }
 ]},
+{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": 
"8+",

Review comment:
   Sounds good, will change it.

##
File path: clients/src/main/resources/common/message/OffsetFetchResponse.json
##
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition 
level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
 { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
   "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0+", 
+{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0-7",
   "about": "The responses per topic.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
 "about": "The topic name." },
-  { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0+",
+  { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0-7",
 "about": "The responses per partition", "fields": [
-{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+{ "name": "PartitionIndex", "type": "int32", "versions": "0-7",
   "about": "The partition index." },
-{ "name": "CommittedOffset", "type": "int64", "versions": "0+",
+{ "name": "CommittedOffset", "type": "int64", "versions": "0-7",
   "about": "The committed message offset." },
-{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", 
"default": "-1",
+{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", 
"default": "-1",
   "ignorable": true, "about": "The leader epoch." },
-{ "name": "Metadata", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
+{ "name": "Metadata", "type": "string", "versions": "0-7", 
"nullableVersions": "0-7",
   "about": "The partition metadata." },
-{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+{ "name": "ErrorCode", "type": "int16", "versions": "0-7",
   "about": "The error code, or 0 if there was no error." }
   ]}
 ]},
-{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": true,
-  "about": "The top-level error code, or 0 if there was no error." }
+{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", 
"ignorable": true,
+  "about": "The top-level error code, or 0 if there was no error." },
+{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": 
"8+",

Review comment:
   Sounds good, will change it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #10981: Bump trunk to 3.1.0-SNAPSHOT

2021-07-06 Thread GitBox


kkonstantine merged pull request #10981:
URL: https://github.com/apache/kafka/pull/10981


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #10981: Bump trunk to 3.1.0-SNAPSHOT

2021-07-06 Thread GitBox


kkonstantine commented on pull request #10981:
URL: https://github.com/apache/kafka/pull/10981#issuecomment-875092481


   The few failures across builds are not relevant to the changes. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms

2021-07-06 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376037#comment-17376037
 ] 

Badai Aqrandista commented on KAFKA-7760:
-

[~dulvinw] I have created 
[KIP-760|https://cwiki.apache.org/confluence/display/KAFKA/KIP-760%3A+Increase+minimum+value+of+segment.ms+and+segment.bytes]
 and created KAFKA-13040 for it.

I will close this ticket to focus on the work in KIP-760.

> Add broker configuration to set minimum value for segment.bytes and segment.ms
> --
>
> Key: KAFKA-7760
> URL: https://issues.apache.org/jira/browse/KAFKA-7760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Dulvin Witharane
>Priority: Major
>  Labels: kip, newbie
>
> If someone sets segment.bytes or segment.ms at the topic level to a very small 
> value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a 
> very high number of segment files. This can bring down the whole broker by 
> hitting the maximum number of open files (for logs) or the maximum number of 
> mmap-ed files (for indexes).
> To prevent that from happening, I would like to suggest adding two new items 
> to the broker configuration:
>  * min.topic.segment.bytes, defaults to 1048576: The minimum value for 
> segment.bytes. When someone sets topic configuration segment.bytes to a value 
> lower than this, Kafka throws an error INVALID VALUE.
>  * min.topic.segment.ms, defaults to 360: The minimum value for 
> segment.ms. When someone sets topic configuration segment.ms to a value lower 
> than this, Kafka throws an error INVALID VALUE.
> Thanks
> Badai
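
A minimal sketch of the broker-side guard this proposal implies (the method and 
its call site are assumptions; only the config names come from the description 
above):

{code:java}
import org.apache.kafka.common.errors.InvalidConfigurationException;

public final class SegmentConfigGuard {
    private SegmentConfigGuard() { }

    // Rejects topic-level values below the proposed broker-wide minimums.
    public static void validate(long segmentBytes, long minTopicSegmentBytes,
                                long segmentMs, long minTopicSegmentMs) {
        if (segmentBytes < minTopicSegmentBytes) {
            throw new InvalidConfigurationException("segment.bytes=" + segmentBytes
                + " is below min.topic.segment.bytes=" + minTopicSegmentBytes);
        }
        if (segmentMs < minTopicSegmentMs) {
            throw new InvalidConfigurationException("segment.ms=" + segmentMs
                + " is below min.topic.segment.ms=" + minTopicSegmentMs);
        }
    }
}
{code}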



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms

2021-07-06 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista resolved KAFKA-7760.
-
Resolution: Duplicate

> Add broker configuration to set minimum value for segment.bytes and segment.ms
> --
>
> Key: KAFKA-7760
> URL: https://issues.apache.org/jira/browse/KAFKA-7760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Dulvin Witharane
>Priority: Major
>  Labels: kip, newbie
>
> If someone sets segment.bytes or segment.ms at the topic level to a very small 
> value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a 
> very high number of segment files. This can bring down the whole broker by 
> hitting the maximum number of open files (for logs) or the maximum number of 
> mmap-ed files (for indexes).
> To prevent that from happening, I would like to suggest adding two new items 
> to the broker configuration:
>  * min.topic.segment.bytes, defaults to 1048576: The minimum value for 
> segment.bytes. When someone sets topic configuration segment.bytes to a value 
> lower than this, Kafka throws an error INVALID VALUE.
>  * min.topic.segment.ms, defaults to 360: The minimum value for 
> segment.ms. When someone sets topic configuration segment.ms to a value lower 
> than this, Kafka throws an error INVALID VALUE.
> Thanks
> Badai



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe edited a comment on pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-07-06 Thread GitBox


cmccabe edited a comment on pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#issuecomment-875089636


   Merged to trunk and 3.0.
   
   Thanks, @mumrah.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe closed pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-07-06 Thread GitBox


cmccabe closed pull request #10864:
URL: https://github.com/apache/kafka/pull/10864


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-07-06 Thread GitBox


cmccabe commented on pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#issuecomment-875089636


   Merged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664880346



##
File path: clients/src/main/resources/common/message/OffsetFetchResponse.json
##
@@ -30,30 +30,57 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 adds pending offset commit as new error response on partition 
level.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
 { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
   "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0+", 
+{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": 
"0-7",
   "about": "The responses per topic.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
 "about": "The topic name." },
-  { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0+",
+  { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", 
"versions": "0-7",
 "about": "The responses per partition", "fields": [
-{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+{ "name": "PartitionIndex", "type": "int32", "versions": "0-7",
   "about": "The partition index." },
-{ "name": "CommittedOffset", "type": "int64", "versions": "0+",
+{ "name": "CommittedOffset", "type": "int64", "versions": "0-7",
   "about": "The committed message offset." },
-{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", 
"default": "-1",
+{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", 
"default": "-1",
   "ignorable": true, "about": "The leader epoch." },
-{ "name": "Metadata", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
+{ "name": "Metadata", "type": "string", "versions": "0-7", 
"nullableVersions": "0-7",
   "about": "The partition metadata." },
-{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+{ "name": "ErrorCode", "type": "int16", "versions": "0-7",
   "about": "The error code, or 0 if there was no error." }
   ]}
 ]},
-{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": true,
-  "about": "The top-level error code, or 0 if there was no error." }
+{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", 
"ignorable": true,
+  "about": "The top-level error code, or 0 if there was no error." },
+{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": 
"8+",

Review comment:
   As with the request, should we call this `Groups` rather than 
`GroupIds`?

##
File path: clients/src/main/resources/common/message/OffsetFetchRequest.json
##
@@ -31,19 +31,33 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is adding the require stable flag.
-  "validVersions": "0-7",
+  //
+  // Version 8 is adding support for fetching offsets for multiple groups at a 
time
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
-{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": 
"groupId",
   "about": "The group to fetch offsets for." },
-{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", 
"nullableVersions": "2+",
+{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": 
"0-7", "nullableVersions": "2-7",
   "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0-7", "entityType": 
"topicName",
 "about": "The topic name."},
-  { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
+  { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
 "about": "The partition indexes we would like to fetch offsets for." }
 ]},
+{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": 
"8+",

Review comment:
   Should we call this `Groups` rather than `GroupIds` since it is not just 
the group id?

##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -1358,17 +1367,233 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest {
 // note there's 

[jira] [Created] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes

2021-07-06 Thread Badai Aqrandista (Jira)
Badai Aqrandista created KAFKA-13040:


 Summary: Increase minimum value of segment.ms and segment.bytes
 Key: KAFKA-13040
 URL: https://issues.apache.org/jira/browse/KAFKA-13040
 Project: Kafka
  Issue Type: Improvement
Reporter: Badai Aqrandista


Kafka brokers in production often crash with "Too many open files" or "Out of 
memory" errors because some Kafka topics have a large number of segment files 
as a result of a small {{segment.ms}} or {{segment.bytes}}. These two 
configurations can be set by any user who is authorized to create topics or 
modify topic configuration.

To prevent these two configurations from crashing a Kafka broker, they should 
have a sufficiently large minimum value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13000) Improve handling of UnsupportedVersionException in MockClient

2021-07-06 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376022#comment-17376022
 ] 

Kirk True commented on KAFKA-13000:
---

[~mimaison] - is this ticket still valid? Looking at the code in {{trunk}}, I 
don't see anywhere in {{MockClient}} where an {{UnsupportedVersionException}} 
is thrown. I've done some related work in 
[KAFKA-12989|https://issues.apache.org/jira/browse/KAFKA-12989], so I'm hoping 
to work on this too, if it's still an issue. Thanks!

> Improve handling of UnsupportedVersionException in MockClient
> -
>
> Key: KAFKA-13000
> URL: https://issues.apache.org/jira/browse/KAFKA-13000
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Kirk True
>Priority: Major
>
> MockClient handles UnsupportedVersionException slightly differently than 
> NetworkClient. In some cases, it may throw this exception when it should 
> instead always return a ClientResponse.
> Background: https://github.com/apache/kafka/pull/10743#discussion_r655922760



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664873858



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private  Collector toSingleton() {

Review comment:
   You can probably set the list in `OffsetFetchRequestData` directly in 
the test. But let's leave that for a follow-on PR.
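
   For readers of the archive: the `toSingleton()` body is elided in the diff 
above, so here is a plausible reconstruction of such a single-element collector 
(an assumption, not necessarily the PR's exact code):

   ```java
   import java.util.stream.Collector;
   import java.util.stream.Collectors;

   final class SingletonCollector {
       private SingletonCollector() { }

       // Collects a stream that is expected to contain exactly one element.
       static <T> Collector<T, ?, T> toSingleton() {
           return Collectors.collectingAndThen(
               Collectors.toList(),
               list -> {
                   if (list.size() != 1) {
                       throw new IllegalStateException("Expected exactly one element, found " + list.size());
                   }
                   return list.get(0);
               });
       }
   }
   ```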




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13000) Improve handling of UnsupportedVersionException in MockClient

2021-07-06 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-13000:
-

Assignee: Kirk True

> Improve handling of UnsupportedVersionException in MockClient
> -
>
> Key: KAFKA-13000
> URL: https://issues.apache.org/jira/browse/KAFKA-13000
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Kirk True
>Priority: Major
>
> MockClient handles UnsupportedVersionException slightly differently than 
> NetworkClient. In some cases, it may throw this exception when it should 
> instead always return a ClientResponse.
> Background: https://github.com/apache/kafka/pull/10743#discussion_r655922760



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-07-06 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r664870271



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -316,6 +316,11 @@ public void flush() {
 lastFlushedOffset = endOffset().offset;
 }
 
+@Override
+public boolean maybeClean() {
+return false;
+}

Review comment:
   I'm going to address this further in a follow-up PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-06 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376019#comment-17376019
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13008:


Nice find! I agree, this does not seem like the expected behavior, and given 
that it's been causing a test to fail regularly, I think we can assert that this 
should not happen. 

One thing I don't understand, and maybe this is because I don't have much 
context on the incremental fetch internals, is: why would we not get the 
metadata again after the partition is re-assigned in step 5? Surely if a 
partition was removed from the assignment and then added back, this should 
constitute a new 'session', and thus it should get the metadata again on 
assignment? If not, that sounds like a bug in the incremental fetch to me, but 
again, I'm not too familiar with it so there could be a valid reason it works 
this way. 

If so, then maybe we should consider allowing metadata to remain around after a 
partition is unassigned, in case it gets this same partition back within the 
session? Could there be other consequences of this lack of metadata, outside of 
Streams?
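
For readers without the KIP-695 context: the lag gate at the center of this 
issue (see the quoted description below) boils down to roughly the following. 
This is a simplified sketch, not the actual Streams code; {{Consumer#currentLag}} 
is the cached-lag API KIP-695 added.

{code:java}
import java.util.OptionalLong;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class LagGateSketch {
    // True when it is safe to process: the cached lag is known and zero.
    // currentLag() stays empty until a fetch response populates the cache,
    // which is exactly what the re-assignment scenario below can delay.
    static boolean caughtUp(Consumer<?, ?> consumer, TopicPartition partition) {
        OptionalLong lag = consumer.currentLag(partition);
        return lag.isPresent() && lag.getAsLong() == 0;
    }
}
{code}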

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Major
>
> In KIP-695, we improved the task idling mechanism by checking partition lag. 
> It's a good improvement for timestamp sync. But I found it will cause the 
> stream stop processing the data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and figuring out the issue 
> will happen in below situation (or similar situation):
>  # start 2 streams (each with 1 thread) to consume from a topicA (with 3 
> partitions: A-0, A-1, A-2)
>  # After 2 streams started, the partitions assignment are: (I skipped some 
> other processing related partitions for simplicity)
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # start processing some data, assume now, the position and high watermark is:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now, stream3 joined, so trigger rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly, stream3 left, so now, rebalance again, with the step 2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
> (note: after initialization, the state of A-1 will be: position: null, 
> highWM: null)
>  # Now, note that, the partition A-1 used to get assigned to stream1-thread1, 
> and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 
> record per 30 mins), and partition A-0 has fast input (ex: 10K records / 
> sec). So, now, stream1-thread1 won't process any data until we get input 
> from partition A-1 (even if partition A-0 is buffered a lot, and we have 
> `{{max.task.idle.ms}}` set to 0).
>  
> The reason stream1-thread1 won't process any data is that we can't get the 
> lag of partition A-1. And why can't we get the lag? Because:
>  # In KIP-695, we use consumer's cache to get the partition lag, to avoid 
> remote call
>  # The lag for a partition will be cleared if the assignment in this round 
> doesn't have this partition. check 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 will be 
> cleared in step 4, and re-initialized (to null) in step 5
>  # In KIP-227, we introduced a fetch session to have incremental fetch 
> request/response. That is, if the session existed, the client(consumer) will 
> get the update only when the fetched partition have update (ex: new data). 
> So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have update until next 30 mins, or wait for the fetch session 
> become inactive for (default) 2 mins to be evicted. Either case, the metadata 
> won't be updated for a while.
>  
> In KIP-695, if we don't get the partition lag, we can't determine the 
> partition data status to do timestamp sync, so we'll keep waiting and not 
> processing any data. That's why this issue will happen.
>  
> *Proposed solution:*
>  # If we don't get the current lag for a partition, or the current lag > 0, 
> we start to wait for max.task.idle.ms, and reset the deadline when we get the 
> partition lag, as we did previously in KIP-353
>  # Introduce a waiting time config when no partition lag, or 

[GitHub] [kafka] kirktrue commented on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-06 Thread GitBox


kirktrue commented on pull request #10980:
URL: https://github.com/apache/kafka/pull/10980#issuecomment-875058699


   @hachikuji - would you be willing to assign a reviewer for this PR?
   
   The failing tests look like they're KRaft-related, so I don't _think_ 
they're caused by this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on pull request #10951: KAFKA-12841: NPE from the provided metadata in client callback in case of ApiException

2021-07-06 Thread GitBox


kirktrue commented on pull request #10951:
URL: https://github.com/apache/kafka/pull/10951#issuecomment-875058547


   @hachikuji - would you be willing to assign a reviewer for this PR?
   
   The failing tests look like they're KRaft-related, so I don't _think_ 
they're caused by this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #10943: Fix verification of version probing

2021-07-06 Thread GitBox


ableegoldman edited a comment on pull request #10943:
URL: https://github.com/apache/kafka/pull/10943#issuecomment-875052948


   Oh, never mind, I see: I just missed bumping one instance of it in the 
message. In that case this LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10943: Fix verification of version probing

2021-07-06 Thread GitBox


ableegoldman commented on pull request #10943:
URL: https://github.com/apache/kafka/pull/10943#issuecomment-875052948


   Oh, never mind, I see the problem: I just missed bumping one instance of it in 
the message. In that case this LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10943: Fix verification of version probing

2021-07-06 Thread GitBox


ableegoldman commented on pull request #10943:
URL: https://github.com/apache/kafka/pull/10943#issuecomment-875051446


   Wait, I'm confused. I definitely bumped the version number in this test in 
that PR you referenced (scroll down to the end of the changes, it's the last 
file). Did we for some reason bump the protocol version number again since then?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException

2021-07-06 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-12644:

Summary: Add Missing Class-Level Javadoc to Descendants of 
org.apache.kafka.common.errors.ApiException  (was: Add Missing Class-Level 
Javadoc to Decendants of org.apache.kafka.common.errors.ApiException)

> Add Missing Class-Level Javadoc to Descendants of 
> org.apache.kafka.common.errors.ApiException
> -
>
> Key: KAFKA-12644
> URL: https://issues.apache.org/jira/browse/KAFKA-12644
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, documentation
>Affects Versions: 3.0.0, 2.8.1
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: documentation
> Fix For: 3.0.0, 2.8.1
>
>
> I noticed that class-level Javadocs are missing from some classes in the 
> org.apache.kafka.common.errors package. This issue is for tracking the work 
> of adding the missing class-level javadocs for those Exception classes.
> https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html
> https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors
> Basic class-level documentation could be derived by mapping the error 
> conditions documented in the protocol
> https://kafka.apache.org/protocol#protocol_constants
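
For illustration, the kind of class-level Javadoc this issue asks for might 
look like the following sketch for one of the descendants (the wording is 
illustrative, not the text that was ultimately committed):

```java
package org.apache.kafka.common.errors;

/**
 * Indicates that the broker rejected a batch of records because the batch
 * exceeds the configured maximum size (maps to the RECORD_LIST_TOO_LARGE
 * error code in the protocol constants referenced above).
 */
public class RecordBatchTooLargeException extends ApiException {

    public RecordBatchTooLargeException(String message) {
        super(message);
    }
}
```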



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-07-06 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-12994:

Priority: Blocker  (was: Major)

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: kip, kip-633
> Fix For: 3.0.0
>
>
> Due to the API changes for KIP-633, a lot of deprecation warnings have been 
> generated in the many tests that still use the old, deprecated APIs. We should 
> migrate them all to the new APIs and then remove the applicable annotations 
> that suppress the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926
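
For illustration, the migration direction looks roughly like this (a sketch 
using the KIP-633 method names; the window and grace durations are made up):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

public class Kip633MigrationSketch {
    public static void main(String[] args) {
        // Old, deprecated style:
        JoinWindows oldJoin = JoinWindows.of(Duration.ofMinutes(5))
            .grace(Duration.ofMinutes(1));

        // New KIP-633 style, with the grace period stated explicitly:
        JoinWindows newJoin = JoinWindows.ofTimeDifferenceAndGrace(
            Duration.ofMinutes(5), Duration.ofMinutes(1));

        // Same pattern for the other window classes, e.g.:
        TimeWindows windows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
    }
}
```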



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13021) Improve Javadocs for API Changes from KIP-633

2021-07-06 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-13021:

Priority: Blocker  (was: Minor)

> Improve Javadocs for API Changes from KIP-633
> -
>
> Key: KAFKA-13021
> URL: https://issues.apache.org/jira/browse/KAFKA-13021
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Blocker
>
> There are Javadoc changes from the following PR that need to be completed 
> prior to the 3.0 release. This Jira item is to track that work.
> [https://github.com/apache/kafka/pull/10926]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)

2021-07-06 Thread GitBox


cmccabe commented on pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#issuecomment-875047031


   Sorry, I forgot to push earlier. Also fixed an additional issue with default 
quotas.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Decendants of org.apache.kafka.common.errors.ApiException

2021-07-06 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-12644:

Priority: Blocker  (was: Major)

> Add Missing Class-Level Javadoc to Decendants of 
> org.apache.kafka.common.errors.ApiException
> 
>
> Key: KAFKA-12644
> URL: https://issues.apache.org/jira/browse/KAFKA-12644
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, documentation
>Affects Versions: 3.0.0, 2.8.1
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: documentation
> Fix For: 3.0.0, 2.8.1
>
>
> I noticed that class-level Javadocs are missing from some classes in the 
> org.apache.kafka.common.errors package. This issue is for tracking the work 
> of adding the missing class-level javadocs for those Exception classes.
> https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html
> https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors
> Basic class-level documentation could be derived by mapping the error 
> conditions documented in the protocol
> https://kafka.apache.org/protocol#protocol_constants



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12458) Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob Storage)

2021-07-06 Thread Israel Ekpo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375986#comment-17375986
 ] 

Israel Ekpo commented on KAFKA-12458:
-

This issue is not tied to any release since the code will be hosted outside of 
the Kafka repo.

> Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob 
> Storage)
> -
>
> Key: KAFKA-12458
> URL: https://issues.apache.org/jira/browse/KAFKA-12458
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>
> Task to cover integration support for Azure Storage
>  * Azure Blob Storage
>  * Azure Data Lake Store
> Will split task up later into distinct tracks and components



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ryannedolan opened a new pull request #10983: mention IdentityReplicationPolicy in ops docs

2021-07-06 Thread GitBox


ryannedolan opened a new pull request #10983:
URL: https://github.com/apache/kafka/pull/10983


   IdentityReplicationPolicy was added to MM2 in 3.0.0, so we note it in the 
docs.
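
For reference, selecting the policy is a single MM2 setting; a sketch, assuming 
the standard replication.policy.class property:

```
# Keep topic names identical on the target cluster instead of prefixing
# them with the source cluster alias:
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
```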


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3

2021-07-06 Thread GitBox


rondagostino commented on a change in pull request #10918:
URL: https://github.com/apache/kafka/pull/10918#discussion_r664827028



##
File path: gradle/dependencies.gradle
##
@@ -61,6 +61,7 @@ versions += [
   bcpkix: "1.66",
   checkstyle: "8.36.2",
   commonsCli: "1.4",
+  dropwizardMetrics: "3.2.5",

Review comment:
   https://github.com/apache/kafka/pull/10982 contains this change.  Will 
attach system test results there when the tests complete.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10982: MINOR: Update dropwizard metrics to 4.1.12.1

2021-07-06 Thread GitBox


rondagostino opened a new pull request #10982:
URL: https://github.com/apache/kafka/pull/10982


   ZooKeeper was updated to v3.6.3 via 
https://github.com/apache/kafka/pull/10918.  However, it was noted in that PR 
discussion (https://github.com/apache/kafka/pull/10918#discussion_r663412933) 
that the dropwizard metrics-core library has since been updated from v3.2.5 to 
v4.1.12.1 for 3.7.x releases (via 
https://github.com/apache/zookeeper/commit/13fe0d0ffb9fd2c379b9b430aaaf9ee75acfceba).
  Since there were no code changes associated with this library version bump in 
ZooKeeper, and since we wish to avoid potential CVEs if possible, we can 
consider updating to this newer version now assuming that system tests pass. I 
will attach system test results when they complete.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#issuecomment-875021173


   @rajinisivaram Thank you for the reviews, I have updated the PR addressing 
all your comments. Whenever you get a chance, the PR is ready for review - 
thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664819056



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private <T> Collector<T, ?, T> toSingleton() {

Review comment:
   @rajinisivaram I'm not sure how we could test that case, as the `Builder` 
for the `OffsetFetchRequest` takes in a `Map<String, List<TopicPartition>>`, 
which means all of the keys have to be unique. As a result, if a group appears 
twice, the latest entry takes precedence. I'm not sure if this is how we want 
to handle it, but generally speaking I don't think we support the same group 
id appearing twice in the request - it's not possible to construct such a 
request, at least not using the `Builder`.
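
A quick illustration of why the `Builder` input can't carry duplicates, since 
this is plain `java.util.Map` semantics (group and topic names here are made up):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateGroupIdDemo {
    public static void main(String[] args) {
        Map<String, List<String>> groupToPartitions = new HashMap<>();
        groupToPartitions.put("group1", Collections.singletonList("topic-0"));
        // Re-putting the same group id silently replaces the first entry:
        groupToPartitions.put("group1", Collections.singletonList("topic-1"));
        System.out.println(groupToPartitions); // prints {group1=[topic-1]}
    }
}
```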




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664807523



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private <T> Collector<T, ?, T> toSingleton() {

Review comment:
   Ah okay, good point. Let's do this in a follow up PR, since we are 
trying to get this feature into 3.0 right now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine opened a new pull request #10981: Bump trunk to 3.1.0-SNAPSHOT

2021-07-06 Thread GitBox


kkonstantine opened a new pull request #10981:
URL: https://github.com/apache/kafka/pull/10981


   Typical version bumps on `trunk`, after the creation of the `3.0` release 
branch. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue removed a comment on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-06 Thread GitBox


kirktrue removed a comment on pull request #10980:
URL: https://github.com/apache/kafka/pull/10980#issuecomment-874998780


   Can someone from @confluentinc/clients review, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664806203



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private <T> Collector<T, ?, T> toSingleton() {

Review comment:
   Same test class, `OffsetFetchRequestTest.scala`: a test with v8 and a 
batched request where the same group appears twice, perhaps with different 
topics. The response should be either an InvalidRequestException, because we 
want to treat it as an error, OR the actual offsets, because we handle the 
request correctly.
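
Constructing such a request has to bypass the `Builder` and use the generated 
message classes directly, along these lines (a sketch; the class and setter 
names are taken from the diff above, the group id is made up):

```java
import java.util.Arrays;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;

public class DuplicateGroupRequestSketch {
    // Build v8-style request data in which the same group id appears twice,
    // which the Builder's Map-based API cannot express.
    static OffsetFetchRequestData duplicateGroups() {
        return new OffsetFetchRequestData()
            .setGroupIds(Arrays.asList(
                new OffsetFetchRequestGroup().setGroupId("group1"),
                new OffsetFetchRequestGroup().setGroupId("group1")));
    }
}
```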




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664805037



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());

Review comment:
   Ah gotcha, makes sense. For now I have removed the `toSingleton()` helper 
method and instead collect into a list and take its first element.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664804113



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());

Review comment:
   The helper method `toSingleton()` throws IllegalStateException if the 
list size is greater than one. If a request contains the same group twice, it 
can appear twice in the list.
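
A sketch of what such a collector typically looks like (an assumed shape, not 
necessarily the exact code from the PR):

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class SingletonCollector {
    // Collects a stream into its sole element, failing loudly if the
    // stream does not contain exactly one element.
    public static <T> Collector<T, ?, T> toSingleton() {
        return Collectors.collectingAndThen(
            Collectors.toList(),
            list -> {
                if (list.size() != 1) {
                    throw new IllegalStateException(
                        "Expected exactly one element, found " + list.size());
                }
                return list.get(0);
            });
    }
}
```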




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664803616



##
File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
##
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+import java.util
+import java.util.Collections.singletonList
+import scala.jdk.CollectionConverters._
+import java.util.{Optional, Properties}
+
+class OffsetFetchRequestTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 1
+
+  val brokerId: Integer = 0
+  val offset = 15L
+  val leaderEpoch: Optional[Integer] = Optional.of(3)
+  val metadata = "metadata"
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
+properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
+properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
+  }
+
+  @BeforeEach
+  override def setUp(): Unit = {
+doSetup(createOffsetsTopic = false)
+
+TestUtils.createOffsetsTopic(zkClient, servers)
+  }
+
+  @Test
+  def testOffsetFetchRequestLessThanV8(): Unit = {
+val topic = "topic"
+createTopic(topic)
+
+val groupId = "groupId"
+val tpList = singletonList(new TopicPartition(topic, 0))
+val topicOffsets = tpList.asScala.map{
+  tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata))
+}.toMap.asJava
+
+consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+val consumer = createConsumer()
+consumer.assign(tpList)
+consumer.commitSync(topicOffsets)
+consumer.close()
+// testing from version 1 onward since version 0 read offsets from ZK
+for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+  if (version < 8) {
+val request =
+  if (version < 7) {
+new OffsetFetchRequest.Builder(
+  groupId, false, tpList, false)
+  .build(version.asInstanceOf[Short])
+  } else {
+new OffsetFetchRequest.Builder(
+  groupId, false, tpList, true)
+  .build(version.asInstanceOf[Short])
+  }
+val response = connectAndReceive[OffsetFetchResponse](request)
+val topicData = response.data().topics().get(0)
+val partitionData = topicData.partitions().get(0)
+if (version < 3) {
+  assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs())
+}
+assertEquals(Errors.NONE, response.error())
+assertEquals(topic, topicData.name())
+assertEquals(0, partitionData.partitionIndex())
+assertEquals(offset, partitionData.committedOffset())
+if (version >= 5) {
+  // committed leader epoch introduced with V5
+  assertEquals(leaderEpoch.get(), partitionData.committedLeaderEpoch())
+}
+assertEquals(metadata, partitionData.metadata())
+assertEquals(Errors.NONE.code(), partitionData.errorCode())
+  }
+}
+  }
+
+  @Test
+  def testOffsetFetchRequestV8AndAbove(): Unit = {
+val groupOne = "group1"
+val groupTwo = "group2"
+val groupThree = "group3"
+val groupFour = "group4"
+val groupFive = "group5"

Review comment:
   I refactored this test class a bit to make use of more helper methods - 
is there something additional we can do here to make it work on a collection of 
groups? Are you suggesting adding an additional method that can create some 
number of groups that we can use 

[GitHub] [kafka] skaundinya15 commented on pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#issuecomment-875002777


   @kkonstantine This is the PR I am trying to get in time for 3.0, if you will 
allow for extensions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664800832



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private <T> Collector<T, ?, T> toSingleton() {

Review comment:
   Would this be a test for single group requests with v8? I have added 
that case to the existing single group tests for older versions in 
`OffsetFetchRequestTest.scala` and will push those changes shortly. Is there 
another type of test you were thinking of?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


skaundinya15 commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664798862



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -78,26 +85,107 @@ boolean isAllTopicPartitions() {
 return this.data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+boolean requireStable,
+boolean throwOnFetchStableOffsetsUnsupported) {
+super(ApiKeys.OFFSET_FETCH);
+
+List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+for (Entry<String, List<TopicPartition>> entry : 
groupIdToTopicPartitionMap.entrySet()) {
+String groupName = entry.getKey();
+List<TopicPartition> tpList = entry.getValue();
+final List<OffsetFetchRequestTopics> topics;
+if (tpList != null) {
+Map<String, OffsetFetchRequestTopics> 
offsetFetchRequestTopicMap =
+new HashMap<>();
+for (TopicPartition topicPartition : tpList) {
+String topicName = topicPartition.topic();
+OffsetFetchRequestTopics topic = 
offsetFetchRequestTopicMap.getOrDefault(
+topicName, new 
OffsetFetchRequestTopics().setName(topicName));
+
topic.partitionIndexes().add(topicPartition.partition());
+offsetFetchRequestTopicMap.put(topicName, topic);
+}
+topics = new 
ArrayList<>(offsetFetchRequestTopicMap.values());
+} else {
+topics = ALL_TOPIC_PARTITIONS_BATCH;
+}
+groups.add(new OffsetFetchRequestGroup()
+.setGroupId(groupName)
+.setTopics(topics));
+}
+this.data = new OffsetFetchRequestData()
+.setGroupIds(groups)
+.setRequireStable(requireStable);
+this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
+}
+
 @Override
 public OffsetFetchRequest build(short version) {
 if (isAllTopicPartitions() && version < 2) {
 throw new UnsupportedVersionException("The broker only 
supports OffsetFetchRequest " +
 "v" + version + ", but we need v2 or newer to request all 
topic partitions.");
 }
-
+if (data.groupIds().size() > 1 && version < 8) {
+throw new NoBatchedOffsetFetchRequestException("Broker does 
not support"
++ " batching groups for fetch offset request on version " 
+ version);
+}
 if (data.requireStable() && version < 7) {
 if (throwOnFetchStableOffsetsUnsupported) {
 throw new UnsupportedVersionException("Broker unexpectedly 
" +
 "doesn't support requireStable flag on version " + 
version);
 } else {
 log.trace("Fallback the requireStable flag to false as 
broker " +
-  "only supports OffsetFetchRequest version 
{}. Need " +
-  "v7 or newer to enable this feature", 
version);
+"only supports OffsetFetchRequest version {}. Need " +
+"v7 or newer to enable this feature", version);
 
 return new 
OffsetFetchRequest(data.setRequireStable(false), version);
 }
 }
-
+if (version < 8) {
+OffsetFetchRequestData oldDataFormat = null;
+if (!data.groupIds().isEmpty()) {
+OffsetFetchRequestGroup group = data.groupIds().get(0);
+String groupName = group.groupId();
+List<OffsetFetchRequestTopics> topics = group.topics();
+List<OffsetFetchRequestTopic> oldFormatTopics = null;
+if (topics != null) {
+oldFormatTopics = topics
+.stream()
+.map(t ->
+new OffsetFetchRequestTopic()
+.setName(t.name())
+.setPartitionIndexes(t.partitionIndexes()))
+.collect(Collectors.toList());
+}
+oldDataFormat = new OffsetFetchRequestData()
+.setGroupId(groupName)
+.setTopics(oldFormatTopics)
+.setRequireStable(data.requireStable());
+}
+return new OffsetFetchRequest(oldDataFormat == null ? data : 
oldDataFormat, version);
+}
+// version 8 but have used old format of request, convert to 
version 8 of request

Review comment:
   Ah okay, yes that makes sense, will make the change.




-- 
This is an automated message from the Apache Git 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664798710



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -174,6 +315,28 @@ public boolean isAllPartitions() {
 return data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public boolean isAllPartitionsForGroup(String groupId) {
+OffsetFetchRequestGroup group = data
+.groupIds()
+.stream()
+.filter(g -> g.groupId().equals(groupId))
+.collect(toSingleton());
+return group.topics() == ALL_TOPIC_PARTITIONS_BATCH;
+}
+
+// Custom collector to filter a single element
+private <T> Collector<T, ?, T> toSingleton() {

Review comment:
   Can we add a test in the new `kafka.server.OffsetFetchRequestTest`? 
We can do that in a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-06 Thread GitBox


kirktrue commented on pull request #10980:
URL: https://github.com/apache/kafka/pull/10980#issuecomment-874998780


   Can someone from @confluentinc/clients review, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664796679



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() {
 
 @Override
 public void handle(OffsetFetchResponse response, 
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-if (response.hasError()) {
-Errors error = response.error();
-log.debug("Offset fetch failed: {}", error.message());
+Errors responseError = 
response.groupLevelError(rebalanceConfig.groupId);
+if (responseError != Errors.NONE) {
+log.debug("Offset fetch failed: {}", responseError.message());
 
-if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
 // just retry
-future.raise(error);
-} else if (error == Errors.NOT_COORDINATOR) {
+future.raise(responseError);
+} else if (responseError == Errors.NOT_COORDINATOR) {
 // re-discover the coordinator and retry
-markCoordinatorUnknown(error);
-future.raise(error);
-} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+markCoordinatorUnknown(responseError);
+future.raise(responseError);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) 
{
 
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
 } else {
-future.raise(new KafkaException("Unexpected error in fetch 
offset response: " + error.message()));
+future.raise(new KafkaException("Unexpected error in fetch 
offset response: " + responseError.message()));
 }
 return;
 }
 
 Set unauthorizedTopics = null;
-Map<TopicPartition, OffsetAndMetadata> offsets = new 
HashMap<>(response.responseData().size());
+Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData =

Review comment:
   ok, let's leave as is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue closed pull request #10979: WIP

2021-07-06 Thread GitBox


kirktrue closed pull request #10979:
URL: https://github.com/apache/kafka/pull/10979


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664795285



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java
##
@@ -76,62 +73,169 @@ public void testConstructor() {
 }
 
 for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
-OffsetFetchRequest request = builder.build(version);
-assertFalse(request.isAllPartitions());
-assertEquals(groupId, request.groupId());
-assertEquals(partitions, request.partitions());
-
-OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
-assertEquals(Errors.NONE, response.error());
-assertFalse(response.hasError());
-assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
-"Incorrect error count for version " + version);
-
-if (version <= 1) {
-assertEquals(expectedData, response.responseData());
+if (version < 8) {
+builder = new OffsetFetchRequest.Builder(
+group1,
+false,
+partitions,
+false);
+assertFalse(builder.isAllTopicPartitions());
+OffsetFetchRequest request = builder.build(version);
+assertFalse(request.isAllPartitions());
+assertEquals(group1, request.groupId());
+assertEquals(partitions, request.partitions());
+
+OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+assertEquals(Errors.NONE, response.error());
+assertFalse(response.hasError());
+assertEquals(Collections.singletonMap(Errors.NONE, version <= 
(short) 1 ? 3 : 1), response.errorCounts(),
+"Incorrect error count for version " + version);
+
+if (version <= 1) {
+assertEquals(expectedData, response.responseDataV0ToV7());
+}
+
+if (version >= 3) {
+assertEquals(throttleTimeMs, response.throttleTimeMs());
+} else {
+assertEquals(DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs());
+}
+} else {
+builder = new Builder(Collections.singletonMap(group1, 
partitions), false, false);
+OffsetFetchRequest request = builder.build(version);
+Map<String, List<TopicPartition>> groupToPartitionMap =
+request.groupIdsToPartitions();
+Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+request.groupIdsToTopics();
+assertFalse(request.isAllPartitionsForGroup(group1));
+assertTrue(groupToPartitionMap.containsKey(group1) && 
groupToTopicMap.containsKey(
+group1));
+assertEquals(partitions, groupToPartitionMap.get(group1));
+OffsetFetchResponse response = 
request.getErrorResponse(throttleTimeMs, Errors.NONE);
+assertEquals(Errors.NONE, response.groupLevelError(group1));
+assertFalse(response.groupHasError(group1));
+assertEquals(Collections.singletonMap(Errors.NONE, 1), 
response.errorCounts(),
+"Incorrect error count for version " + version);
+assertEquals(throttleTimeMs, response.throttleTimeMs());
 }
+}
+}
+
+@Test
+public void testConstructorWithMultipleGroups() {
+List<TopicPartition> topic1Partitions = Arrays.asList(
+new TopicPartition(topicOne, partitionOne),
+new TopicPartition(topicOne, partitionTwo));
+List<TopicPartition> topic2Partitions = Arrays.asList(
+new TopicPartition(topicTwo, partitionOne),
+new TopicPartition(topicTwo, partitionTwo));
+List<TopicPartition> topic3Partitions = Arrays.asList(
+new TopicPartition(topicThree, partitionOne),
+new TopicPartition(topicThree, partitionTwo));
+Map<String, List<TopicPartition>> groupToTp = new HashMap<>();
+groupToTp.put(group1, topic1Partitions);
+groupToTp.put(group2, topic2Partitions);
+groupToTp.put(group3, topic3Partitions);
+groupToTp.put(group4, null);
+groupToTp.put(group5, null);
+int throttleTimeMs = 10;
 
-if (version >= 3) {
+for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
+if (version >= 8) {
+builder = new Builder(groupToTp, false, false);
+OffsetFetchRequest request = builder.build(version);
+Map<String, List<TopicPartition>> groupToPartitionMap =
+request.groupIdsToPartitions();
+Map<String, List<OffsetFetchRequestTopics>> groupToTopicMap =
+request.groupIdsToTopics();
+assertEquals(groupToTp.keySet(), 

[GitHub] [kafka] kirktrue opened a new pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-06 Thread GitBox


kirktrue opened a new pull request #10980:
URL: https://github.com/apache/kafka/pull/10980


   Handle the case where `matches` returns `false` and throw the 
`IllegalStateException` as stated by the JavaDoc.
   
   We need to guard against this unexpected runtime error in the 
`KafkaAdminClient`'s `sendEligibleCalls` method with a try/catch. Not 100% sure 
if that's kosher or not.
   
   Included a targeted unit test for this case. The remaining tests in 
`KafkaAdminClientTest` continue to pass.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching

2021-07-06 Thread GitBox


rajinisivaram commented on a change in pull request #10962:
URL: https://github.com/apache/kafka/pull/10962#discussion_r664793097



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
##
@@ -78,26 +85,107 @@ boolean isAllTopicPartitions() {
 return this.data.topics() == ALL_TOPIC_PARTITIONS;
 }
 
+public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
+boolean requireStable,
+boolean throwOnFetchStableOffsetsUnsupported) {
+super(ApiKeys.OFFSET_FETCH);
+
+List<OffsetFetchRequestGroup> groups = new ArrayList<>();
+for (Entry<String, List<TopicPartition>> entry : 
groupIdToTopicPartitionMap.entrySet()) {
+String groupName = entry.getKey();
+List<TopicPartition> tpList = entry.getValue();
+final List<OffsetFetchRequestTopics> topics;
+if (tpList != null) {
+Map<String, OffsetFetchRequestTopics> 
offsetFetchRequestTopicMap =
+new HashMap<>();
+for (TopicPartition topicPartition : tpList) {
+String topicName = topicPartition.topic();
+OffsetFetchRequestTopics topic = 
offsetFetchRequestTopicMap.getOrDefault(
+topicName, new 
OffsetFetchRequestTopics().setName(topicName));
+
topic.partitionIndexes().add(topicPartition.partition());
+offsetFetchRequestTopicMap.put(topicName, topic);
+}
+topics = new 
ArrayList<>(offsetFetchRequestTopicMap.values());
+} else {
+topics = ALL_TOPIC_PARTITIONS_BATCH;
+}
+groups.add(new OffsetFetchRequestGroup()
+.setGroupId(groupName)
+.setTopics(topics));
+}
+this.data = new OffsetFetchRequestData()
+.setGroupIds(groups)
+.setRequireStable(requireStable);
+this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
+}
+
 @Override
 public OffsetFetchRequest build(short version) {
 if (isAllTopicPartitions() && version < 2) {
 throw new UnsupportedVersionException("The broker only 
supports OffsetFetchRequest " +
 "v" + version + ", but we need v2 or newer to request all 
topic partitions.");
 }
-
+if (data.groupIds().size() > 1 && version < 8) {
+throw new NoBatchedOffsetFetchRequestException("Broker does 
not support"
++ " batching groups for fetch offset request on version " 
+ version);
+}
 if (data.requireStable() && version < 7) {
 if (throwOnFetchStableOffsetsUnsupported) {
 throw new UnsupportedVersionException("Broker unexpectedly 
" +
 "doesn't support requireStable flag on version " + 
version);
 } else {
 log.trace("Fallback the requireStable flag to false as 
broker " +
-  "only supports OffsetFetchRequest version 
{}. Need " +
-  "v7 or newer to enable this feature", 
version);
+"only supports OffsetFetchRequest version {}. Need " +
+"v7 or newer to enable this feature", version);
 
 return new 
OffsetFetchRequest(data.setRequireStable(false), version);
 }
 }
-
+if (version < 8) {
+OffsetFetchRequestData oldDataFormat = null;
+if (!data.groupIds().isEmpty()) {
+OffsetFetchRequestGroup group = data.groupIds().get(0);
+String groupName = group.groupId();
+List<OffsetFetchRequestTopics> topics = group.topics();
+List<OffsetFetchRequestTopic> oldFormatTopics = null;
+if (topics != null) {
+oldFormatTopics = topics
+.stream()
+.map(t ->
+new OffsetFetchRequestTopic()
+.setName(t.name())
+.setPartitionIndexes(t.partitionIndexes()))
+.collect(Collectors.toList());
+}
+oldDataFormat = new OffsetFetchRequestData()
+.setGroupId(groupName)
+.setTopics(oldFormatTopics)
+.setRequireStable(data.requireStable());
+}
+return new OffsetFetchRequest(oldDataFormat == null ? data : 
oldDataFormat, version);
+}
+// version 8 but have used old format of request, convert to 
version 8 of request

Review comment:
   At the moment, the code seems to do:
   ```
   if (version < 8) {
       do-conversion-if-necessary
   }
   // 

[jira] [Assigned] (KAFKA-12989) MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse

2021-07-06 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-12989:
-

Assignee: Kirk True

> MockClient should respect the request matcher passed to 
> prepareUnsupportedVersionResponse
> -
>
> Key: KAFKA-12989
> URL: https://issues.apache.org/jira/browse/KAFKA-12989
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: Kirk True
>Priority: Major
>
> The {{MockClient}} offers the possibility to prepare an unsupported version 
> response with {{MockClient#prepareUnsupportedVersionResponse}}. That method 
> accepts a {{RequestMatcher}} but it is never applied. It should be or we 
> should remove the matcher from the method.
> {code:java}
> UnsupportedVersionException unsupportedVersionException = null;
> if (futureResp.isUnsupportedRequest) {
> unsupportedVersionException = new UnsupportedVersionException(
> "Api " + request.apiKey() + " with version " + 
> version);
> } else {
> AbstractRequest abstractRequest = 
> request.requestBuilder().build(version);
> if (!futureResp.requestMatcher.matches(abstractRequest))
> throw new IllegalStateException("Request matcher did not 
> match next-in-line request "
> + abstractRequest + " with prepared response " + 
> futureResp.responseBody);
> }
> ClientResponse resp = new 
> ClientResponse(request.makeHeader(version), request.callback(), 
> request.destination(),
> request.createdTimeMs(), time.milliseconds(), 
> futureResp.disconnected,
> unsupportedVersionException, null, 
> futureResp.responseBody);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13035) Kafka Connect: Update documentation for POST /connectors/(string: name)/restart to include task Restart behavior

2021-07-06 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-13035.
---
Fix Version/s: 3.0.0
 Reviewer: Randall Hauch
   Resolution: Fixed

Merged to `trunk` in time for 3.0.0

> Kafka Connect: Update documentation for POST /connectors/(string: 
> name)/restart to include task Restart behavior  
> --
>
> Key: KAFKA-13035
> URL: https://issues.apache.org/jira/browse/KAFKA-13035
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
> Fix For: 3.0.0
>
>
> KAFKA-4793 updated the behavior of POST /connectors/(string: name)/restart 
> based on the query parameters onlyFailed and includeTasks, per 
> [KIP-745|https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks].
>  We should update the documentation to reflect this
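
For reference, the restart call with the new query parameters has this shape 
(the connector name is illustrative):

```
POST /connectors/my-connector/restart?includeTasks=true&onlyFailed=true
```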



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch merged pull request #10975: KAFKA-13035 updated documentation for connector restart REST API to …

2021-07-06 Thread GitBox


rhauch merged pull request #10975:
URL: https://github.com/apache/kafka/pull/10975


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



