[jira] [Created] (KAFKA-16153) kraft_upgrade_test system test is broken

2024-01-17 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16153:
--

 Summary: kraft_upgrade_test system test is broken
 Key: KAFKA-16153
 URL: https://issues.apache.org/jira/browse/KAFKA-16153
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Reporter: Mickael Maison


I get the following failure from all `from_kafka_version` versions:


Command '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
ducker05:9092,ducker06:9092,ducker07:9092 upgrade --metadata 3.8' returned 
non-zero exit status 1. Remote error message: b'SLF4J: Class path contains 
multiple SLF4J bindings.\nSLF4J: Found binding in 
[jar:file:/opt/kafka-dev/tools/build/dependant-libs-2.13.12/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
 Found binding in 
[jar:file:/opt/kafka-dev/trogdor/build/dependant-libs-2.13.12/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
 See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.\nSLF4J: Actual binding is of type 
[org.slf4j.impl.Reload4jLoggerFactory]\nUnsupported metadata version 3.8. 
Supported metadata versions are 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 
3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 
3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0\n'
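
Judging from the message, the tool was given the shorthand "3.8", while this 
dev build only recognizes the fully qualified feature levels it lists. 
Presumably (untested) an explicit level from that list would pass, e.g.:

{noformat}
/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server ducker05:9092,ducker06:9092,ducker07:9092 upgrade --metadata 3.8-IV0
{noformat}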



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]

2024-01-17 Thread via GitHub


mimaison commented on code in PR #15159:
URL: https://github.com/apache/kafka/pull/15159#discussion_r1454953584


##
checkstyle/import-control-core.xml:
##
@@ -94,6 +94,7 @@
   
 
 
+

Review Comment:
   I think we can remove that line too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]

2024-01-17 Thread via GitHub


satishd closed pull request #15209: MINOR Removed unused CommittedOffsetsFile 
class.
URL: https://github.com/apache/kafka/pull/15209


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


AndrewJSchofield commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455084554


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
 assertPoll(1, commitRequestManger);
 }
 
+@Test

Review Comment:
   I think they should. Looks to me like the legacy consumer called the 
interceptors when closing the consumer coordinator 
(`ConsumerCoordinator.maybeAutoCommitOffsetsSync`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-4759) Add support for subnet masks in SimpleACLAuthorizer

2024-01-17 Thread Szymon Scharmach (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807680#comment-17807680
 ] 

Szymon Scharmach commented on KAFKA-4759:
-

Updated PR is ready: [https://github.com/apache/kafka/pull/9937]

Some tests are failing, but the failures seem related to other issues that are 
also present on trunk.

> Add support for subnet masks in SimpleACLAuthorizer
> ---
>
> Key: KAFKA-4759
> URL: https://issues.apache.org/jira/browse/KAFKA-4759
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Shun Takebayashi
>Assignee: Shun Takebayashi
>Priority: Major
>
> SimpleACLAuthorizer currently accepts only single IP addresses.
> Supporting subnet masks with SimpleACLAuthorizer can make ACL configurations 
> simpler.
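
For illustration only, a minimal sketch of the CIDR-style host matching such a 
change implies (hypothetical helper, not the actual PR code):

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

// Match a client IP against an ACL host entry that is either a single
// address ("10.0.0.5") or a CIDR range ("10.0.0.0/24").
public class CidrMatchSketch {
    public static boolean matches(String aclHost, String clientIp) throws UnknownHostException {
        if (!aclHost.contains("/")) {
            return InetAddress.getByName(aclHost).equals(InetAddress.getByName(clientIp));
        }
        String[] parts = aclHost.split("/", 2);
        byte[] network = InetAddress.getByName(parts[0]).getAddress();
        byte[] client = InetAddress.getByName(clientIp).getAddress();
        int prefixLen = Integer.parseInt(parts[1]);
        if (network.length != client.length) {
            return false; // IPv4 and IPv6 entries never match each other
        }
        int fullBytes = prefixLen / 8;
        int remainderBits = prefixLen % 8;
        // All whole bytes covered by the prefix must be identical.
        if (!Arrays.equals(Arrays.copyOf(network, fullBytes), Arrays.copyOf(client, fullBytes))) {
            return false;
        }
        if (remainderBits == 0) {
            return true;
        }
        // Compare only the remaining high-order bits of the next byte.
        int mask = (0xFF << (8 - remainderBits)) & 0xFF;
        return (network[fullBytes] & mask) == (client[fullBytes] & mask);
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(matches("10.0.0.0/24", "10.0.0.17")); // true
        System.out.println(matches("10.0.0.0/24", "10.0.1.17")); // false
    }
}
{code}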



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-17 Thread via GitHub


DL1231 opened a new pull request, #15211:
URL: https://github.com/apache/kafka/pull/15211

   While using --list --state, the currently accepted values correspond to the 
classic group type states.
   This PR includes support for the new group type states.
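
   For context, a hedged sketch of the equivalent Admin API call the tool 
builds on, assuming a broker at localhost:9092 (ASSIGNING and RECONCILING are 
among the new consumer-protocol states):

   ```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.common.ConsumerGroupState;

import java.util.EnumSet;
import java.util.Properties;

// List consumer groups filtered by state, including new-protocol states.
public class ListGroupsByState {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        try (Admin admin = Admin.create(props)) {
            ListConsumerGroupsResult result = admin.listConsumerGroups(
                new ListConsumerGroupsOptions().inStates(
                    EnumSet.of(ConsumerGroupState.ASSIGNING, ConsumerGroupState.RECONCILING)));
            result.all().get().forEach(listing ->
                System.out.println(listing.groupId() + " " + listing.state()));
        }
    }
}
   ```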
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-17 Thread via GitHub


DL1231 commented on PR #15211:
URL: https://github.com/apache/kafka/pull/15211#issuecomment-1895572781

   @rreddy-22 PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16147) Partition is assigned to two members at the same time

2024-01-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-16147:
---

Assignee: David Jacot

> Partition is assigned to two members at the same time
> -
>
> Key: KAFKA-16147
> URL: https://issues.apache.org/jira/browse/KAFKA-16147
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Emanuele Sabellico
>Assignee: David Jacot
>Priority: Major
> Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, 
> server.properties, server1.properties, server2.properties
>
>
> While running [test 0113 of 
> librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384],
>  subtest _u_multiple_subscription_changes_ has received this error, saying 
> that a partition is assigned to two members at the same time.
> {code:java}
> Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] 
> which is already assigned to consumer C_5#consumer-8 {code}
> I've reconstructed this sequence:
> C_5 SUBSCRIBES TO T1
> {noformat}
> %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id 
> "(null)", current assignment "", subscribe topics 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat}
> C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12
> {noformat}
> [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, 
> targetMemberEpoch=6, state=assigning, assignedPartitions={}, 
> partitionsPendingRevocation={}, 
> partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat}
>  
> C_5 RECEIVES TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 ACKS TARGET ASSIGNMENT
> {noformat}
> %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment 
> "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", 
> subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat response received target assignment 
> "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], 
> (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat}
>  
> C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are 
> pending 
> {noformat}
> %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: 
> Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id 
> "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id 
> "NULL", current assignment "NULL", subscribe topics 
> "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"
> [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated 
> its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, 
> rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
> (org.apache.kafka.coordinator.group.GroupMetadataManager)
> [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 
> topic=__consumer_offsets partition=7] [GroupId 
> rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw 
> transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, 
> partitionsPendingRevocation={}, partitionsPendingAssignment={}) to 
> CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, 
> targetMemberEpoch=16, state=revoking, assignedPartitions={}, 
> partitionsPendingRevocation={IKXGrFR1

[PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-17 Thread via GitHub


dajac opened a new pull request, #15212:
URL: https://github.com/apache/kafka/pull/15212

   We had a case where a partition got assigned to two members, and we found a 
bug in the partition epoch bookkeeping. Basically, when a member has a 
partition pending revocation re-assigned to it before the revocation is 
completed, the partition epoch is lost. Here is an example of such a transition 
(a toy illustration of the bookkeeping hazard follows the log below):
   
   ```
   [2024-01-16 12:10:52,613] INFO [GroupCoordinator id=1 
topic=__consumer_offsets partition=7] [GroupId 
rdkafkatest_rnd53b4eb0c2de343_0113u] Member M2 transitioned from 
CurrentAssignment(memberEpoch=11, previousMemberEpoch=9, targetMemberEpoch=14, 
state=revoking, assignedPartitions={}, 
partitionsPendingRevocation={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, 
partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[0, 5]}) to 
CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, targetMemberEpoch=15, 
state=stable, assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 
7]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager)
   ```
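
   A toy illustration of the bookkeeping hazard (made-up structures, not the 
coordinator's real code): the ownership record for a partition must be 
re-recorded when the partition is re-assigned before its revocation completes.

   ```java
import java.util.HashMap;
import java.util.Map;

public class PartitionEpochSketch {
    public static void main(String[] args) {
        // partition -> member epoch that owns it
        Map<Integer, Integer> partitionEpochs = new HashMap<>();
        partitionEpochs.put(0, 11);   // partition 0 owned at member epoch 11

        // Revocation starts: the coordinator releases the epoch entry.
        partitionEpochs.remove(0);

        // Buggy path: the partition is handed straight back to the member as
        // "assigned" without re-recording its epoch, so the record is lost.
        System.out.println("epoch after re-assign (buggy): " + partitionEpochs.get(0)); // null

        // Fixed path: re-assigning restores the bookkeeping entry.
        partitionEpochs.put(0, 15);   // the member's new epoch
        System.out.println("epoch after re-assign (fixed): " + partitionEpochs.get(0)); // 15
    }
}
   ```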
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-01-17 Thread via GitHub


DL1231 commented on PR #15067:
URL: https://github.com/apache/kafka/pull/15067#issuecomment-1895664514

   @dajac, PTAL, thanks in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16154) Make broker changes to return an offset for LATEST_TIERED_TIMESTAMP

2024-01-17 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-16154:
-

 Summary: Make broker changes to return an offset for 
LATEST_TIERED_TIMESTAMP
 Key: KAFKA-16154
 URL: https://issues.apache.org/jira/browse/KAFKA-16154
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov
Assignee: Christo Lolov
 Fix For: 3.8.0


A broker should start returning offsets when given a timestamp of -5, which 
signifies LATEST_TIERED_TIMESTAMP.

There are three cases; a minimal sketch of the dispatch follows below.

Tiered Storage is not enabled. In this situation, asking for 
LATEST_TIERED_TIMESTAMP should always return no offset.

Tiered Storage is enabled and there is nothing in remote storage. In this 
situation, the offset returned should be 0.

Tiered Storage is enabled and there is something in remote storage. In this 
situation, the offset returned should be the highest offset the broker is aware 
of.
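
A minimal sketch of that dispatch, with illustrative names only (the real 
change lands in the broker's ListOffsets handling, not in a class like this):

{code:java}
import java.util.Optional;

// Illustrative sketch of the three LATEST_TIERED_TIMESTAMP (-5) cases above.
// "highestTieredOffset" stands in for whatever the broker derives from remote
// log segment metadata; none of these names come from the actual implementation.
class LatestTieredTimestampSketch {
    static Optional<Long> resolve(boolean tieredStorageEnabled, Optional<Long> highestTieredOffset) {
        if (!tieredStorageEnabled) {
            return Optional.empty();          // case 1: tiered storage disabled -> no offset
        }
        if (!highestTieredOffset.isPresent()) {
            return Optional.of(0L);           // case 2: nothing tiered yet -> offset 0
        }
        return highestTieredOffset;           // case 3: highest tiered offset known to the broker
    }
}
{code}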



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-17 Thread via GitHub


clolov opened a new pull request, #15213:
URL: https://github.com/apache/kafka/pull/15213

   ### Summary
   
   This is the first part of the implementation of 
[KIP-1005](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1005%3A+Expose+EarliestLocalOffset+and+TieredOffset)
   
   The purpose of this pull request is for the broker to start returning the 
correct offset when it receives -5 as the timestamp in a ListOffsets API 
request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-17 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1455525955


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
     val curLocalLogStartOffset = localLogStartOffset()
 
-    val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+    val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
       val epoch = cache.epochForOffset(curLocalLogStartOffset)
-      if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+      if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
     })
 
-    val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-      Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-    else Optional.empty[Integer]()
-

Review Comment:
   I didn't really see a point in the check 
`earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset`. As 
far as I can tell, `cache.epochForOffset` already performs it. Let me know in 
case I have misunderstood something.



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -2126,6 +2126,94 @@ class UnifiedLogTest {
   log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, 
Some(remoteLogManager)))
   }
 
+  @Test
+  def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+  }
+
+  @Test
+  def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {

Review Comment:
   This test could be combined with 
`testFetchOffsetByTimestampFromRemoteStorage`, as the only differences are 
lines 2167, 2193, 2203 and 2204. Let me know your thoughts!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16155) Investigate testAutoCommitIntercept

2024-01-17 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16155:
--

 Summary: Investigate testAutoCommitIntercept
 Key: KAFKA-16155
 URL: https://issues.apache.org/jira/browse/KAFKA-16155
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Lucas Brutschy


Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
flakes on the initial setup (before using interceptors, so interceptors are 
unrelated here, except for being used later in the test).

The problem is that we are seeking two topic partitions to offset 10 and 20, 
respectively, but when we commit, we seem to have lost one of the offsets, 
likely due to a race condition. 

When I output `subscriptionState.allConsumed` repeatedly, I get this output:

 
{code:java}
allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}}
seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: 
null)], epoch=0}}
seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: 
null)], epoch=0}}
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, 
metadata=''}}
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}}
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}}
autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}}
{code}
So after we seek to 10 / 20, we lose one of the offsets, maybe because we 
haven't reconciled the assignment yet. Later, we get the second topic partition 
assigned, but the offset is initialized to 0.

We should investigate whether this can be made more like the behavior in the 
original consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16155) Investigate testAutoCommitIntercept

2024-01-17 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16155:
---
Description: 
Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
flakes on the initial setup (before using interceptors, so interceptors are 
unrelated here, except for being used later in the test).

The problem is that we are seeking two topic partitions to offset 10 and 20, 
respectively, but when we commit, we seem to have lost one of the offsets, 
likely due to a race condition. 

When I output `subscriptionState.allConsumed` repeatedly, I get this output:
{noformat}
allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}} 
seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: 
null)], epoch=0}} 
seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: 
null)], epoch=0}} 
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, 
metadata=''}} 
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}} 
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}}
autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}}

{noformat}
So after we seek to 10 / 20, we lose one of the offsets, maybe because we 
haven't reconciled the assignment yet. Later, we get the second topic partition 
assigned, but the offset is initialized to 0.

We should investigate whether this can be made more like the behavior in the 
original consumer.

  was:
Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
flakes on the the initial setup (before using interceptors, so interceptors are 
unrelated here, except for being used later in the test).

The problem is that we are seeking two topic partitions to offset 10 and 20, 
respectively, but when we commit, we seem to have lost one of the offsets, 
likely due to a race condition. 

When I output `subscriptionState.allConsumed` repeatedly, I get this output:

 
{code:java}
allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
metadata=''}}
seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: 
null)], epoch=0}}
seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: 
null)], epoch=0}}
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, 
metadata=''}} 
allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
metadata=''}} allConsumed: {topic-0=OffsetAndMetadata{offset=10, 
leaderEpoch=null, metadata=''}, topic-1=OffsetAndMetadata{offset=0, 
leaderEpoch=null, metadata=''}} autocommit start 
{topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}, 
topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}
{code}
So we after we seek to 10 / 20, we lose one of the offsets, maybe because we 
haven't reconciled the assignment yet. Later, we get the second topic partition 
assigned, but the offset is initialized to 0.

We should investigate whether this can be made more like the behavior in the 
original consumer.


> Investigate testAutoCommitIntercept
> ---
>
> Key: KAFKA-16155
> URL: https://issues.apache.org/jira/browse/KAFKA-16155
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
> flakes on the initial setup (before using interceptors, so interceptors 
> are unrelated here, except for being used later in the test).
> The problem is that we are seeking two topic partitions to offset 10 and 20, 
> respectively, but when we commit, we seem to have lost one of the offsets, 
> likely due to a race condition. 
> When I output `subscriptionState.allConsumed` repeatedly, I get this output:
> {noformat}
> allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}} 
> seeking to

Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]

2024-01-17 Thread via GitHub


OmniaGM commented on code in PR #15159:
URL: https://github.com/apache/kafka/pull/15159#discussion_r1455645909


##
checkstyle/import-control-core.xml:
##
@@ -94,6 +94,7 @@
   
 
 
+

Review Comment:
   remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455660640


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
 assertPoll(1, commitRequestManger);
 }
 
+@Test

Review Comment:
   Yes, they should. This is tested in the integration test 
`PlaintextConsumer.testAutoCommitIntercept` - which works in terms of 
interceptors, but I have to keep it disabled in this PR because of KAFKA-16155. 
This PR does call the interceptors after closing the network thread (I pinged 
you about it above). I can add a little unit test to `AsyncKafkaConsumerTest`. 
I don't think we can add a unit test for it in `CommitRequestManagerTest`, 
because the autocommit on close is triggered from the application thread, and 
the interceptors are run from the application thread; a minimal illustration of 
that handoff is sketched below.
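
   Purely as illustration (not Kafka code), the handoff pattern described 
above: the background thread only enqueues callback work, and the application 
thread drains and runs it on poll()/commit()/close(), so user interceptors 
always execute on the application thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal, self-contained demo of the handoff (names are made up for the demo).
class CallbackHandoffDemo {
    private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

    // Called from the background (network) thread: enqueue only, never run here.
    void submit(Runnable interceptorTask) {
        callbackQueue.add(interceptorTask);
    }

    // Called from the application thread: drain and run everything queued so far.
    void drain() {
        Runnable task;
        while ((task = callbackQueue.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CallbackHandoffDemo demo = new CallbackHandoffDemo();
        Thread background = new Thread(() ->
            demo.submit(() -> System.out.println("onCommit runs on " + Thread.currentThread().getName())));
        background.start();
        background.join();
        demo.drain(); // prints "onCommit runs on main"
    }
}
```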



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455670090


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -93,9 +94,11 @@ public CommitRequestManager(
 final SubscriptionState subscriptions,
 final ConsumerConfig config,
 final CoordinatorRequestManager coordinatorRequestManager,
+final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,

Review Comment:
   This is very mixed across the clients codebase: sometimes the final is 
there, sometimes it isn't. In the streams module there is a strict rule to use 
it. Not sure, but as long as there is no guideline around this, and we are not 
completely repulsed by it, I'd suggest just sticking with whatever the existing 
code does, for consistency and to avoid messing up git blame too much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455660640


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
 assertPoll(1, commitRequestManger);
 }
 
+@Test

Review Comment:
   Yes, they should. This is tested in the integration test 
`PlaintextConsumer.testAutoCommitIntercept` - which works in terms of 
interceptors, but I have to keep it disabled in this PR because of KAFKA-16155. 
This PR does call the interceptors after closing the network thread (I pinged 
you about it above). I can add a little unit test to `AsyncKafkaConsumerTest`. 
I don't think we can add a unit test for it in `CommitRequestManagerTest`, 
because the autocommit on close is triggered from the application thread, and 
the interceptors are run from the application thread, so in this class it does 
look very much like any normal commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455660640


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
 assertPoll(1, commitRequestManger);
 }
 
+@Test

Review Comment:
   Yes, they should. This is tested in the integration test 
`PlaintextConsumer.testAutoCommitIntercept` - which works in terms of 
interceptors, but I have to keep it disabled in this PR because of KAFKA-16155. 
This PR does call the interceptors after closing the network thread (I pinged 
you about it above). I can add a little unit test to `AsyncKafkaConsumerTest`. 
I don't think we can add a unit test for it in `CommitRequestManagerTest`, 
because the autocommit on close is triggered from the application thread, so in 
this class it does look very much like any normal commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-17 Thread via GitHub


pprovenzano commented on code in PR #15197:
URL: https://github.com/apache/kafka/pull/15197#discussion_r1455684322


##
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##
@@ -46,6 +46,18 @@ public final class FeaturesImage {
 ZkMigrationState.NONE
 );
 
+public static final FeaturesImage LATEST = new FeaturesImage(

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16097: Disable state updater in trunk (#15146) [kafka]

2024-01-17 Thread via GitHub


lucasbru merged PR #15204:
URL: https://github.com/apache/kafka/pull/15204


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16156) System test failing on endOffsets with negative timestamps

2024-01-17 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16156:
--

 Summary: System test failing on endOffsets with negative timestamps
 Key: KAFKA-16156
 URL: https://issues.apache.org/jira/browse/KAFKA-16156
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Reporter: Lianet Magrans


TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
negative timestamp".
Trace:
[2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
loop (org.apache.kafka.tools.TransactionalMessageCopier)
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Invalid negative timestamp
at 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
at 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
at 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
at 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
at 
org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
at 
org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
at 
org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
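
For context, a minimal self-contained sketch of the failure mode (the guard 
mirrors the exception text above; method name and values are illustrative, not 
the actual client code):

{code:java}
// The broker legitimately answers an end-offsets lookup with timestamp = -1;
// building an OffsetAndTimestamp-like result from it then trips this guard.
public class NegativeTimestampSketch {
    static void buildOffsetsForTimesResult(long fetchedOffset, long fetchedTimestamp) {
        if (fetchedTimestamp < 0) {
            throw new IllegalArgumentException("Invalid negative timestamp");
        }
        System.out.println("offset=" + fetchedOffset + ", timestamp=" + fetchedTimestamp);
    }

    public static void main(String[] args) {
        buildOffsetsForTimesResult(42804, -1); // reproduces the IllegalArgumentException
    }
}
{code}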



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16156) System test failing on endOffsets with negative timestamps

2024-01-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16156:
---
Description: 
TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
negative timestamp".

Trace:
[2024-01-15 07:42:33,932] TRACE [Consumer 
clientId=consumer-transactions-test-consumer-group-1, 
groupId=transactions-test-consumer-group] Received ListOffsetResponse 
ListOffsetsResponseData(throttleTimeMs=0, 
topics=[ListOffsetsTopicResponse(name='input-topic', 
partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from broker 
worker2:9092 (id: 2 rack: null) 
(org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
[2024-01-15 07:42:33,932] DEBUG [Consumer 
clientId=consumer-transactions-test-consumer-group-1, 
groupId=transactions-test-consumer-group] Handling ListOffsetResponse response 
for input-topic-0. Fetched offset 42804, timestamp -1 
(org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
[2024-01-15 07:42:33,932] TRACE [Consumer 
clientId=consumer-transactions-test-consumer-group-1, 
groupId=transactions-test-consumer-group] Updating last stable offset for 
partition input-topic-0 to 42804 
(org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
[2024-01-15 07:42:33,933] DEBUG [Consumer 
clientId=consumer-transactions-test-consumer-group-1, 
groupId=transactions-test-consumer-group] Fetch offsets completed successfully 
for partitions and timestamps {input-topic-0=-1}. Result 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
 (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
[2024-01-15 07:42:33,933] TRACE [Consumer 
clientId=consumer-transactions-test-consumer-group-1, 
groupId=transactions-test-consumer-group] No events to process 
(org.apache.kafka.clients.consumer.internals.events.EventProcessor)
[2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
loop (org.apache.kafka.tools.TransactionalMessageCopier)
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Invalid negative timestamp
at 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
at 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
at 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
at 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
at 
org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
at 
org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
at 
org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
at 
org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
at 
org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetsRequests$3(OffsetsRequestManager.java:305)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.kafka.clients.consumer.internals.OffsetsRequestManager$MultiNodeRequest.addPartialResult(OffsetsRequestMana

[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16156:
---
Summary: System test failing for new consumer on endOffsets with negative 
timestamps  (was: System test failing on endOffsets with negative timestamps)

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Lianet Magrans
>Priority: Major
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.complete(Comple

Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455787900


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -844,6 +849,54 @@ public void testWakeupCommitted() {
 assertNull(consumer.wakeupTrigger().getPendingTask());
 }
 
+@Test

Review Comment:
   Added a test. Interceptors are called.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Utility class that helps the application thread invoke user-registered {@link OffsetCommitCallback}s and
+ * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}s. This is
+ * achieved by having the background thread register an {@link OffsetCommitCallbackTask} with the invoker upon
+ * future completion, and executing the callbacks when the user polls/commits/closes the consumer.
+ */
+public class OffsetCommitCallbackInvoker {
+    private final ConsumerInterceptors<?, ?> interceptors;
+
+    OffsetCommitCallbackInvoker(ConsumerInterceptors<?, ?> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    // Thread-safe queue to store user-defined callbacks and interceptors to be executed
+    private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>();
+
+    public void submitCommitInterceptors(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (!interceptors.isEmpty()) {
+            callbackQueue.add(new OffsetCommitCallbackTask(
+                (innerOffsets, exception) -> interceptors.onCommit(innerOffsets),
+                offsets,
+                null
+            ));
+        }
+    }
+
+    public void submitUserCallback(final OffsetCommitCallback callback,
+                                   final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                   final Exception exception) {
+        callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception));
+    }
+
+    /**
+     * @return true if an offset commit was fenced.
+     */
+    public boolean executeCallbacks() {

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -93,9 +94,11 @@ public CommitRequestManager(
 final SubscriptionState subscriptions,
 final ConsumerConfig config,
 final CoordinatorRequestManager coordinatorRequestManager,
+final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
 final String groupId,
 final Optional<String> groupInstanceId) {
-    this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId,
+    this(time, logContext, subscriptions, config, coordinatorRequestManager,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1455790243


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1902,65 +1912,14 @@ private void maybeThrowFencedInstanceException() {
 }
 
 private void maybeInvokeCommitCallbacks() {
-if (callbacks() > 0) {
-invoker.executeCallbacks();
+if (offsetCommitCallbackInvoker.executeCallbacks()) {

Review Comment:
   Good point, done. I could actually simplify the code by moving `isFenced` 
inside the invoker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807767#comment-17807767
 ] 

Lianet Magrans commented on KAFKA-16134:


Might be related

> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> --
>
> Key: KAFKA-16134
> URL: https://issues.apache.org/jira/browse/KAFKA-16134
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following test is very flaky. It failed 3 times consecutively in Jenkins 
> runs for the 3.7 release candidate.
> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807767#comment-17807767
 ] 

Lianet Magrans edited comment on KAFKA-16134 at 1/17/24 3:21 PM:
-

Might be related/due to https://issues.apache.org/jira/browse/KAFKA-16107


was (Author: JIRAUSER300183):
Might be related

> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> --
>
> Key: KAFKA-16134
> URL: https://issues.apache.org/jira/browse/KAFKA-16134
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following test is very flaky. It failed 3 times consecutively in Jenkins 
> runs for the 3.7 release candidate.
> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe

2024-01-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807772#comment-17807772
 ] 

Lianet Magrans commented on KAFKA-16150:


[~kirktrue] this seems the same as 
https://issues.apache.org/jira/browse/KAFKA-16134 reported as part of the 3.7 
RC validation.

> Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
> 
>
> Key: KAFKA-16150
> URL: https://issues.apache.org/jira/browse/KAFKA-16150
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe

2024-01-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807773#comment-17807773
 ] 

Lianet Magrans commented on KAFKA-16151:


[~kirktrue] isn't this the same as 
https://issues.apache.org/jira/browse/KAFKA-16135?

> Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
> -
>
> Key: KAFKA-16151
> URL: https://issues.apache.org/jira/browse/KAFKA-16151
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16137: Add missing RPC field descriptions [kafka]

2024-01-17 Thread via GitHub


AndrewJSchofield opened a new pull request, #15214:
URL: https://github.com/apache/kafka/pull/15214

   The `ListClientMetricsResourcesResponse` definition is missing several 
`"about"` descriptions. The main effect is that the generated Kafka protocol 
documentation is missing the descriptions of these fields, which are left 
blank. An illustrative example of the `about` convention follows below.
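
   For context, every field in these RPC JSON definitions carries an `about` 
string that feeds the generated protocol documentation. An illustrative (not 
verbatim) entry:

   ```json
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
  "about": "The error code, or 0 if there was no error." }
   ```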


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15538) Client support for java regex based subscription

2024-01-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15538:
---
Description: 
When using subscribe with a java regex (Pattern), we need to resolve it on the 
client side to send the broker a list of topic names to subscribe to.

Context:

The new consumer group protocol uses [Google 
RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
new methods in the consumer API to subscribe using a `SubscribePattern`. 
Subscribing with a java `Pattern` will still be supported for a while but will 
eventually be removed.
 * When the subscribe with SubscriptionPattern is used, the client should just 
send the regex to the broker and it will be resolved on the server side.
 * In the case of the subscribe with Pattern, the regex should be resolved on 
the client side.

As part of this task, we should re-enable all integration tests defined in the 
PlainTextAsyncConsumer that relate to subscription with pattern and that are 
currently disabled for the new consumer + new protocol

  was:
When using subscribe with a java regex (Pattern), we need to resolve it on the 
client side to send the broker a list of topic names to subscribe to.

Context:

The new consumer group protocol uses [Google 
RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
Subscribing with a java `Pattern` will still be supported for a while but 
eventually removed.
 * When the subscribe with SubscriptionPattern is used, the client should just 
send the regex to the broker and it will be resolved on the server side.
 * In the case of the subscribe with Pattern, the regex should be resolved on 
the client side.


> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> eventually removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-01-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807783#comment-17807783
 ] 

Lianet Magrans commented on KAFKA-15538:


Hey [~phuctran], you're right that the intention is that this section 
[here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537]
 where the regex is sent in the HB request, should apply only to the 
SubscriptionPattern, but that's not what the current implementation achieves. 
With the current code, it would apply the section also to the Pattern, because 
the check is done based on #hasPatternSubscription (the moment we call 
[subscribe(Pattern..)|https://github.com/apache/kafka/blob/26465c64092868c972e2a0e4d9a4fc0ed13a7a39/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1719]
 the subscription state will have subscription type AUTO_PATTERN so 
#hasPatternSubscription will be true). 
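A minimal sketch of the missing distinction (AUTO_PATTERN mirrors the existing 
subscription type; the RE2J variant and the method are invented purely for 
illustration, this is not the actual SubscriptionState code):

{code:java}
// Hypothetical sketch: subscribe(Pattern) and subscribe(SubscriptionPattern)
// both surface as "pattern" subscriptions today, so a check based only on
// hasPatternSubscription() cannot tell them apart in the heartbeat path.
enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, AUTO_PATTERN_RE2J }

final class HeartbeatRegexCheck {
    // Only the RE2/J-based SubscriptionPattern should be sent to the broker;
    // a plain java.util.regex.Pattern must be resolved on the client instead.
    static boolean sendRegexInHeartbeat(SubscriptionType type) {
        return type == SubscriptionType.AUTO_PATTERN_RE2J;
    }
}
{code}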

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> eventually removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer [kafka]

2024-01-17 Thread via GitHub


kirktrue closed pull request #15206: KAFKA-16104: Enable additional 
PlaintextConsumerTest tests for new consumer
URL: https://github.com/apache/kafka/pull/15206


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer [kafka]

2024-01-17 Thread via GitHub


kirktrue commented on PR #15206:
URL: https://github.com/apache/kafka/pull/15206#issuecomment-1896159822

   Closing and reopening to kick off another test run


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-17 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16157:
-

 Summary: Topic recreation with offline disk doesn't update 
leadership/shrink ISR correctly
 Key: KAFKA-16157
 URL: https://issues.apache.org/jira/browse/KAFKA-16157
 Project: Kafka
  Issue Type: Bug
  Components: jbod, kraft
Affects Versions: 3.7.1
Reporter: Gaurav Narula


In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` in 
each broker, we perform the following operations:

 
 # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the topic 
was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
 # Start a producer in the background to produce to `foo.test`.
 # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
read-only.
 # Delete topic `foo.test`
 # Recreate topic `foo.test`. Let's assume the topic was created with id 
`bgdrsv-1QjCLFEqLOzVCHg`.
 # Wait for 5 minutes
 # Describe the recreated topic `foo.test`.
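A condensed shell sketch of these steps (broker addresses, log-dir paths and 
the perf-test producer are illustrative choices):

{code:java}
# 1. create the topic
bin/kafka-topics.sh --bootstrap-server broker-1:9092 --create \
  --topic foo.test --partitions 10 --replication-factor 4
# 2. produce in the background
bin/kafka-producer-perf-test.sh --topic foo.test --num-records 10000000 \
  --record-size 100 --throughput 1000 \
  --producer-props bootstrap.servers=broker-1:9092 &
# 3. simulate a broken disk on broker-1 by making its log dir read-only
chmod -R a-w /var/kafka/broker-1/d1
# 4-5. delete and recreate
bin/kafka-topics.sh --bootstrap-server broker-2:9092 --delete --topic foo.test
bin/kafka-topics.sh --bootstrap-server broker-2:9092 --create \
  --topic foo.test --partitions 10 --replication-factor 4
# 6-7. wait, then describe
sleep 300
bin/kafka-topics.sh --bootstrap-server broker-2:9092 --describe --topic foo.test
{code}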

 

We observe that `broker-1` is the leader and in-sync for a few partitions

 

 
{code:java}
 
Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
ReplicationFactor: 4    Configs: 
min.insync.replicas=1,unclean.leader.election.enable=false
        Topic: foo.test Partition: 0    Leader: 101     Replicas: 
101,102,103,104       Isr: 101,102,103,104
        Topic: foo.test Partition: 1    Leader: 102     Replicas: 
102,103,104,101       Isr: 102,103,104
        Topic: foo.test Partition: 2    Leader: 103     Replicas: 
103,104,101,102       Isr: 103,104,102
        Topic: foo.test Partition: 3    Leader: 104     Replicas: 
104,101,102,103       Isr: 104,102,103
        Topic: foo.test Partition: 4    Leader: 104     Replicas: 
104,102,101,103       Isr: 104,102,103
        Topic: foo.test Partition: 5    Leader: 102     Replicas: 
102,101,103,104       Isr: 102,103,104
        Topic: foo.test Partition: 6    Leader: 101     Replicas: 
101,103,104,102       Isr: 101,103,104,102
        Topic: foo.test Partition: 7    Leader: 103     Replicas: 
103,104,102,101       Isr: 103,104,102
        Topic: foo.test Partition: 8    Leader: 101     Replicas: 
101,102,104,103       Isr: 101,102,104,103
        Topic: foo.test Partition: 9    Leader: 102     Replicas: 
102,104,103,101       Isr: 102,104,103
{code}
 

 

In this example, it is the leader of partitions `0, 6 and 8`.

 

Consider `foo.test-8`. It is present in the following brokers/disks:

 

 
{code:java}
$ fd foo.test-8
broker-1/d1/foo.test-8/
broker-2/d2/foo.test-8/
broker-3/d2/foo.test-8/
broker-4/d1/foo.test-8/{code}
 

 

`broker-1/d1` still refers to the topic id which is pending deletion because 
the log dir is marked offline.

 

 
{code:java}
$ cat broker-1/d1/foo.test-8/partition.metadata
version: 0
topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
 

 

However, other brokers have the correct topic-id

 

 
{code:java}
$ cat broker-2/d2/foo.test-8/partition.metadata
version: 0
topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
 

 

Now, let's consider `foo.test-0`. We observe that the replica isn't present in 
`broker-1`:




{code:java}
$ fd foo.test-0
broker-2/d1/foo.test-0/
broker-3/d1/foo.test-0/
broker-4/d2/foo.test-0/{code}



In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
partitions.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-17 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-16157:
--
Attachment: broker.log
broker.log.1
broker.log.2
broker.log.3
broker.log.4
broker.log.5
broker.log.6
broker.log.7
broker.log.8
broker.log.9
broker.log.10

> Topic recreation with offline disk doesn't update leadership/shrink ISR 
> correctly
> -
>
> Key: KAFKA-16157
> URL: https://issues.apache.org/jira/browse/KAFKA-16157
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, kraft
>Affects Versions: 3.7.1
>Reporter: Gaurav Narula
>Priority: Major
> Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, 
> broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, 
> broker.log.8, broker.log.9
>
>
> In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` 
> in each broker, we perform the following operations:
>  
>  # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the 
> topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
>  # Start a producer in the background to produce to `foo.test`.
>  # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
> read-only.
>  # Delete topic `foo.test`
>  # Recreate topic `foo.test`. Let's assume the topic was created with id 
> `bgdrsv-1QjCLFEqLOzVCHg`.
>  # Wait for 5 minutes
>  # Describe the recreated topic `foo.test`.
>  
> We observe that `broker-1` is the leader and in-sync for a few partitions
>  
>  
> {code:java}
>  
> Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
> ReplicationFactor: 4    Configs: 
> min.insync.replicas=1,unclean.leader.election.enable=false
>         Topic: foo.test Partition: 0    Leader: 101     Replicas: 
> 101,102,103,104       Isr: 101,102,103,104
>         Topic: foo.test Partition: 1    Leader: 102     Replicas: 
> 102,103,104,101       Isr: 102,103,104
>         Topic: foo.test Partition: 2    Leader: 103     Replicas: 
> 103,104,101,102       Isr: 103,104,102
>         Topic: foo.test Partition: 3    Leader: 104     Replicas: 
> 104,101,102,103       Isr: 104,102,103
>         Topic: foo.test Partition: 4    Leader: 104     Replicas: 
> 104,102,101,103       Isr: 104,102,103
>         Topic: foo.test Partition: 5    Leader: 102     Replicas: 
> 102,101,103,104       Isr: 102,103,104
>         Topic: foo.test Partition: 6    Leader: 101     Replicas: 
> 101,103,104,102       Isr: 101,103,104,102
>         Topic: foo.test Partition: 7    Leader: 103     Replicas: 
> 103,104,102,101       Isr: 103,104,102
>         Topic: foo.test Partition: 8    Leader: 101     Replicas: 
> 101,102,104,103       Isr: 101,102,104,103
>         Topic: foo.test Partition: 9    Leader: 102     Replicas: 
> 102,104,103,101       Isr: 102,104,103
> {code}
>  
>  
> In this example, it is the leader of partitions `0, 6 and 8`.
>  
> Consider `foo.test-8`. It is present in the following brokers/disks:
>  
>  
> {code:java}
> $ fd foo.test-8
> broker-1/d1/foo.test-8/
> broker-2/d2/foo.test-8/
> broker-3/d2/foo.test-8/
> broker-4/d1/foo.test-8/{code}
>  
>  
> `broker-1/d1` still refers to the topic id which is pending deletion because 
> the log dir is marked offline.
>  
>  
> {code:java}
> $ cat broker-1/d1/foo.test-8/partition.metadata
> version: 0
> topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
>  
>  
> However, other brokers have the correct topic-id
>  
>  
> {code:java}
> $ cat broker-2/d2/foo.test-8/partition.metadata
> version: 0
> topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
>  
>  
> Now, let's consider `foo.test-0`. We observe that the replica isn't present 
> in `broker-1`:
> {code:java}
> $ fd foo.test-0
> broker-2/d1/foo.test-0/
> broker-3/d1/foo.test-0/
> broker-4/d2/foo.test-0/{code}
> In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
> partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


mimaison commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456011084


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/PartitionAssignmentState.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import org.apache.kafka.common.Node;
+
+import java.util.Optional;
+
+class PartitionAssignmentState {
+    public final String group;
+    public final Optional<Node> coordinator;
+    public final Optional<String> topic;
+    public final Optional<Integer> partition;
+    public final Optional<Long> offset;
+    public final Optional<Long> lag;
+    public final Optional<String> consumerId;
+    public final Optional<String> host;
+    public final Optional<String> clientId;
+    public final Optional<Long> logEndOffset;
+
+    public PartitionAssignmentState(String group, Optional<Node> coordinator, Optional<String> topic,
+                                    Optional<Integer> partition, Optional<Long> offset, Optional<Long> lag,
+                                    Optional<String> consumerId, Optional<String> host, Optional<String> clientId,
+                                    Optional<Long> logEndOffset) {
+        this.group = group;
+        this.coordinator = coordinator;
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.lag = lag;
+        this.consumerId = consumerId;
+        this.host = host;
+        this.clientId = clientId;
+        this.logEndOffset = logEndOffset;
+    }
+}

Review Comment:
   Nit: Can we add a newline?



##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
+    public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
+
+    public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
+    public static final String GROUP_DOC = "The consumer group we wish to act on.";
+    public static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic which should be included in the reset offset process. " +
+        "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
+        "Reset-offsets also supports multiple topic inputs.";
+    public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process.";
+    public static final String LIST_DOC = "List all consumer groups.";
+    public static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group.";
+    public static final String ALL_GROUPS_DOC = "Apply to all consumer groups.";
+    public static final String NL = System.lineSeparator();
+    public static final String DELETE_DOC = "Pass in groups to delete topic part

[jira] [Commented] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807814#comment-17807814
 ] 

Mickael Maison commented on KAFKA-16157:


Did you hit this issue while testing 3.7.0 RC2 or with trunk?

> Topic recreation with offline disk doesn't update leadership/shrink ISR 
> correctly
> -
>
> Key: KAFKA-16157
> URL: https://issues.apache.org/jira/browse/KAFKA-16157
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, kraft
>Affects Versions: 3.7.1
>Reporter: Gaurav Narula
>Priority: Major
> Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, 
> broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, 
> broker.log.8, broker.log.9
>
>
> In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` 
> in each broker, we perform the following operations:
>  
>  # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the 
> topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
>  # Start a producer in the background to produce to `foo.test`.
>  # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
> read-only.
>  # Delete topic `foo.test`
>  # Recreate topic `foo.test`. Let's assume the topic was created with id 
> `bgdrsv-1QjCLFEqLOzVCHg`.
>  # Wait for 5 minutes
>  # Describe the recreated topic `foo.test`.
>  
> We observe that `broker-1` is the leader and in-sync for a few partitions
>  
>  
> {code:java}
>  
> Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
> ReplicationFactor: 4    Configs: 
> min.insync.replicas=1,unclean.leader.election.enable=false
>         Topic: foo.test Partition: 0    Leader: 101     Replicas: 
> 101,102,103,104       Isr: 101,102,103,104
>         Topic: foo.test Partition: 1    Leader: 102     Replicas: 
> 102,103,104,101       Isr: 102,103,104
>         Topic: foo.test Partition: 2    Leader: 103     Replicas: 
> 103,104,101,102       Isr: 103,104,102
>         Topic: foo.test Partition: 3    Leader: 104     Replicas: 
> 104,101,102,103       Isr: 104,102,103
>         Topic: foo.test Partition: 4    Leader: 104     Replicas: 
> 104,102,101,103       Isr: 104,102,103
>         Topic: foo.test Partition: 5    Leader: 102     Replicas: 
> 102,101,103,104       Isr: 102,103,104
>         Topic: foo.test Partition: 6    Leader: 101     Replicas: 
> 101,103,104,102       Isr: 101,103,104,102
>         Topic: foo.test Partition: 7    Leader: 103     Replicas: 
> 103,104,102,101       Isr: 103,104,102
>         Topic: foo.test Partition: 8    Leader: 101     Replicas: 
> 101,102,104,103       Isr: 101,102,104,103
>         Topic: foo.test Partition: 9    Leader: 102     Replicas: 
> 102,104,103,101       Isr: 102,104,103
> {code}
>  
>  
> In this example, it is the leader of partitions `0, 6 and 8`.
>  
> Consider `foo.test-8`. It is present in the following brokers/disks:
>  
>  
> {code:java}
> $ fd foo.test-8
> broker-1/d1/foo.test-8/
> broker-2/d2/foo.test-8/
> broker-3/d2/foo.test-8/
> broker-4/d1/foo.test-8/{code}
>  
>  
> `broker-1/d1` still refers to the topic id which is pending deletion because 
> the log dir is marked offline.
>  
>  
> {code:java}
> $ cat broker-1/d1/foo.test-8/partition.metadata
> version: 0
> topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
>  
>  
> However, other brokers have the correct topic-id
>  
>  
> {code:java}
> $ cat broker-2/d2/foo.test-8/partition.metadata
> version: 0
> topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
>  
>  
> Now, let's consider `foo.test-0`. We observe that the replica isn't present 
> in `broker-1`:
> {code:java}
> $ fd foo.test-0
> broker-2/d1/foo.test-0/
> broker-3/d1/foo.test-0/
> broker-4/d2/foo.test-0/{code}
> In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
> partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-17 Thread via GitHub


philipnee commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1456105594


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1336,8 +1336,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 MockProducerInterceptor.resetCounters()
   }
 
+  // This is disabled for the consumer group until KAFKA-16155 is resolved.

Review Comment:
   thanks for reporting this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-17 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano updated KAFKA-16157:
--
Priority: Blocker  (was: Major)

> Topic recreation with offline disk doesn't update leadership/shrink ISR 
> correctly
> -
>
> Key: KAFKA-16157
> URL: https://issues.apache.org/jira/browse/KAFKA-16157
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, kraft
>Affects Versions: 3.7.1
>Reporter: Gaurav Narula
>Priority: Blocker
> Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, 
> broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, 
> broker.log.8, broker.log.9
>
>
> In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` 
> in each broker, we perform the following operations:
>  
>  # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the 
> topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
>  # Start a producer in the background to produce to `foo.test`.
>  # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
> read-only.
>  # Delete topic `foo.test`
>  # Recreate topic `foo.test`. Let's assume the topic was created with id 
> `bgdrsv-1QjCLFEqLOzVCHg`.
>  # Wait for 5 minutes
>  # Describe the recreated topic `foo.test`.
>  
> We observe that `broker-1` is the leader and in-sync for a few partitions
>  
>  
> {code:java}
>  
> Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
> ReplicationFactor: 4    Configs: 
> min.insync.replicas=1,unclean.leader.election.enable=false
>         Topic: foo.test Partition: 0    Leader: 101     Replicas: 
> 101,102,103,104       Isr: 101,102,103,104
>         Topic: foo.test Partition: 1    Leader: 102     Replicas: 
> 102,103,104,101       Isr: 102,103,104
>         Topic: foo.test Partition: 2    Leader: 103     Replicas: 
> 103,104,101,102       Isr: 103,104,102
>         Topic: foo.test Partition: 3    Leader: 104     Replicas: 
> 104,101,102,103       Isr: 104,102,103
>         Topic: foo.test Partition: 4    Leader: 104     Replicas: 
> 104,102,101,103       Isr: 104,102,103
>         Topic: foo.test Partition: 5    Leader: 102     Replicas: 
> 102,101,103,104       Isr: 102,103,104
>         Topic: foo.test Partition: 6    Leader: 101     Replicas: 
> 101,103,104,102       Isr: 101,103,104,102
>         Topic: foo.test Partition: 7    Leader: 103     Replicas: 
> 103,104,102,101       Isr: 103,104,102
>         Topic: foo.test Partition: 8    Leader: 101     Replicas: 
> 101,102,104,103       Isr: 101,102,104,103
>         Topic: foo.test Partition: 9    Leader: 102     Replicas: 
> 102,104,103,101       Isr: 102,104,103
> {code}
>  
>  
> In this example, it is the leader of partitions `0, 6 and 8`.
>  
> Consider `foo.test-8`. It is present in the following brokers/disks:
>  
>  
> {code:java}
> $ fd foo.test-8
> broker-1/d1/foo.test-8/
> broker-2/d2/foo.test-8/
> broker-3/d2/foo.test-8/
> broker-4/d1/foo.test-8/{code}
>  
>  
> `broker-1/d1` still refers to the topic id which is pending deletion because 
> the log dir is marked offline.
>  
>  
> {code:java}
> $ cat broker-1/d1/foo.test-8/partition.metadata
> version: 0
> topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
>  
>  
> However, other brokers have the correct topic-id
>  
>  
> {code:java}
> $ cat broker-2/d2/foo.test-8/partition.metadata
> version: 0
> topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
>  
>  
> Now, let's consider `foo.test-0`. We observe that the replica isn't present 
> in `broker-1`:
> {code:java}
> $ fd foo.test-0
> broker-2/d1/foo.test-0/
> broker-3/d1/foo.test-0/
> broker-4/d2/foo.test-0/{code}
> In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
> partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-17 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula updated KAFKA-16157:
--
Description: 
In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` in 
each broker, we perform the following operations:

 
 # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the 
topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
 # Start a producer in the background to produce to `foo.test`.
 # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
read-only.
 # Delete topic `foo.test`
 # Recreate topic `foo.test`. Let's assume the topic was created with id 
`bgdrsv-1QjCLFEqLOzVCHg`.
 # Wait for 5 minutes
 # Describe the recreated topic `foo.test`.

 

We observe that `broker-1` is the leader and in-sync for a few partitions

 

 
{code:java}
 
Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
ReplicationFactor: 4    Configs: 
min.insync.replicas=1,unclean.leader.election.enable=false
        Topic: foo.test Partition: 0    Leader: 101     Replicas: 
101,102,103,104       Isr: 101,102,103,104
        Topic: foo.test Partition: 1    Leader: 102     Replicas: 
102,103,104,101       Isr: 102,103,104
        Topic: foo.test Partition: 2    Leader: 103     Replicas: 
103,104,101,102       Isr: 103,104,102
        Topic: foo.test Partition: 3    Leader: 104     Replicas: 
104,101,102,103       Isr: 104,102,103
        Topic: foo.test Partition: 4    Leader: 104     Replicas: 
104,102,101,103       Isr: 104,102,103
        Topic: foo.test Partition: 5    Leader: 102     Replicas: 
102,101,103,104       Isr: 102,103,104
        Topic: foo.test Partition: 6    Leader: 101     Replicas: 
101,103,104,102       Isr: 101,103,104,102
        Topic: foo.test Partition: 7    Leader: 103     Replicas: 
103,104,102,101       Isr: 103,104,102
        Topic: foo.test Partition: 8    Leader: 101     Replicas: 
101,102,104,103       Isr: 101,102,104,103
        Topic: foo.test Partition: 9    Leader: 102     Replicas: 
102,104,103,101       Isr: 102,104,103
{code}
 

 

In this example, it is the leader of partitions `0, 6 and 8`.

 

Consider `foo.test-8`. It is present in the following brokers/disks:

 

 
{code:java}
$ fd foo.test-8
broker-1/d1/foo.test-8/
broker-2/d2/foo.test-8/
broker-3/d2/foo.test-8/
broker-4/d1/foo.test-8/{code}
 

 

`broker-1/d1` still refers to the topic id which is pending deletion because 
the log dir is marked offline.

 

 
{code:java}
$ cat broker-1/d1/foo.test-8/partition.metadata
version: 0
topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
 

 

However, other brokers have the correct topic-id

 

 
{code:java}
$ cat broker-2/d2/foo.test-8/partition.metadata
version: 0
topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
 

 

Now, let's consider `foo.test-0`. We observe that the replica isn't present in 
`broker-1`:
{code:java}
$ fd foo.test-0
broker-2/d1/foo.test-0/
broker-3/d1/foo.test-0/
broker-4/d2/foo.test-0/{code}
In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
partitions.

 

  was:
In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` in 
each broker, we perform the following operations:

 
 # Create a topic `foo.test` with 10 replicas and RF 4. Let's assume the topic 
was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
 # Start a producer in the background to produce to `foo.test`.
 # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
read-only.
 # Delete topic `foo.test`
 # Recreate topic `foo.test`. Let's assume the topic was created with id 
`bgdrsv-1QjCLFEqLOzVCHg`.
 # Wait for 5 minutes
 # Describe the recreated topic `foo.test`.

 

We observe that `broker-1` is the leader and in-sync for a few partitions

 

 
{code:java}
 
Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
ReplicationFactor: 4    Configs: 
min.insync.replicas=1,unclean.leader.election.enable=false
        Topic: foo.test Partition: 0    Leader: 101     Replicas: 
101,102,103,104       Isr: 101,102,103,104
        Topic: foo.test Partition: 1    Leader: 102     Replicas: 
102,103,104,101       Isr: 102,103,104
        Topic: foo.test Partition: 2    Leader: 103     Replicas: 
103,104,101,102       Isr: 103,104,102
        Topic: foo.test Partition: 3    Leader: 104     Replicas: 
104,101,102,103       Isr: 104,102,103
        Topic: foo.test Partition: 4    Leader: 104     Replicas: 
104,102,101,103       Isr: 104,102,103
        Topic: foo.test Partition: 5    Leader: 102     Replicas: 
102,101,103,104       Isr: 102,103,104
        Topic: foo.test Partition: 6    Leader: 101     Replicas: 
101,103,104,102       Isr: 101,103,104,102
        Topic: foo.test Partition: 7    Leader: 103     Replicas: 
103,104,102,101       Isr: 103,104,102
        Topic: foo.test Partition: 8    Leader: 101     Replicas: 
101,102,104,103       Isr: 101,102,104,103
        Topic: foo.

Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]

2024-01-17 Thread via GitHub


philipnee commented on PR #15210:
URL: https://github.com/apache/kafka/pull/15210#issuecomment-1896275067

   @lucasbru - Would you have time to review this? Seems like the failed tests 
aren't necessarily related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-17 Thread via GitHub


lianetm opened a new pull request, #15215:
URL: https://github.com/apache/kafka/pull/15215

   This ensures that no records are fetched in the background thread while the 
onPartitionsAssigned callback is running in the application thread. This is 
achieved by pausing the partitions before triggering the callback, and 
resuming them when the callback completes successfully.
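   The same pause-run-resume pattern can be sketched against the public 
consumer API (illustrative only; the PR applies it inside the consumer itself):

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class PauseUntilAssignedCallbackDone implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    PauseUntilAssignedCallbackDone(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.pause(partitions);     // no records returned while paused
        initializeStateFor(partitions); // work that must finish first
        consumer.resume(partitions);    // resume only after success
    }

    private void initializeStateFor(Collection<TopicPartition> partitions) {
        // e.g. seek to externally stored offsets, warm caches, etc.
    }
}
{code}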
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-16157:
---
Affects Version/s: 3.7.0
   (was: 3.7.1)

> Topic recreation with offline disk doesn't update leadership/shrink ISR 
> correctly
> -
>
> Key: KAFKA-16157
> URL: https://issues.apache.org/jira/browse/KAFKA-16157
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, kraft
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Priority: Blocker
> Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, 
> broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, 
> broker.log.8, broker.log.9
>
>
> In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` 
> in each broker, we perform the following operations:
>  
>  # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the 
> topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
>  # Start a producer in the background to produce to `foo.test`.
>  # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
> read-only.
>  # Delete topic `foo.test`
>  # Recreate topic `foo.test`. Let's assume the topic was created with id 
> `bgdrsv-1QjCLFEqLOzVCHg`.
>  # Wait for 5 minutes
>  # Describe the recreated topic `foo.test`.
>  
> We observe that `broker-1` is the leader and in-sync for a few partitions
>  
>  
> {code:java}
>  
> Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
> ReplicationFactor: 4    Configs: 
> min.insync.replicas=1,unclean.leader.election.enable=false
>         Topic: foo.test Partition: 0    Leader: 101     Replicas: 
> 101,102,103,104       Isr: 101,102,103,104
>         Topic: foo.test Partition: 1    Leader: 102     Replicas: 
> 102,103,104,101       Isr: 102,103,104
>         Topic: foo.test Partition: 2    Leader: 103     Replicas: 
> 103,104,101,102       Isr: 103,104,102
>         Topic: foo.test Partition: 3    Leader: 104     Replicas: 
> 104,101,102,103       Isr: 104,102,103
>         Topic: foo.test Partition: 4    Leader: 104     Replicas: 
> 104,102,101,103       Isr: 104,102,103
>         Topic: foo.test Partition: 5    Leader: 102     Replicas: 
> 102,101,103,104       Isr: 102,103,104
>         Topic: foo.test Partition: 6    Leader: 101     Replicas: 
> 101,103,104,102       Isr: 101,103,104,102
>         Topic: foo.test Partition: 7    Leader: 103     Replicas: 
> 103,104,102,101       Isr: 103,104,102
>         Topic: foo.test Partition: 8    Leader: 101     Replicas: 
> 101,102,104,103       Isr: 101,102,104,103
>         Topic: foo.test Partition: 9    Leader: 102     Replicas: 
> 102,104,103,101       Isr: 102,104,103
> {code}
>  
>  
> In this example, it is the leader of partitions `0, 6 and 8`.
>  
> Consider `foo.test-8`. It is present in the following brokers/disks:
>  
>  
> {code:java}
> $ fd foo.test-8
> broker-1/d1/foo.test-8/
> broker-2/d2/foo.test-8/
> broker-3/d2/foo.test-8/
> broker-4/d1/foo.test-8/{code}
>  
>  
> `broker-1/d1` still refers to the topic id which is pending deletion because 
> the log dir is marked offline.
>  
>  
> {code:java}
> $ cat broker-1/d1/foo.test-8/partition.metadata
> version: 0
> topic_id: rAujIqcjRbu_-E4UxgQT8Q{code}
>  
>  
> However, other brokers have the correct topic-id
>  
>  
> {code:java}
> $ cat broker-2/d2/foo.test-8/partition.metadata
> version: 0
> topic_id: bgdrsv-1QjCLFEqLOzVCHg%{code}
>  
>  
> Now, let's consider `foo.test-0`. We observe that the replica isn't present 
> in `broker-1`:
> {code:java}
> $ fd foo.test-0
> broker-2/d1/foo.test-0/
> broker-3/d1/foo.test-0/
> broker-4/d2/foo.test-0/{code}
> In both cases, `broker-1` shouldn't be the leader or in-sync replica for the 
> partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-17 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1456228457


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
       producerId,
       producerEpoch,
       RecordBatch.NO_SEQUENCE,
-      requestLocal,
-      postVerificationCallback
+      // Wrap the callback to be handled on an arbitrary request handler thread
+      // when transaction verification is complete.
+      KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal)

Review Comment:
   Oh sorry. I guess I was just confused because I didn't see it in the replica 
manager flow (for produce).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


jolshan commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456242441


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*

Review Comment:
   Is the tools/consumergroup a new folder? I wonder if there is a name that is 
more consistent with the other folders. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


jolshan commented on PR #14856:
URL: https://github.com/apache/kafka/pull/14856#issuecomment-1896365557

   Did we want to delete the old files in this PR or a follow up?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]

2024-01-17 Thread via GitHub


kirktrue commented on PR #14818:
URL: https://github.com/apache/kafka/pull/14818#issuecomment-1896409297

   @jcme—Can you trigger a rebuild of the CI job? It looks like the last run 
didn't work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-01-17 Thread via GitHub


philipnee opened a new pull request, #15216:
URL: https://github.com/apache/kafka/pull/15216

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-17 Thread via GitHub


jolshan commented on code in PR #15212:
URL: https://github.com/apache/kafka/pull/15212#discussion_r1456314640


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -861,19 +861,9 @@ private void maybeUpdatePartitionEpoch(
     ConsumerGroupMember oldMember,
     ConsumerGroupMember newMember
 ) {
-        if (oldMember == null) {
-            addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch());
-            addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch());
-        } else {
-            if (!oldMember.assignedPartitions().equals(newMember.assignedPartitions())) {
-                removePartitionEpochs(oldMember.assignedPartitions());
-                addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch());
-            }
-            if (!oldMember.partitionsPendingRevocation().equals(newMember.partitionsPendingRevocation())) {
-                removePartitionEpochs(oldMember.partitionsPendingRevocation());
-                addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch());
-            }
-        }
+        maybeRemovePartitionEpoch(oldMember);

Review Comment:
   This logic seems simpler. Why didn't we just use it before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-17 Thread via GitHub


pprovenzano commented on PR #15197:
URL: https://github.com/apache/kafka/pull/15197#issuecomment-1896448532

   Test failures are not related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15811: Enhance request context with client socket port information (KIP-714) [kafka]

2024-01-17 Thread via GitHub


junrao commented on code in PR #15190:
URL: https://github.com/apache/kafka/pull/15190#discussion_r1456303419


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1140,9 +1141,9 @@ private[kafka] class Processor(
        expiredConnectionsKilledCount.record(null, 1, 0)
      } else {
        val connectionId = receive.source
-       val context = new RequestContext(header, connectionId, channel.socketAddress,
-         channel.principal, listenerName, securityProtocol,
-         channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
+       val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),

Review Comment:
   Hmm, channel.socketPort() is never null. Why do we need to convert it to an 
Optional?



##
server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java:
##
@@ -59,6 +60,7 @@ public static RequestContext requestContext() throws UnknownHostException {
         new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0),
         "1",
         InetAddress.getLocalHost(),
+        Optional.of(56078),

Review Comment:
   Could we define a constant for 56078 and reuse?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2024-01-17 Thread via GitHub


gharris1727 commented on PR #14729:
URL: https://github.com/apache/kafka/pull/14729#issuecomment-1896468099

   @chia7712 @showuon @mimaison Are any of you able to review this resource 
leak fix? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]

2024-01-17 Thread via GitHub


gharris1727 closed pull request #14762: KAFKA-15826: Close consumer when sink 
task is cancelled
URL: https://github.com/apache/kafka/pull/14762


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]

2024-01-17 Thread via GitHub


gharris1727 commented on PR #14762:
URL: https://github.com/apache/kafka/pull/14762#issuecomment-1896474948

   I think that, given the locking concerns I raised earlier, and because we can 
resolve this resource leak in our tests instead, this PR is not viable to merge. 
We can revisit this in the future if the locking model of the consumer can be 
changed, or if this becomes a demonstrable problem in real situations and not 
just tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-01-17 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807858#comment-17807858
 ] 

Justine Olshan commented on KAFKA-16156:


The transactional copier is pretty old. We can probably update it. If I get 
time I can take a look.
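For context on the trace quoted below: the ListOffsets response answers an 
endOffsets() query with the sentinel timestamp -1, and the OffsetAndTimestamp 
constructor rejects negative timestamps. A minimal sketch of the kind of 
sentinel guard involved (not the actual fix in OffsetFetcherUtils):

{code:java}
import java.util.Optional;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

final class SentinelTimestampGuard {
    // Beginning/end offset lookups carry no real timestamp, so an
    // OffsetAndTimestamp must not be constructed from such an entry.
    static OffsetAndTimestamp toEntry(long offset, long timestamp, Optional<Integer> leaderEpoch) {
        if (timestamp < 0) {
            return null;
        }
        return new OffsetAndTimestamp(offset, timestamp, leaderEpoch);
    }
}
{code}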

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Lianet Magrans
>Priority: Major
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.complete(Comp

[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16116:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|
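A sketch of how such sensors are typically registered with Kafka's Metrics API 
(the group name and wiring here are illustrative, not the actual consumer code):

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;

public class RebalanceMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        String group = "consumer-coordinator-metrics"; // assumed group name
        Sensor rebalanceLatency = metrics.sensor("rebalance-latency");
        rebalanceLatency.add(metrics.metricName("rebalance-latency-avg", group), new Avg());
        rebalanceLatency.add(metrics.metricName("rebalance-latency-max", group), new Max());
        rebalanceLatency.add(metrics.metricName("rebalance-latency-total", group), new CumulativeSum());

        rebalanceLatency.record(120.0); // one rebalance took 120 ms
        metrics.close();
    }
}
{code}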



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER

2024-01-17 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807866#comment-17807866
 ] 

Kirk True commented on KAFKA-15878:
---

[~philomathanuj]—for this to work, I assume the user would have to implement 
custom callback handlers that use a non-JWT validation approach, correct?

> KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
> 
>
> Key: KAFKA-15878
> URL: https://issues.apache.org/jira/browse/KAFKA-15878
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Anuj Sharma
>Priority: Major
>  Labels: oauth
> Fix For: 3.8.0
>
>
> {code:java}
> // code placeholder
> {code}
> h1. Overview
>  * This issue pertains to 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>  mechanism of Kafka authentication. 
>  * Kafka clients can use [SASL/OAUTHBEARER  
> |https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism 
> by overriding the [custom call back 
> handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]
>  . 
>  * 
> [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
>  available from v3.1  further extends the mechanism with a production grade 
> implementation. 
>  * Kafka's 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>   mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is 
> because of a more restrictive set of characters than what 
> [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] 
> recommends. 
>  * This JIRA can be considered an extension of 
> [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
>  to support the opaque tokens as well apart from the JWT tokens.
>  
> In summary the following character set should be supported as per the RFC - 
> {code:java}
> 1*( ALPHA / DIGIT /
>"-" / "." / "_" / "~" / "+" / "/" ) *"="
> {code}
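A small validator for the grammar quoted above can be written directly as a 
Java regex (a sketch of the accepted character set, not the actual OAuthBearer 
validation code; the sample token is the one from RFC 6750):

{code:java}
import java.util.regex.Pattern;

public class Rfc6750TokenCheck {
    // 1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
    private static final Pattern B64TOKEN = Pattern.compile("[A-Za-z0-9\\-._~+/]+=*");

    public static boolean isValid(String token) {
        return B64TOKEN.matcher(token).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("mF_9.B5f-4.1JqM")); // true
        System.out.println(isValid("abc def"));         // false, space not allowed
    }
}
{code}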
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe

2024-01-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16150.
---
Resolution: Duplicate

> Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
> 
>
> Key: KAFKA-16150
> URL: https://issues.apache.org/jira/browse/KAFKA-16150
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe

2024-01-17 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16151.
---
Resolution: Duplicate

> Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
> -
>
> Key: KAFKA-16151
> URL: https://issues.apache.org/jira/browse/KAFKA-16151
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456440526


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.CheckoutCommand;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.InitCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.lib.*;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+
+static String[] filesCheckMetadata = {"AccessControlEntryRecord.json", 
"BrokerRegistrationChangeRecord.json", "ClientQuotaRecord.json",
+"ConfigRecord.json", "DelegationTokenRecord.json", 
"FeatureLevelRecord.json", "FenceBrokerRecord.json", "NoOpRecord.json",
+"PartitionChangeRecord.json", "PartitionRecord.json", 
"ProducerIdsRecord.json", "RegisterBrokerRecord.json",
+"RemoveAccessControlEntryRecord.json", "RemoveTopicRecord.json", 
"RemoveUserScramCredentialRecord.json", "TopicRecord.json",
+"UnfenceBrokerRecord.json", "UnregisterBrokerRecord.json", 
"UserScramCredentialRecord.json", "ZkMigrationRecord.json"};
+public static void main(String[] args) throws Exception {
+
+try {
+List<String> localContent = new ArrayList<>();
+for(String jsonSchema: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + jsonSchema;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {

Review Comment:
   Please change this not to use a raw 15, but to use an appropriate constant.
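
   For illustration, the fix could be as simple as the following (the constant 
name here is a made-up example):

   private static final int HEADER_LINES_TO_SKIP = 15;

   for (int i = 0; i < HEADER_LINES_TO_SKIP; i++) {
       reader.readLine(); // skip the license header at the top of each schema file
   }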



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456442184


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+static String[] filesCheckMetadata = new 
File(System.getProperty("user.dir") + 
"/metadata/src/main/resources/common/metadata/").list();

Review Comment:
   please change this to use something like `Paths.get` so that it's not 
dependent on the system path separator (which is different on Windows)
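
   A sketch of the suggested approach (variable names are illustrative):

   import java.nio.file.Path;
   import java.nio.file.Paths;

   Path schemaDir = Paths.get(System.getProperty("user.dir"),
           "metadata", "src", "main", "resources", "common", "metadata");
   String[] filesCheckMetadata = schemaDir.toFile().list();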



##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+static String[] filesCheckMetadata = new 
File(System.getProperty("user.dir") + 
"/metadata/src/main/resources/common/metadata/").list();
+public static void main(String[] args) throws Exception {
+
+ 

Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456441308


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+static String[] filesCheckMetadata = new 
File(System.getProperty("user.dir") + 
"/metadata/src/main/resources/common/metadata/").list();
+public static void main(String[] args) throws Exception {
+
+try {
+List<String> localContent = new ArrayList<>();
+for(String fileName: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + fileName;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {
+reader.readLine();
+}
+StringBuilder content = new StringBuilder();
+boolean print = false;
+for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
+if (line.charAt(0) == '{') {
+print = true;
+}
+if (print && !line.contains("//")) {

Review Comment:
   It's not enough for the line to contain `//`. It needs to start with `//` 
(after whitespace has been trimmed)
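
   That is, something along these lines:

   String trimmed = line.trim();
   if (print && !trimmed.startsWith("//")) {
       content.append(line);
   }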



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456447370


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+static String[] filesCheckMetadata = new 
File(System.getProperty("user.dir") + 
"/metadata/src/main/resources/common/metadata/").list();
+public static void main(String[] args) throws Exception {
+
+try {
+List<String> localContent = new ArrayList<>();
+for(String fileName: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + fileName;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {
+reader.readLine();
+}
+StringBuilder content = new StringBuilder();
+boolean print = false;
+for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
+if (line.charAt(0) == '{') {
+print = true;
+}
+if (print && !line.contains("//")) {
+content.append(line);
+}
+}
+localContent.add(content.toString());
+}
+
+List<String> gitContent = GetDataFromGit();
+if (localContent.size() != gitContent.size()) {
+throw new IllegalStateException("missing schemas");
+}
+for(int i = 0; i < localContent.size(); i++) {
+if (Objects.equals(localContent.get(i), gitContent.get(i))) {
+continue;
+}
+
+ObjectMapper objectMapper = new ObjectMapper();
+JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i));
+JsonNode jsonNode2 = 
objectMapper.readTree(localContent.get(i));
+
+checkApiTypeVersions(jsonNode1, jsonNode2);
+parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) 
jsonNode2.get("fields"));
+}
+
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+private static List<String> GetDataFromGit() throws IOException, 
GitAPIException {
+List<String> gitSchemas = new ArrayList<>();
+
+Git git = Git.open(new File(System.getProperty("user.dir") + "/.git"));

Review Comment:
   you don't need the slash here
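
   For example, the two-argument `File` constructor joins the segments itself:

   Git git = Git.open(new File(System.getProperty("user.dir"), ".git"));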



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456448643


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+static String[] filesCheckMetadata = new 
File(System.getProperty("user.dir") + 
"/metadata/src/main/resources/common/metadata/").list();
+public static void main(String[] args) throws Exception {
+
+try {
+List<String> localContent = new ArrayList<>();
+for(String fileName: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + fileName;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {
+reader.readLine();
+}
+StringBuilder content = new StringBuilder();
+boolean print = false;
+for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
+if (line.charAt(0) == '{') {
+print = true;
+}
+if (print && !line.contains("//")) {
+content.append(line);
+}
+}
+localContent.add(content.toString());
+}
+
+List<String> gitContent = GetDataFromGit();
+if (localContent.size() != gitContent.size()) {
+throw new IllegalStateException("missing schemas");
+}
+for(int i = 0; i < localContent.size(); i++) {
+if (Objects.equals(localContent.get(i), gitContent.get(i))) {
+continue;
+}
+
+ObjectMapper objectMapper = new ObjectMapper();
+JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i));
+JsonNode jsonNode2 = 
objectMapper.readTree(localContent.get(i));
+
+checkApiTypeVersions(jsonNode1, jsonNode2);
+parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) 
jsonNode2.get("fields"));
+}
+
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+private static List<String> GetDataFromGit() throws IOException, 
GitAPIException {
+List<String> gitSchemas = new ArrayList<>();
+
+Git git = Git.open(new File(System.getProperty("user.dir") + "/.git"));
+Repository repository = git.getRepository();
+Ref head = 
git.getRepository().getRefDatabase().firstExactRef("refs/heads/trunk");
+
+try (RevWalk revWalk = new RevWalk(repository)) {
+RevCommit commit = revWalk.parseCommit(head.getObjectId());
+RevTree tree = commit.getTree();
+for (String fileName : filesCheckMetadata)

Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456448973


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.CheckoutCommand;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.InitCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.lib.*;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+
+static String[] filesCheckMetadata = {"AccessControlEntryRecord.json", 
"BrokerRegistrationChangeRecord.json", "ClientQuotaRecord.json",
+"ConfigRecord.json", "DelegationTokenRecord.json", 
"FeatureLevelRecord.json", "FenceBrokerRecord.json", "NoOpRecord.json",
+"PartitionChangeRecord.json", "PartitionRecord.json", 
"ProducerIdsRecord.json", "RegisterBrokerRecord.json",
+"RemoveAccessControlEntryRecord.json", "RemoveTopicRecord.json", 
"RemoveUserScramCredentialRecord.json", "TopicRecord.json",
+"UnfenceBrokerRecord.json", "UnregisterBrokerRecord.json", 
"UserScramCredentialRecord.json", "ZkMigrationRecord.json"};
+public static void main(String[] args) throws Exception {
+
+try {
+List<String> localContent = new ArrayList<>();
+for(String jsonSchema: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + jsonSchema;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {
+reader.readLine();
+}
+StringBuilder content = new StringBuilder();
+for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
+content.append(line);
+}
+localContent.add(content.toString());
+}
+
+List<String> gitContent = GetDataFromGit();
+if (localContent.size() != gitContent.size()) {
+throw new IllegalStateException("missing schemas");
+}
+for(int i = 0; i < localContent.size(); i++) {
+if (Objects.equals(localContent.get(i), gitContent.get(i))) {
+continue;
+}
+
+ObjectMapper objectMapper = new ObjectMapper();
+JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i));
+JsonNode jsonNode2 = 
objectMapper.readTree(localContent.get(i));
+
+checkApiTypeVersions(jsonNode1, jsonNode2);
+parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) 
jsonNode2.get("fields"));
+}
+
+} catch (FileNotFoundException e) {
+System.out.println("An error occurred.");
+e.printStackTrace();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+private static List<String> GetDataFromGit() throws IOExcepti

Re: [PR] Metadata schema checker [kafka]

2024-01-17 Thread via GitHub


cmccabe commented on code in PR #14389:
URL: https://github.com/apache/kafka/pull/14389#discussion_r1456451709


##
tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.SchemaChecker;
+
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.PathFilter;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+public class MetadataSchemaChecker {
+
+static int latestTag = -1;
+static int  latestTagVersion = -1;
+static int oldLatestVersion = -1;
+static int oldFirstVersion = -1;
+static int newLatestVersion = -1;
+static int newFirstVersion = -1;
+static String[] filesCheckMetadata = new 
File(System.getProperty("user.dir") + 
"/metadata/src/main/resources/common/metadata/").list();
+public static void main(String[] args) throws Exception {
+
+try {
+List<String> localContent = new ArrayList<>();
+for(String fileName: filesCheckMetadata) {
+final String dir = System.getProperty("user.dir");
+String path = dir + 
"/metadata/src/main/resources/common/metadata/" + fileName;
+BufferedReader reader = new BufferedReader(new 
FileReader(path));
+for (int i = 0; i < 15; i++) {
+reader.readLine();
+}
+StringBuilder content = new StringBuilder();
+boolean print = false;
+for (String line = reader.readLine(); line != null; line = 
reader.readLine()) {
+if (line.charAt(0) == '{') {
+print = true;
+}
+if (print && !line.contains("//")) {
+content.append(line);
+}
+}
+localContent.add(content.toString());
+}
+
+List<String> gitContent = GetDataFromGit();
+if (localContent.size() != gitContent.size()) {
+throw new IllegalStateException("missing schemas");
+}
+for(int i = 0; i < localContent.size(); i++) {
+if (Objects.equals(localContent.get(i), gitContent.get(i))) {
+continue;
+}
+
+ObjectMapper objectMapper = new ObjectMapper();
+JsonNode jsonNode1 = objectMapper.readTree(gitContent.get(i));
+JsonNode jsonNode2 = 
objectMapper.readTree(localContent.get(i));
+
+checkApiTypeVersions(jsonNode1, jsonNode2);
+parser((ArrayNode) jsonNode1.get("fields"), (ArrayNode) 
jsonNode2.get("fields"));
+}
+
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+private static List<String> GetDataFromGit() throws IOException, 
GitAPIException {
+List<String> gitSchemas = new ArrayList<>();
+
+Git git = Git.open(new File(System.getProperty("user.dir") + "/.git"));
+Repository repository = git.getRepository();
+Ref head = 
git.getRepository().getRefDatabase().firstExactRef("refs/heads/trunk");
+
+try (RevWalk revWalk = new RevWalk(repository)) {
+RevCommit commit = revWalk.parseCommit(head.getObjectId());
+RevTree tree = commit.getTree();
+for (String fileName : filesCheckMetadata)

[jira] [Assigned] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16141:
---

Assignee: Matthias J. Sax  (was: Almog Gavra)

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807890#comment-17807890
 ] 

Matthias J. Sax commented on KAFKA-16141:
-

After discussion with Almog (and testing it) implementing `WrappedStateStore` 
won't do the trick. Given that this is a regression, it seems best (most 
pragmatic) to just revert the change on 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement 
`TimestampedBytesStore` for now (even if this still seems to be the right thing 
to do – however, it might require some larger changes to make it work – will 
file a follow up ticket for this).

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]

2024-01-17 Thread via GitHub


mjsax opened a new pull request, #15217:
URL: https://github.com/apache/kafka/pull/15217

   KAFKA-15629 added the `TimestampedBytesStore` interface to 
`KeyValueToTimestampedKeyValueByteStoreAdapter`, which breaks the restore code 
path and thus some system tests.
   
   This PR reverts this change for now.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]

2024-01-17 Thread via GitHub


mjsax commented on PR #15217:
URL: https://github.com/apache/kafka/pull/15217#issuecomment-1896696469

   \cc @agavra @stanislavkozlovski 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]

2024-01-17 Thread via GitHub


mjsax commented on code in PR #15217:
URL: https://github.com/apache/kafka/pull/15217#discussion_r1456485535


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -422,6 +423,16 @@ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes,
 deserializer.deserialize(serdes.topic(), byteArray);
 }
 
+public static boolean isAdapter(final StateStore stateStore) {

Review Comment:
   Adding this helper (which is kinda "hack") for now to not break IQv2 code 
path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16158) Cleanup usage of `TimestampedBytesStore` interface

2024-01-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16158:
---

 Summary: Cleanup usage of `TimestampedBytesStore` interface
 Key: KAFKA-16158
 URL: https://issues.apache.org/jira/browse/KAFKA-16158
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


We added the `TimestampedBytesStore` interface many releases ago. Its purpose is 
to indicate whether a byte-store's binary value contains a "plain value" or a 
"<timestamp,value>" format. Stores using the "<timestamp,value>" format should 
implement the interface, however not all stores which use this format do.

We tried to fix one occurrence via 
https://issues.apache.org/jira/browse/KAFKA-15629 by adding 
`TimestampedBytesStore` to `KeyValueToTimestampedKeyValueByteStoreAdapter`, 
however this change broke the restore code path (cf 
https://issues.apache.org/jira/browse/KAFKA-16141) and thus we reverted the 
change.

During the investigation, we also noticed that 
`InMemoryTimestampedKeyValueStoreMarker` implements `TimestampedBytesStore` but 
does not do a byte-array translation (it's unclear why no byte-array 
translation happens) – and it's also unclear if the in-memory store is tested 
properly.

We should try to clean this all up, adding `TimestampedBytesStore` to 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and figure out how to avoid 
breaking the restore code path. In addition, we should verify whether 
`InMemoryTimestampedKeyValueStoreMarker` is correct or not, and whether the 
restore code path (and maybe also the IQv2 code path) is tested properly and 
correct.
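
For reference, a minimal sketch of the "<timestamp,value>" byte layout this 
interface implies (an assumption for illustration, not Streams' actual helper): 
an 8-byte timestamp followed by the plain value.

{code:java}
import java.nio.ByteBuffer;

public class TimestampedFormatSketch {

    // prepend the record timestamp (8 bytes) to the plain serialized value
    public static byte[] toTimestampedFormat(long timestamp, byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(timestamp)
                .put(plainValue)
                .array();
    }
}
{code}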



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807890#comment-17807890
 ] 

Matthias J. Sax edited comment on KAFKA-16141 at 1/17/24 9:15 PM:
--

After discussion with Almog (and testing it) implementing `WrappedStateStore` 
won't do the trick. Given that this is a regression, it seems best (most 
pragmatic) to just revert the change on 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement 
`TimestampedBytesStore` for now (even if this still seems to be the right thing 
to do – however, it might require some larger changes to make it work – 
filed a follow up ticket for this: 
https://issues.apache.org/jira/browse/KAFKA-16158).


was (Author: mjsax):
After discussion with Almog (and testing it) implementing `WrappedStateStore` 
won't do the trick. Given that this is a regression, it seems best (most 
pragmatic) to just revert the change on 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement 
`TimestampedBytesStore` for now (even if this still seems to be the right thing 
to do – however, it might require some larger changes to make it work – will 
file a follow up ticket for this).

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16141: Fix StreamsStandbyTask system test [kafka]

2024-01-17 Thread via GitHub


mjsax commented on PR #15217:
URL: https://github.com/apache/kafka/pull/15217#issuecomment-1896752852

   Triggered a system test run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6035/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-17 Thread via GitHub


anurag-harness opened a new pull request, #15218:
URL: https://github.com/apache/kafka/pull/15218

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16147; Partition is assigned to two members at the same time [kafka]

2024-01-17 Thread via GitHub


jeffkbkim commented on PR #15212:
URL: https://github.com/apache/kafka/pull/15212#issuecomment-1896786496

   > when a member has a partition pending revocation re-assigned to him before 
the revocation is completed, the partition epoch is lost
   
   to confirm, the `previousMemberEpoch=11` should be `14` in the given example 
right?
   
   > a partition got assigned to two members
   
   this seems a bit scary. should we have some check that ensures the invariant?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16141:

Issue Type: Bug  (was: Test)

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-17 Thread via GitHub


mjsax merged PR #15207:
URL: https://github.com/apache/kafka/pull/15207


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]

2024-01-17 Thread via GitHub


mjsax commented on PR #15207:
URL: https://github.com/apache/kafka/pull/15207#issuecomment-1896826978

   Merged to `trunk` and cherry-picked to `3.7` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16139.
-
Fix Version/s: 3.7.0
   3.6.1
   Resolution: Fixed

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bouncesArguments:\{
>  “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT”}
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


nizhikov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456518422


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/PartitionAssignmentState.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import org.apache.kafka.common.Node;
+
+import java.util.Optional;
+
+class PartitionAssignmentState {
+public final String group;
+public final Optional<Node> coordinator;
+public final Optional<String> topic;
+public final Optional<Integer> partition;
+public final Optional<Long> offset;
+public final Optional<Long> lag;
+public final Optional<String> consumerId;
+public final Optional<String> host;
+public final Optional<String> clientId;
+public final Optional<Long> logEndOffset;
+
+public PartitionAssignmentState(String group, Optional<Node> coordinator, Optional<String> topic,
+Optional<Integer> partition, Optional<Long> offset, Optional<Long> lag,
+Optional<String> consumerId, Optional<String> host, Optional<String> clientId,
+Optional<Long> logEndOffset) {
+this.group = group;
+this.coordinator = coordinator;
+this.topic = topic;
+this.partition = partition;
+this.offset = offset;
+this.lag = lag;
+this.consumerId = consumerId;
+this.host = host;
+this.clientId = clientId;
+this.logEndOffset = logEndOffset;
+}
+}

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


nizhikov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456521165


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
+public static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
+
+public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+public static final String GROUP_DOC = "The consumer group we wish to act 
on.";
+public static final String TOPIC_DOC = "The topic whose consumer group 
information should be deleted or topic which should be included in the reset 
offset process. " +
+"In `reset-offsets` case, partitions can be specified using this 
format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included in the 
process. " +
+"Reset-offsets also supports multiple topic inputs.";
+public static final String ALL_TOPICS_DOC = "Consider all topics assigned 
to a group in the `reset-offsets` process.";
+public static final String LIST_DOC = "List all consumer groups.";
+public static final String DESCRIBE_DOC = "Describe consumer group and 
list offset lag (number of messages not yet processed) related to given group.";
+public static final String ALL_GROUPS_DOC = "Apply to all consumer 
groups.";
+public static final String NL = System.lineSeparator();
+public static final String DELETE_DOC = "Pass in groups to delete topic 
partition offsets and ownership information " +
+"over the entire consumer group. For instance --group g1 --group g2";
+public static final String TIMEOUT_MS_DOC = "The timeout that can be set 
for some use cases. For example, it can be used when describing the group " +
+"to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes (when the group is just created, " +
+"or is going through some changes).";
+public static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client and Consumer.";
+public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer 
group. Supports one consumer group at a time, and instances should be 
inactive" + NL +
+"Has 2 execution options: --dry-run (the default) to plan which 
offsets to reset, and --execute to update the offsets. " +
+"Additionally, the --export option is used to export the results to a 
CSV format." + NL +
+"You must choose one of the following reset specifications: 
--to-datetime, --by-duration, --to-earliest, " +
+"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + 
NL +
+"To define the scope use --all-topics or --topic. One scope must be 
specified unless you use '--from-file'.";
+public static final String DRY_RUN_DOC = "Only show results without 
executing changes on Consumer Groups. Supported operations: reset-offsets.";
+public static final String EXECUTE_DOC = "Execute operation. Supported 
operations: reset-offsets.";
+public static final String EXPORT_DOC = "Export operation execution to a 
CSV file. Supported operations: reset-offsets.";
+public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a 
specific offset.";
+public static final String RESET_FROM_FILE_DOC = "Reset offsets to values 
defined in CSV file.";
+public static final String RESET_TO_DATETIME_DOC = "Reset offsets to 
offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
+public static final String RESET_BY_DURATION_DOC = "Reset offsets to 
offset by duration from current timestamp. Format: 'PnDTnHnMnS'";
+pub
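
For readers new to jopt-simple, here is a minimal sketch of how option constants 
like the *_DOC strings above are typically wired into a parser. The class name 
and the hard-coded argument list are illustrative assumptions, not code from 
this PR:

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public final class OptionsParsingSketch {
    public static void main(String[] args) {
        // The false flag disables abbreviated option matching.
        OptionParser parser = new OptionParser(false);
        OptionSpec<String> bootstrapServerOpt = parser
            .accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
            .withRequiredArg()
            .describedAs("server to connect to")
            .ofType(String.class);
        OptionSpec<String> groupOpt = parser
            .accepts("group", "The consumer group we wish to act on.")
            .withRequiredArg()
            .describedAs("consumer group")
            .ofType(String.class);

        // Parse a hard-coded argument list purely for illustration.
        OptionSet options = parser.parse("--bootstrap-server", "localhost:9092", "--group", "g1");
        System.out.println(options.valueOf(bootstrapServerOpt) + " / " + options.valueOf(groupOpt));
    }
}
```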

Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-17 Thread via GitHub


mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1456522509


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##
@@ -715,15 +716,22 @@ private Optional<AbstractRequest.Builder<?>> createPushRequest(ClientTelemetrySubscription localSubscription) {
 }
 
 CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-ByteBuffer buffer = ClientTelemetryUtils.compress(payload, 
compressionType);
+byte[] compressedPayload;
+try {
+compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
+} catch (IOException e) {
+log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);

Review Comment:
   Fine with me.
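
   The fallback being discussed can be sketched roughly as follows. This is a 
self-contained approximation using plain GZIP from java.util.zip; the merged 
code goes through ClientTelemetryUtils and the negotiated CompressionType, so 
the names below are illustrative, not the PR's actual implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public final class CompressionFallbackSketch {

    // Tries to GZIP-compress the payload; on any IOException it logs and
    // returns the original bytes so the request can still be sent uncompressed.
    static byte[] compressWithFallback(byte[] payload) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
                gzip.write(payload);
            }
            return baos.toByteArray();
        } catch (IOException e) {
            System.err.println("Failed to compress telemetry payload, sending uncompressed data");
            return payload;
        }
    }

    public static void main(String[] args) {
        byte[] out = compressWithFallback("example telemetry payload".getBytes());
        System.out.println("bytes on the wire: " + out.length);
    }
}
```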



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


nizhikov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456523688


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumergroup;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
+public static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
+
+public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+public static final String GROUP_DOC = "The consumer group we wish to act 
on.";
+public static final String TOPIC_DOC = "The topic whose consumer group 
information should be deleted or the topic which should be included in the reset 
offset process. " +
+"In `reset-offsets` case, partitions can be specified using this 
format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the 
process. " +
+"Reset-offsets also supports multiple topic inputs.";
+public static final String ALL_TOPICS_DOC = "Consider all topics assigned 
to a group in the `reset-offsets` process.";
+public static final String LIST_DOC = "List all consumer groups.";
+public static final String DESCRIBE_DOC = "Describe consumer group and 
list offset lag (number of messages not yet processed) related to given group.";
+public static final String ALL_GROUPS_DOC = "Apply to all consumer 
groups.";
+public static final String NL = System.getProperty("line.separator");
+public static final String DELETE_DOC = "Pass in groups to delete topic 
partition offsets and ownership information " +
+"over the entire consumer group. For instance --group g1 --group g2";
+public static final String TIMEOUT_MS_DOC = "The timeout that can be set 
for some use cases. For example, it can be used when describing the group " +
+"to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes (when the group is just created, " +
+"or is going through some changes).";
+public static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client and Consumer.";
+public static final String RESET_OFFSETS_DOC = "Reset offsets of consumer 
group. Supports one consumer group at a time, and instances should be 
inactive" + NL +
+"Has 2 execution options: --dry-run (the default) to plan which 
offsets to reset, and --execute to update the offsets. " +
+"Additionally, the --export option is used to export the results to a 
CSV format." + NL +
+"You must choose one of the following reset specifications: 
--to-datetime, --by-duration, --to-earliest, " +
+"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + 
NL +
+"To define the scope use --all-topics or --topic. One scope must be 
specified unless you use '--from-file'.";
+public static final String DRY_RUN_DOC = "Only show results without 
executing changes on Consumer Groups. Supported operations: reset-offsets.";
+public static final String EXECUTE_DOC = "Execute operation. Supported 
operations: reset-offsets.";
+public static final String EXPORT_DOC = "Export operation execution to a 
CSV file. Supported operations: reset-offsets.";
+public static final String RESET_TO_OFFSET_DOC = "Reset offsets to a 
specific offset.";
+public static final String RESET_FROM_FILE_DOC = "Reset offsets to values 
defined in CSV file.";
+public static final String RESET_TO_DATETIME_DOC = "Reset offsets to 
offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
+public static final String RESET_BY_DURATION_DOC = "Reset offsets to 
offset by duration from current timestamp. Format: 'PnDTnHnM

Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]

2024-01-17 Thread via GitHub


mjsax merged PR #15148:
URL: https://github.com/apache/kafka/pull/15148


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-17 Thread via GitHub


jeffkbkim commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1456526190


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 return groupId != null && !groupId.isEmpty();
 }
 
-/**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
- */
-private static Errors normalizeException(Throwable exception) {
-exception = Errors.maybeUnwrapException(exception);
-
-if (exception instanceof UnknownTopicOrPartitionException ||
-exception instanceof NotEnoughReplicasException ||
-exception instanceof TimeoutException) {
-return Errors.COORDINATOR_NOT_AVAILABLE;
-}
-
-if (exception instanceof NotLeaderOrFollowerException ||
-exception instanceof KafkaStorageException) {
-return Errors.NOT_COORDINATOR;
-}
-
-if (exception instanceof RecordTooLargeException ||
-exception instanceof RecordBatchTooLargeException ||
-exception instanceof InvalidFetchSizeException) {
-return Errors.UNKNOWN_SERVER_ERROR;
+private <REQ, RSP> RSP handleOperationException(
+String requestName,
+REQ request,
+Throwable exception,
+BiFunction<Errors, String, RSP> responseBuilder
+) {
+ApiError apiError = ApiError.fromThrowable(exception);
+
+switch (apiError.error()) {
+case UNKNOWN_SERVER_ERROR:
+log.error("{} request {} hit an unexpected exception: {}.",
+requestName, request, exception.getMessage(), exception);
+return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, 
null);
+
+case UNKNOWN_TOPIC_OR_PARTITION:
+case NOT_ENOUGH_REPLICAS:
+case REQUEST_TIMED_OUT:
+return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, 
null);
+
+case NOT_LEADER_OR_FOLLOWER:
+case KAFKA_STORAGE_ERROR:
+return responseBuilder.apply(Errors.NOT_COORDINATOR, null);
+
+case MESSAGE_TOO_LARGE:
+case RECORD_LIST_TOO_LARGE:
+case INVALID_FETCH_SIZE:
+return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, 
null);
+
+default:
+return responseBuilder.apply(apiError.error(), 
apiError.message());

Review Comment:
   It looks like the error message is only set by `consumerGroupHeartbeat`. 
What's an example of an error message?
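
   For illustration, `ApiError.fromThrowable` keeps the message of a recognized 
ApiException but maps unclassified exceptions to UNKNOWN_SERVER_ERROR without 
surfacing their message. A sketch, assuming kafka-clients on the classpath; the 
choice of GroupIdNotFoundException is just an example:

```java
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.requests.ApiError;

public final class ApiErrorMessageSketch {
    public static void main(String[] args) {
        // A recognized ApiException keeps its human-readable message.
        ApiError known = ApiError.fromThrowable(
            new GroupIdNotFoundException("Group my-group not found."));
        System.out.println(known.error() + " -> " + known.message());

        // An arbitrary exception is classified as UNKNOWN_SERVER_ERROR; its
        // message is not passed through (an assumption worth verifying).
        ApiError unknown = ApiError.fromThrowable(new RuntimeException("internal detail"));
        System.out.println(unknown.error() + " -> " + unknown.message());
    }
}
```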



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


nizhikov commented on PR #14856:
URL: https://github.com/apache/kafka/pull/14856#issuecomment-1896962207

   @jolshan 
   
   > Did we want to delete the old files in this PR or a follow up?
   
   For the previous command we removed the old files only once the command was merged:
   
   1. Reassign case classes and options moved to Java - 
1fd58e30cf38587a66c1f7188c7667b555624485
   2. Command moved to Java and old classes removed - 
76b1b50b644149e77ee1ec42d882e2cb80742bdf
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


nizhikov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456530381


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*

Review Comment:
   > Is the tools/consumergroup a new folder?
   
   yes.
   
   > I wonder if there is a name that is more consistent with the other folders.
   
   Couldn't come up with a better name :) 
   Do you have one in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


jolshan commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456536862


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*

Review Comment:
   I wasn't sure if we wanted to use the same convention as the other Java 
consumer group files, but maybe that is confusing. They follow a pattern of 
coordinator/group. Not sure if we really have reusability amongst coordinator 
tools though. 
   
   I think when we have had two-word folders we usually put a dash between them 
(i.e. consumer-group)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]

2024-01-17 Thread via GitHub


cmccabe merged PR #15197:
URL: https://github.com/apache/kafka/pull/15197


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16159:
--

 Summary: Prune excessive logging from Telemetry Reporter
 Key: KAFKA-16159
 URL: https://issues.apache.org/jira/browse/KAFKA-16159
 Project: Kafka
  Issue Type: Task
  Components: consumer, log
Reporter: Philip Nee
Assignee: Apoorv Mittal


While running system tests locally, I've noticed excessive logging of the 
Telemetry Reporter. This I believe was introduced in KIP-714.
{code:java}
[2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
returning the value 224678 ms; the client will wait before submitting the next 
GetTelemetrySubscriptions network API request 
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
This is logged several times per ms. Given the amount of log being 
emitted, can we also check the CPU profile to see if there's a process running 
a tight loop?
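
One generic mitigation, independent of the eventual fix here, is to rate-limit 
the hot log call so a tight loop can emit it at most once per interval. A 
minimal sketch; the class and method names are illustrative:
{code:java}
import java.time.Duration;

public final class ThrottledLogSketch {
    private static final long MIN_INTERVAL_MS = Duration.ofSeconds(30).toMillis();
    private static long lastLoggedMs;

    // Emits the message at most once per MIN_INTERVAL_MS, however hot the caller's loop is.
    static void maybeLog(String message) {
        long now = System.currentTimeMillis();
        if (now - lastLoggedMs >= MIN_INTERVAL_MS) {
            lastLoggedMs = now;
            System.out.println("DEBUG " + message);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1_000_000; i++) {
            maybeLog("waiting before next GetTelemetrySubscriptions request"); // printed once
        }
    }
}
{code}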



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-17 Thread via GitHub


nizhikov commented on code in PR #14856:
URL: https://github.com/apache/kafka/pull/14856#discussion_r1456546534


##
tools/src/main/java/org/apache/kafka/tools/consumergroup/ConsumerGroupCommandOptions.java:
##
@@ -0,0 +1,263 @@
+/*

Review Comment:
   > I think when we have had two word folders we usually put a dash between 
them (ie consumer-group)
   
   A dash can't be used in a Java package name.
   
   > They follow a pattern of coordinator/group
   
   We can move the classes to the `org.apache.kafka.tools.consumer.group` package.
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16159:
---
Labels: logging  (was: )

> Prune excessive logging from Telemetry Reporter
> ---
>
> Key: KAFKA-16159
> URL: https://issues.apache.org/jira/browse/KAFKA-16159
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, log
>Reporter: Philip Nee
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: logging
> Fix For: 3.8.0
>
>
> While running system tests locally, I've noticed excessive logging of the 
> Telemetry Reporter. This I believe was introduced in KIP-714.
> {code:java}
> [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
> returning the value 224678 ms; the client will wait before submitting the 
> next GetTelemetrySubscriptions network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
> This is logged several times per ms. Given the amount of log being 
> emitted, can we also check the CPU profile to see if there's a process 
> running a tight loop?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16159:
---
Labels: consumer logging  (was: logging)

> Prune excessive logging from Telemetry Reporter
> ---
>
> Key: KAFKA-16159
> URL: https://issues.apache.org/jira/browse/KAFKA-16159
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, log
>Reporter: Philip Nee
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: consumer, logging
> Fix For: 3.8.0
>
>
> While running system tests locally, I've noticed excessive logging of the 
> Telemetry Reporter. This I believe was introduced in KIP-714.
> {code:java}
> [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
> returning the value 224678 ms; the client will wait before submitting the 
> next GetTelemetrySubscriptions network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
> This is logged several times per ms. Given the amount of log being 
> emitted, can we also check the CPU profile to see if there's a process 
> running a tight loop?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16159:
---
Fix Version/s: 3.8.0

> Prune excessive logging from Telemetry Reporter
> ---
>
> Key: KAFKA-16159
> URL: https://issues.apache.org/jira/browse/KAFKA-16159
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, log
>Reporter: Philip Nee
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>
> While running system tests locally, I've noticed excessive logging of the 
> Telemetry Reporter. This I believe was introduced in KIP-714.
> {code:java}
> [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
> returning the value 224678 ms; the client will wait before submitting the 
> next GetTelemetrySubscriptions network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
> This is logged several times per ms. Given the amount of log being 
> emitted, can we also check the CPU profile to see if there's a process 
> running a tight loop?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16131) Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 KRaft cluster with metadata version 3.6

2024-01-17 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-16131.
--
Resolution: Fixed

> Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 
> KRaft cluster with metadata version 3.6
> 
>
> Key: KAFKA-16131
> URL: https://issues.apache.org/jira/browse/KAFKA-16131
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Assignee: Proven Provenzano
>Priority: Blocker
> Fix For: 3.7.0
>
>
> When running Kafka 3.7.0-RC2 as a KRaft cluster with metadata version set to 
> 3.6-IV2 metadata version, it throws repeated errors like this in the 
> controller logs:
> {quote}2024-01-13 16:58:01,197 INFO [QuorumController id=0] 
> assignReplicasToDirs: event failed with UnsupportedVersionException in 15 
> microseconds. (org.apache.kafka.controller.QuorumController) 
> [quorum-controller-0-event-handler]
> 2024-01-13 16:58:01,197 ERROR [ControllerApis nodeId=0] Unexpected error 
> handling request RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, apiVersion=0, 
> clientId=1000, correlationId=14, headerVersion=2) – 
> AssignReplicasToDirsRequestData(brokerId=1000, brokerEpoch=5, 
> directories=[DirectoryData(id=w_uxN7pwQ6eXSMrOKceYIQ, 
> topics=[TopicData(topicId=bvAKLSwmR7iJoKv2yZgygQ, 
> partitions=[PartitionData(partitionIndex=2), 
> PartitionData(partitionIndex=1)]), TopicData(topicId=uNe7f5VrQgO0zST6yH1jDQ, 
> partitions=[PartitionData(partitionIndex=0)])])]) with context 
> RequestContext(header=RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, 
> apiVersion=0, clientId=1000, correlationId=14, headerVersion=2), 
> connectionId='172.16.14.219:9090-172.16.14.217:53590-7', 
> clientAddress=/172.16.14.217, 
> principal=User:CN=my-cluster-kafka,O=io.strimzi, 
> listenerName=ListenerName(CONTROLPLANE-9090), securityProtocol=SSL, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.7.0), fromPrivilegedListener=false, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@71004ad2])
>  (kafka.server.ControllerApis) [quorum-controller-0-event-handler]
> java.util.concurrent.CompletionException: 
> org.apache.kafka.common.errors.UnsupportedVersionException: Directory 
> assignment is not supported yet.
> at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
>  at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
>  at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
>  at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>  at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
>  at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:880)
>  at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
>  at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:148)
>  at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:137)
>  at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>  at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>  at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: 
> Directory assignment is not supported yet.
> {quote}
>  
> With the metadata version set to 3.6-IV2, it makes sense that the request is 
> not supported. But in that case the request should not be sent at all.
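
The fix merged above gates the request on the metadata version, so brokers on an 
older version never send ASSIGN_REPLICAS_TO_DIRS in the first place. A minimal 
sketch of that shape, using an illustrative stand-in enum rather than Kafka's 
real MetadataVersion class:
{code:java}
public final class DirectoryAssignmentGateSketch {

    // Illustrative stand-in for the broker's view of the cluster metadata version.
    enum MetadataVersion {
        IBP_3_6_IV2, IBP_3_7_IV2;

        // In this sketch, directory assignment exists from 3.7-IV2 onwards.
        boolean isDirectoryAssignmentSupported() {
            return compareTo(IBP_3_7_IV2) >= 0;
        }
    }

    static void maybeSendAssignReplicasToDirs(MetadataVersion mv) {
        if (!mv.isDirectoryAssignmentSupported()) {
            // Old metadata version: skip the request entirely instead of letting
            // the controller reject it with UnsupportedVersionException.
            return;
        }
        System.out.println("sending ASSIGN_REPLICAS_TO_DIRS");
    }

    public static void main(String[] args) {
        maybeSendAssignReplicasToDirs(MetadataVersion.IBP_3_6_IV2); // skipped
        maybeSendAssignReplicasToDirs(MetadataVersion.IBP_3_7_IV2); // sent
    }
}
{code}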



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16159:
---
Description: 
While running system tests locally, I've noticed excessive logging of the 
Telemetry Reporter. This I believe was introduced in KIP-714.
{code:java}
[2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
returning the value 224678 ms; the client will wait before submitting the next 
GetTelemetrySubscriptions network API request 
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
This is logged several times per ms. Given the amount of log being 
emitted, can we also check the CPU profile to see if there's a process running 
a tight loop?

 

Update

---

Looking from the beginning, is this caused by the following?
{code:java}
DEBUG The broker generated an error for the get telemetry network API request 
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
    146 org.apache.kafka.common.errors.UnsupportedVersionException: The node 
does not support GET_TELEMETRY_SUBSCRIPTIONS {code}

  was:
While running system tests locally, I've noticed excessive logging of the 
Telemetry Reporter. This I believe was introduced in KIP-714.
{code:java}
[2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
returning the value 224678 ms; the client will wait before submitting the next 
GetTelemetrySubscriptions network API request 
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
This is logged several times per ms. Given the amount of log being 
emitted, can we also check the CPU profile to see if there's a process running 
a tight loop?


> Prune excessive logging from Telemetry Reporter
> ---
>
> Key: KAFKA-16159
> URL: https://issues.apache.org/jira/browse/KAFKA-16159
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, log
>Reporter: Philip Nee
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: consumer, logging
> Fix For: 3.8.0
>
>
> While running system tests locally, I've noticed excessive logging of the 
> Telemetry Reporter. This I believe was introduced in KIP-714.
> {code:java}
> [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
> returning the value 224678 ms; the client will wait before submitting the 
> next GetTelemetrySubscriptions network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
> This is logged several times per ms. Given the amount of log being 
> emitted, can we also check the CPU profile to see if there's a process 
> running a tight loop?
>  
> Update
> ---
> Looking from the beginning, is this caused by the following?
> {code:java}
> DEBUG The broker generated an error for the get telemetry network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
>     146 org.apache.kafka.common.errors.UnsupportedVersionException: The node 
> does not support GET_TELEMETRY_SUBSCRIPTIONS {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

