Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on PR #15437:
URL: https://github.com/apache/kafka/pull/15437#issuecomment-1972669980

   @philipnee Could you please act as a second pair of eyes here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1508600306


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1277,8 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstEx
 
     // Visible for testing
     void maybeAutoCommitSync(final boolean shouldAutoCommit,
-                             final Timer timer,
-                             final AtomicReference<Throwable> firstException) {

Review Comment:
   It's what the legacy consumer does.
   
   
https://github.com/apache/kafka/blob/d066b94c8103cca166d7ea01a4b5bf5f65a3b838/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1151
 






Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


mjsax commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1508477877


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1277,8 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstEx
 
     // Visible for testing
    void maybeAutoCommitSync(final boolean shouldAutoCommit,
-                             final Timer timer,
-                             final AtomicReference<Throwable> firstException) {

Review Comment:
   > it looks like it is correct to only log the error here.
   
   Why? Should we not set `firstException` if `null` and `commitSync` throws?
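
   For reference, a minimal sketch of the pattern being discussed -- recording only the first error and letting later ones be handled separately (the names here are illustrative, not the PR's actual code):

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   public class FirstExceptionExample {
       private final AtomicReference<Throwable> firstException = new AtomicReference<>();

       // Run one shutdown step; keep only the earliest failure.
       void runStep(Runnable step) {
           try {
               step.run();
           } catch (RuntimeException e) {
               // compareAndSet only succeeds for the first error; later
               // errors would merely be logged in a real close path.
               firstException.compareAndSet(null, e);
           }
       }

       public static void main(String[] args) {
           FirstExceptionExample example = new FirstExceptionExample();
           example.runStep(() -> { throw new IllegalStateException("commit failed"); });
           example.runStep(() -> { throw new IllegalStateException("second failure"); });
           System.out.println(example.firstException.get()); // prints the "commit failed" exception
       }
   }
   ```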






[jira] [Assigned] (KAFKA-15797) Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]

2024-02-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15797:
---

Assignee: Matthias J. Sax

> Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true] 
> --
>
> Key: KAFKA-15797
> URL: https://issues.apache.org/jira/browse/KAFKA-15797
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> I found two recent failures:
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldUpgradeFromEosAlphaToEosV2_true_/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldUpgradeFromEosAlphaToEosV2_true__2/]
>  
> Output generally looks like:
> {code:java}
> java.lang.AssertionError: Did not receive all 138 records from topic 
> multiPartitionOutputTopic within 6 ms, currently accumulated data is 
> [KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 
> 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), 
> KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), 
> KeyValue(0, 91), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), 
> KeyValue(0, 91), KeyValue(0, 105), KeyValue(0, 120), KeyValue(0, 136), 
> KeyValue(0, 153), KeyValue(0, 171), KeyValue(0, 190), KeyValue(3, 0), 
> KeyValue(3, 1), KeyValue(3, 3), KeyValue(3, 6), KeyValue(3, 10), KeyValue(3, 
> 15), KeyValue(3, 21), KeyValue(3, 28), KeyValue(3, 36), KeyValue(3, 45), 
> KeyValue(3, 55), KeyValue(3, 66), KeyValue(3, 78), KeyValue(3, 91), 
> KeyValue(3, 105), KeyValue(3, 120), KeyValue(3, 136), KeyValue(3, 153), 
> KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 190), KeyValue(3, 210), 
> KeyValue(3, 231), KeyValue(3, 253), KeyValue(3, 276), KeyValue(3, 300), 
> KeyValue(3, 325), KeyValue(3, 351), KeyValue(3, 378), KeyValue(3, 406), 
> KeyValue(3, 435), KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 
> 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), 
> KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), 
> KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105), KeyValue(1, 120), 
> KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), KeyValue(1, 190), 
> KeyValue(1, 120), KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), 
> KeyValue(1, 190), KeyValue(1, 210), KeyValue(1, 231), KeyValue(1, 253), 
> KeyValue(1, 276), KeyValue(1, 300), KeyValue(1, 325), KeyValue(1, 351), 
> KeyValue(1, 378), KeyValue(1, 406), KeyValue(1, 435), KeyValue(2, 0), 
> KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 6), KeyValue(2, 10), KeyValue(2, 
> 15), KeyValue(2, 21), KeyValue(2, 28), KeyValue(2, 36), KeyValue(2, 45), 
> KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), 
> KeyValue(2, 105), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), 
> KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), KeyValue(2, 136), 
> KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 210), 
> KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), 
> KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), 
> KeyValue(0, 210), KeyValue(0, 231), KeyValue(0, 253), KeyValue(0, 276), 
> KeyValue(0, 300), KeyValue(0, 325), KeyValue(0, 351), KeyValue(0, 378), 
> KeyValue(0, 406), KeyValue(0, 435)] Expected: is a value equal to or greater 
> than <138> but: <134> was less than <138>{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15797: Fix flaky EOS_v2 upgrade test [kafka]

2024-02-29 Thread via GitHub


mjsax commented on PR #15449:
URL: https://github.com/apache/kafka/pull/15449#issuecomment-1972488983

   We have two runs of this test, one w/o error injection and one w/ error 
injection.
   
The runs w/o error injection are more stable, but I did see a 
"verifyCommitted" check fail because of too much data (for keys/tasks 
that should not have committed); hence, I concluded that `commit.interval.ms` 
must have been hit. This will be hard to verify on the test w/o error injection 
because it's pretty stable.
   
   I don't know right now if it helps for the error injection case. In fact, it 
could make it worse (I did not dig deep enough, but the test might actually 
require a low TX-timeout...) -- if this is the case, we would need to "hack" into 
`StreamsConfig` to allow us to disable the commit-interval/tx-timeout check so 
we can set a large commit interval and a small tx-timeout.
   
   I could not make the test fail locally, so I am hoping that a couple of 
Jenkins PR runs will give us some signal about how to move forward.





[PR] KAFKA-15797: Fix flaky EOS_v2 upgrade test [kafka]

2024-02-29 Thread via GitHub


mjsax opened a new pull request, #15449:
URL: https://github.com/apache/kafka/pull/15449

   Originally, we set the commit interval to MAX_VALUE for this test, to ensure we 
only commit explicitly. However, we needed to decrease it later on when adding 
the tx-timeout verification.
   
   We did see failing test runs for which the commit interval was hit. This PR 
increases the commit interval to be close to the test timeout, to keep the 
commit interval from triggering.
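
   As a rough illustration of the kind of change described (the values below are made up; the real ones are in the diff):

   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;

   public class CommitIntervalExample {
       public static void main(String[] args) {
           // Illustrative only: push the commit interval close to the test
           // timeout so time-based commits cannot fire while the test runs.
           final long testTimeoutMs = 60_000L;
           Properties props = new Properties();
           props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, testTimeoutMs - 1_000L);
           System.out.println(props);
       }
   }
   ```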
   





Re: [PR] KAFKA-15307: Update/errors for deprecated config [kafka]

2024-02-29 Thread via GitHub


mjsax commented on code in PR #14448:
URL: https://github.com/apache/kafka/pull/14448#discussion_r1508427425


##
.github/workflows/codesee-arch-diagram.yml:
##
@@ -0,0 +1,23 @@
+# This workflow was added by CodeSee. Learn more at https://codesee.io/

Review Comment:
   I don't think this file belongs in this PR. Seems the tool you are using 
added it automatically...



##
docs/streams/developer-guide/config-streams.html:
##
@@ -91,6 +91,7 @@
   rocksdb.config.setter
   state.dir
   topology.optimization
+  windowed.inner.class.serde

Review Comment:
   As pointed out on the other PR, I think we should not add this config to the 
docs, but rather deprecate it similarly to `window.size` via the KIP you started.



##
docs/streams/developer-guide/config-streams.html:
##
@@ -300,14 +306,20 @@ num.standby.replicasnull
   
-  default.windowed.key.serde.inner
+  default.windowed.key.serde.inner 
(Deprecated. Use windowed.inner.class.serde instead.)
 Medium
-Default serializer/deserializer for the inner 
class of windowed keys, implementing the Serde interface.
+<<< HEAD

Review Comment:
   Conflict not resolved correctly.



##
docs/streams/developer-guide/config-streams.html:
##
@@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per 
partition.
 1000
   
-  cache.max.bytes.buffering
+  statestore.cache.max.bytes
+Medium
+Maximum number of memory bytes to be used for 
record caches across all threads. Note that at the debug level you can use 
cache.size to monitor the actual size of the Kafka Streams 
cache.
+10485760
+  
+  cache.max.bytes.buffering (Deprecated. Use 
cache.max.bytes instead.)

Review Comment:
   even/odd issue...






Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]

2024-02-29 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1508436563


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3501,42 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    /**
+     * A group can be upgraded offline if it's a classic group and empty.
+     *
+     * @param groupId The group to be validated.
+     * @return true if the offline upgrade is valid.
+     */
+    private boolean validateOfflineUpgrade(String groupId) {
+        Group group = groups.get(groupId);
+        return group != null && group.type() == CLASSIC && group.isEmpty();
+    }
+
+    /**
+     * A group can be downgraded offline if it's a consumer group and empty.
+     *
+     * @param groupId The group to be validated.
+     * @return true if the offline downgrade is valid.
+     */
+    private boolean validateOfflineDowngrade(String groupId) {
+        Group group = groups.get(groupId);
+        return group != null && group.type() == CONSUMER && group.isEmpty();
+    }
+
+    /**
+     * Upgrade/Downgrade the empty group if it's valid.
+     *
+     * @param groupId The group id to be migrated.
+     * @param records The list of records to delete the previous group.
+     */
+    public void maybeMigrateEmptyGroup(String groupId, List<Record> records, boolean isUpgrade) {
+        if ((isUpgrade && validateOfflineUpgrade(groupId)) ||
+            (!isUpgrade && validateOfflineDowngrade(groupId))) {
+            deleteGroup(groupId, records);
+            removeGroup(groupId);

Review Comment:
   There is a bug for downgrade. The sequence of downgrading now is: 1) the consumer 
group gets deleted, 2) classicGroupJoin creates a classic group and puts it into 
`groupMetadataManager.groups`, 3) the consumer group tombstone from 
`deleteGroup` is replayed, at which point `groups.get(groupId)` should be a consumer group.






Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-02-29 Thread via GitHub


mjsax commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1508421651


##
docs/streams/developer-guide/config-streams.html:
##
@@ -300,8 +306,10 @@ num.standby.replicasnull
   
-  default.windowed.key.serde.inner
+  default.windowed.key.serde.inner 
(Deprecated. Use windowed.inner.class.serde instead.)
 Medium
+<<< HEAD
+<<< HEAD

Review Comment:
   Seems you missed deleting some marker lines when resolving conflicts during 
rebasing. (more below)



##
docs/streams/developer-guide/config-streams.html:
##
@@ -1010,6 +1016,18 @@ topology.optimization
+  windowed.inner.class.serde
+  
+
+  
+Serde for the inner class of a windowed record. Must implement the 
org.apache.kafka.common.serialization.Serde interface.
+  
+  
+Note that setting this config in KafkaStreams application would 
result in an error as it is meant to be used only from Plain consumer client.

Review Comment:
   I just did some more digging, and now I actually think that @ableegoldman is 
right: we might want to treat `windowed.inner.serde.class` similar to 
`window.size`... (i.e., maybe remove it from StreamsConfig -- we could add this to 
the KIP Lucia started).
   
   I also understand now why the docs say that using it would result in an error 
(for both configs): Kafka Streams will always pass window-size and inner-serde 
via the _constructor_, and we also verify that we don't get a parameter set 
twice (or zero times), throwing an error inside the `configure()` method of the 
windowed serdes...
   
   Thus, we might want to not add `windowed.inner.serde.class` to the docs in 
this PR to begin with...
   
   Sorry for the back and forth. Reading and understanding code is hard...
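
   For illustration, a minimal sketch of the constructor/factory path described above, where the inner type and window size are supplied directly rather than via config (assuming the standard `WindowedSerdes` factory):

   ```java
   import org.apache.kafka.common.serialization.Serde;
   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.kstream.WindowedSerdes;

   public class WindowedSerdeExample {
       public static void main(String[] args) {
           // Inner class and window size are passed explicitly; no
           // window.size or windowed.inner.*.serde config is involved.
           Serde<Windowed<String>> serde =
               WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
           System.out.println(serde.getClass().getSimpleName());
       }
   }
   ```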



##
docs/streams/developer-guide/config-streams.html:
##
@@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per 
partition.
 1000
   
-  cache.max.bytes.buffering
+  statestore.cache.max.bytes
+Medium
+Maximum number of memory bytes to be used for 
record caches across all threads. Note that at the debug level you can use 
cache.size to monitor the actual size of the Kafka Streams 
cache.

Review Comment:
   >  Note that at the debug level you can use cache.size to 
monitor the actual size of the Kafka Streams cache.
   
   What does this mean? Cannot follow.



##
docs/streams/developer-guide/config-streams.html:
##
@@ -326,6 +334,18 @@ num.standby.replicasDefault serializer/deserializer for the inner 
class of windowed keys, implementing the Serde interface. Deprecated.
+===
+Default serializer/deserializer for the inner 
class of windowed keys, implementing the Serde interface.

Review Comment:
   Duplicate line (both are not 100% the same) -- seems a conflict was not 
resolved correctly.



##
docs/streams/developer-guide/config-streams.html:
##
@@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per 
partition.
 1000
   
-  cache.max.bytes.buffering
+  statestore.cache.max.bytes
+Medium
+Maximum number of memory bytes to be used for 
record caches across all threads. Note that at the debug level you can use 
cache.size to monitor the actual size of the Kafka Streams 
cache.
+10485760
+  
+  cache.max.bytes.buffering (Deprecated. Use 
cache.max.bytes instead.)

Review Comment:
   If we insert a new row, we need to change "even / odd" for all rows below... 
super annoying... (otherwise we get two rows with the same background color 
instead of nicely interleaved rows)






Re: [PR] MINOR: Add 3.7 to Kafka Streams system tests [kafka]

2024-02-29 Thread via GitHub


mjsax commented on PR #15443:
URL: https://github.com/apache/kafka/pull/15443#issuecomment-1972305073

   A couple of system tests failed, but it seems due to an env issue. Re-running:
   
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6081/





[PR] KAFKA-16317: Add event process rate metric in GroupCoordinatorRuntimeMetrics [kafka]

2024-02-29 Thread via GitHub


jeffkbkim opened a new pull request, #15448:
URL: https://github.com/apache/kafka/pull/15448

   This metric will help identify how fast all coordinator threads process 
events. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16316) Make the restore behavior of GlobalKTables with custom processors configureable

2024-02-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16316:

Labels: needs-kip  (was: )

> Make the restore behavior of GlobalKTables with custom processors 
> configureable
> ---
>
> Key: KAFKA-16316
> URL: https://issues.apache.org/jira/browse/KAFKA-16316
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: needs-kip
>
> Take the change implemented in 
> https://issues.apache.org/jira/browse/KAFKA-7663 and make it optional through 
> adding a couple methods to the API





[jira] [Created] (KAFKA-16317) Add event rate in GroupCoordinatorRuntimeMetrics

2024-02-29 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16317:


 Summary: Add event rate in GroupCoordinatorRuntimeMetrics
 Key: KAFKA-16317
 URL: https://issues.apache.org/jira/browse/KAFKA-16317
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


We want a sensor to record every time we process a new event in the coordinator 
runtime.
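
A minimal sketch of the kind of sensor this describes, using the common metrics API (the metric and group names below are illustrative, not the final ones):
{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class EventRateExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor eventSensor = metrics.sensor("coordinator-event-processing");
        eventSensor.add(
            metrics.metricName("event-processing-rate", "example-coordinator-metrics",
                "Rate at which coordinator events are processed"),
            new Rate());
        // Called once per processed event in the runtime.
        eventSensor.record();
    }
}
{code}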





[PR] MINOR: move TimeRatio to clients common [kafka]

2024-02-29 Thread via GitHub


jeffkbkim opened a new pull request, #15447:
URL: https://github.com/apache/kafka/pull/15447

   Move TimeRatio to `org.apache.kafka.common.metrics.stats` package
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16316) Make the restore behavior of GlobalKTables with custom processors configureable

2024-02-29 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-16316:
--

 Summary: Make the restore behavior of GlobalKTables with custom 
processors configureable
 Key: KAFKA-16316
 URL: https://issues.apache.org/jira/browse/KAFKA-16316
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Walker Carlson
Assignee: Walker Carlson


Take the change implemented in https://issues.apache.org/jira/browse/KAFKA-7663 
and make it optional through adding a couple methods to the API





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-29 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506982605


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(

Review Comment:
   I'm still not sure why we need the RecurringCall; I think something like 
this should be much less code:
   
   ```
   ArrayBlockingQueue<DescribeTopicPartitionsResult> results = new ArrayBlockingQueue<>(5);
   Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
     DescribeTopicPartitionsRequestData.Cursor currentCursor = new DescribeTopicPartitionsRequestData.Cursor();
     // <...>
     @Override
     void handleResponse(AbstractResponse abstractResponse) {
       // ... Do the needful ...
       results.put(...);
       // ...
       if (hasMore) {
         // ... Set new cursor ...
         // ...
         runnable.call(this, now);
       } else {
         // NB: ArrayBlockingQueue rejects null, so an end-of-results
         // sentinel object would be needed in place of null here.
         results.put(null);
       }
     }
     // <...>
   }
   
   runnable.call(call, time.milliseconds());
   while (true) {
     DescribeTopicPartitionsResult result = results.take();
     if (result == null)
       break;
     subscriber.onNext(result);
   }
   ```
   
   And we won't need to create extra threads in the TopicCommand etc.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-29 Thread via GitHub


jeffkbkim commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1508185648


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // 1. The member reported its owned partitions;

Review Comment:
   And if the request is not full, then we don't send a full response (i.e. omit 
the assignment)?
   
   Will you be fixing it in a different PR, and is there a Jira to track it?
   
   






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-29 Thread via GitHub


jeffkbkim commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1508179840


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1193,71 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.

Review Comment:
   Can we add a bit more description on what "reconcile" means here?






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-02-29 Thread via GitHub


jeffkbkim commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1508174554


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the member. This
+     *                              is reported in the ConsumerGroupHeartbeat API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, "
+                + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {

Review Comment:
   I'm confused why we changed the name from revocation to rebalance timeout, if 
it's actually the revocation timeout and a "rebalance" is considered complete 
only when we move to STABLE.






Re: [PR] KAFKA-14588 [WIP] ConfigCommand moved to tools [kafka]

2024-02-29 Thread via GitHub


nizhikov commented on PR #15417:
URL: https://github.com/apache/kafka/pull/15417#issuecomment-1971882476

   Hello @mimaison @jolshan 
   
   It seems I'm done with rewriting the `ConfigCommand` code in Java.
   It can't be moved to `tools` right now, because of dependencies on 
`KafkaConfig` and other parts of `core`, but it passes all tests.
   
   Could you please help me with moving the dependencies of `ConfigCommand` to the 
correct modules?
   
   The PRs are here: #15075 #15387 





Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1508102432


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -892,6 +893,24 @@ public void testWakeupCommitted() {
         assertNull(consumer.wakeupTrigger().getPendingTask());
     }
 
+    @Test
+    public void testNoWakeupInCloseCommit() {

Review Comment:
   Mockito is weird sometimes! Hopefully fixed now






[jira] [Commented] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-02-29 Thread Quoc Phong Dang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822253#comment-17822253
 ] 

Quoc Phong Dang commented on KAFKA-16190:
-

[~lianetm] I'm not sure how the extension of this test is supposed to work. 
If the consumer changes the subscription when it gets fenced, how can it keep 
the same initial one? Shouldn't it be the new one?

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Quoc Phong Dang
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support, newbie
> Fix For: 3.8.0
>
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is reset on failure scenarios, which 
> should cover the fence+rejoin sequence). 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this exact case given that it does explicitly change the subscription when it 
> gets fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.





Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1507973505


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -892,6 +893,24 @@ public void testWakeupCommitted() {
         assertNull(consumer.wakeupTrigger().getPendingTask());
     }
 
+    @Test
+    public void testNoWakeupInCloseCommit() {

Review Comment:
   The error I get is:
   ```java
   org.mockito.exceptions.verification.TooManyActualInvocations: 
   applicationEventHandler.add(
   
   );
   Wanted 1 time:
   -> at org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.add(ApplicationEventHandler.java:72)
   But was 2 times:
   -> at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.commit(AsyncKafkaConsumer.java:807)
   -> at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.prepareShutdown(AsyncKafkaConsumer.java:1272)
   
   ```






[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822250#comment-17822250
 ] 

Bruno Cadonna commented on KAFKA-16290:
---

I changed the version to 4.0.0.

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 4.0.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. Places like assign() are 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16290:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 4.0.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. Places like assign() are 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





[jira] [Assigned] (KAFKA-16315) Investigate propagating metadata updates via queues

2024-02-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16315:
-

Assignee: Kirk True

> Investigate propagating metadata updates via queues
> ---
>
> Key: KAFKA-16315
> URL: https://issues.apache.org/jira/browse/KAFKA-16315
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network 
> I/O thread then issues a call to update the {{ConsumerMetadata}} via 
> {{requestUpdate()}}. If the event ends up stuck in the queue for a long time, 
> it is possible that the metadata is not updated at the correct time.





Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-29 Thread via GitHub


cadonna commented on PR #15426:
URL: https://github.com/apache/kafka/pull/15426#issuecomment-1971644323

   @kirktrue are you fine with merging this PR and coming back to this after 
3.8?





[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822246#comment-17822246
 ] 

Kirk True commented on KAFKA-16290:
---

I just filed KAFKA-16315 which has a similar feel to this design issue as we 
mutate the {{ConsumerMetadata}} in the application thread and background thread.

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. Places like assign() are 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16290:
--
Labels: consumer-threading-refactor kip-848 kip-848-client-support  (was: 
kip-848)

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. Places like assign() are 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





[jira] [Created] (KAFKA-16315) Investigate propagating metadata updates via queues

2024-02-29 Thread Kirk True (Jira)
Kirk True created KAFKA-16315:
-

 Summary: Investigate propagating metadata updates via queues
 Key: KAFKA-16315
 URL: https://issues.apache.org/jira/browse/KAFKA-16315
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
 Fix For: 4.0.0


Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network 
I/O thread then issues a call to update the {{ConsumerMetadata}} via 
{{requestUpdate()}}. If the event ends up stuck in the queue for a long time, 
it is possible that the metadata is not updated at the correct time.





[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822245#comment-17822245
 ] 

Kirk True commented on KAFKA-16290:
---

[~cadonna]—do we feel we have investigated this sufficiently? Are we making any 
changes in 3.8.0, or should we move this to 4.0.0, or just remove the 
version altogether? Thanks!

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Kirk True
>Priority: Critical
>  Labels: kip-848
> Fix For: 3.8.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. Places like assign() are 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





[jira] [Assigned] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16290:
-

Assignee: Bruno Cadonna  (was: Kirk True)

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: kip-848
> Fix For: 3.8.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. Places like assign() are 
> probably the most obvious, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]

2024-02-29 Thread via GitHub


jeffkbkim commented on code in PR #15446:
URL: https://github.com/apache/kafka/pull/15446#discussion_r1507948503


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1830,6 +1832,64 @@ public void onLoaded() {
         });
     }
 
+    // TODO: this part is done asynchronously in the old coordinator via a KafkaScheduler#schedule
+    /**

Review Comment:
   Would like to point out that the existing coordinator does this 
asynchronously.






[PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]

2024-02-29 Thread via GitHub


jeffkbkim opened a new pull request, #15446:
URL: https://github.com/apache/kafka/pull/15446

   In the new coordinator, we have lingering timers (heartbeats/revocation 
timeouts/join groups/sync groups). For classic groups, we also have awaiting 
join/sync futures that are never completed. This patch cancels all existing 
timers and completes all awaiting futures when a group is unloaded.
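
   As a generic sketch of the "complete all awaiting futures" part (plain Java, not the actual coordinator types):

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CopyOnWriteArrayList;

   public class UnloadExample {
       private final List<CompletableFuture<Void>> awaitingFutures = new CopyOnWriteArrayList<>();

       void onUnloaded() {
           // Fail anything still waiting so callers are not left hanging
           // after the group is unloaded from this coordinator.
           awaitingFutures.forEach(future ->
               future.completeExceptionally(new IllegalStateException("coordinator unloaded")));
           awaitingFutures.clear();
       }

       public static void main(String[] args) {
           UnloadExample example = new UnloadExample();
           CompletableFuture<Void> pending = new CompletableFuture<>();
           example.awaitingFutures.add(pending);
           example.onUnloaded();
           System.out.println(pending.isCompletedExceptionally()); // true
       }
   }
   ```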
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]

2024-02-29 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1971535782

   @mimaison any updates?





Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1507865068


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -97,6 +101,10 @@ public void setFetchAction(final FetchBuffer fetchBuffer) {
         }
     }
 
+    public void disableWakeups() {

Review Comment:
   Done






Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1507852098


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -892,6 +893,24 @@ public void testWakeupCommitted() {
         assertNull(consumer.wakeupTrigger().getPendingTask());
     }
 
+    @Test
+    public void testNoWakeupInCloseCommit() {

Review Comment:
   Could you provide some more details? It passed for me 5000 times.






Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1507827920


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -97,6 +101,10 @@ public void setFetchAction(final FetchBuffer fetchBuffer) {
         }
     }
 
+    public void disableWakeups() {

Review Comment:
   Could you please add unit tests for this new method?
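
   For example, something along these lines might work (a rough sketch, assuming `wakeup()` normally completes the active task exceptionally and becomes a no-op after `disableWakeups()`, with `setActiveTask` used as in the existing tests):

   ```java
   import static org.junit.jupiter.api.Assertions.assertFalse;

   import java.util.concurrent.CompletableFuture;
   import org.junit.jupiter.api.Test;

   public class WakeupTriggerDisableTest {
       @Test
       public void testWakeupAfterDisableIsNoop() {
           WakeupTrigger trigger = new WakeupTrigger();
           CompletableFuture<Void> task = new CompletableFuture<>();
           trigger.setActiveTask(task);

           trigger.disableWakeups();
           trigger.wakeup(); // should no longer complete the task exceptionally

           assertFalse(task.isCompletedExceptionally());
       }
   }
   ```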



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -892,6 +893,24 @@ public void testWakeupCommitted() {
         assertNull(consumer.wakeupTrigger().getPendingTask());
     }
 
+    @Test
+    public void testNoWakeupInCloseCommit() {

Review Comment:
   This unit test does not work for me locally.






Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]

2024-02-29 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1507837970


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -474,6 +474,8 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         final List<Record> records = new ArrayList<>();
         final long currentTimeMs = time.milliseconds();
         final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
+        groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true);
+        final int initialRecordsSize = records.size();

Review Comment:
   My question was more about the `maybeUpgradeEmptyGroup` call here. Why do 
we need it? This seems to be orthogonal to the offset metadata manager.






Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]

2024-02-29 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1507826893


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    /**
+     * A group can be upgraded offline if it's a classic group and empty.
+     *
+     * @param groupId The group to be validated.
+     * @return true if the offline upgrade is valid.
+     */
+    private boolean validateOfflineUpgrade(String groupId) {
+        Group group = groups.get(groupId);
+
+        if (group == null || group.type() == CONSUMER) {
+            return false;
+        }
+
+        ClassicGroup classicGroup = (ClassicGroup) group;
+        if (!classicGroup.isEmpty()) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    /**
+     * Upgrade the empty classic group to a consumer group if it's valid.
+     *
+     * @param groupId       The group id to be updated.
+     * @param records       The list of records to delete the classic group and create the consumer group.
+     * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group.
+     */
+    public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) {

Review Comment:
   I agree with it in the case of upgrading when a new member joins the empty 
group, but for the simple group we still need to create the new group. Is that 
correct?






Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]

2024-02-29 Thread via GitHub


dongnuo123 commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1507823457


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -474,6 +474,8 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         final List<Record> records = new ArrayList<>();
         final long currentTimeMs = time.milliseconds();
         final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
+        groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true);
+        final int initialRecordsSize = records.size();

Review Comment:
   This is because there is an `if (!records.isEmpty())` check at L513. If any new 
records are added, we need `if (records.size() > initialRecordsSize)` instead.






Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15445:
URL: https://github.com/apache/kafka/pull/15445#discussion_r1507815350


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1277,8 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstEx
 
     // Visible for testing
     void maybeAutoCommitSync(final boolean shouldAutoCommit,
-                             final Timer timer,
-                             final AtomicReference<Throwable> firstException) {

Review Comment:
   this parameter wasn't used, and it looks like it is correct to only log the 
error here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on PR #15445:
URL: https://github.com/apache/kafka/pull/15445#issuecomment-1971423281

   @cadonna Could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]

2024-02-29 Thread via GitHub


lucasbru opened a new pull request, #15445:
URL: https://github.com/apache/kafka/pull/15445

   When the consumer is closed, we perform a synchronous autocommit. We don't want to be woken up here, because we are already executing a close operation under a deadline. This is in line with the behavior of the old consumer.
   
   This fixes PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup, which is flaky on trunk: we return immediately from the synchronous commit with a WakeupException, which causes us not to wait for the commit to finish and thereby sometimes miss the committed offset when a new consumer is created.
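   Below is a minimal, self-contained sketch of the suppression pattern this change applies. It is illustrative only and not AsyncKafkaConsumer's actual code; the class and method names here are made up:

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch: ignore wakeups while a close-time operation runs under a deadline.
    public class WakeupSuppressionSketch {
        private final AtomicBoolean wakeupsDisabled = new AtomicBoolean(false);

        public void wakeup() {
            if (wakeupsDisabled.get()) {
                return; // ignored: close() is already bounded by its own timer
            }
            // otherwise a blocking call would be interrupted with WakeupException
        }

        public void closeWithAutoCommit(Duration timeout) {
            wakeupsDisabled.set(true); // disable before the synchronous autocommit
            try {
                commitSyncWithinDeadline(timeout);
            } finally {
                wakeupsDisabled.set(false);
            }
        }

        private void commitSyncWithinDeadline(Duration timeout) {
            // placeholder: synchronous offset commit bounded by `timeout`
        }
    }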
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only

2024-02-29 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822181#comment-17822181
 ] 

zhangzhisheng commented on KAFKA-3370:
--

https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms

> Add options to auto.offset.reset to reset offsets upon initialization only
> --
>
> Key: KAFKA-3370
> URL: https://issues.apache.org/jira/browse/KAFKA-3370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
>
> Currently "auto.offset.reset" is applied in the following two cases:
> 1) upon starting the consumer for the first time (hence no committed offsets 
> before);
> 2) upon fetching offsets out-of-range.
> For scenarios where case 2) needs to be avoided (i.e. people need to be 
> notified upon offsets out-of-range rather than silently resetting offsets), 
> "auto.offset.reset" needs to be set to "none". However for case 1) setting 
> "auto.offset.reset" to "none" will cause NoOffsetForPartitionException upon 
> polling. And in this case, seekToBeginning/seekToEnd is mistakenly applied 
> trying to set the offset at initialization, which are actually designed for 
> during the life time of the consumer (in rebalance callback, for example).
> The fix proposal is to add two more options to "auto.offset.reset", 
> "earliest-on-start", and "latest-on-start", whose semantics are "earliest" 
> and "latest" for case 1) only, and "none" for case 2).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]

2024-02-29 Thread via GitHub


AyoubOm commented on code in PR #15361:
URL: https://github.com/apache/kafka/pull/15361#discussion_r1507760037


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1664,18 +1664,17 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
 
 EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
 stateManager.checkpoint();
-EasyMock.expectLastCall().once();
+EasyMock.expectLastCall().times(2);
 EasyMock.expect(stateManager.changelogOffsets())
 .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
 .andReturn(singletonMap(changelogPartition, 10L))
 .andReturn(singletonMap(changelogPartition, 20L));
-EasyMock.expectLastCall();
 EasyMock.replay(stateManager, recordCollector);
 
 task = createStatefulTask(createConfig("100"), true);
 
 task.initializeIfNeeded();
-task.completeRestoration(noOpResetter -> { });
+task.completeRestoration(noOpResetter -> { }); // should checkpoint

Review Comment:
   Done. Maybe I will create a minor separate PR for that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]

2024-02-29 Thread via GitHub


dajac merged PR #15432:
URL: https://github.com/apache/kafka/pull/15432


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15440:
URL: https://github.com/apache/kafka/pull/15440#discussion_r1507649930


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
 
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
 }
 
+@Test
+public void testLeaveGroupWhenAssignmentEmpty() {

Review Comment:
   Replaced it with a simpler test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15440:
URL: https://github.com/apache/kafka/pull/15440#discussion_r1507649418


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
 
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
 }
 
+@Test
+public void testLeaveGroupWhenAssignmentEmpty() {
+String topicName = "topic1";
+TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+MembershipManager membershipManager = createMemberInStableState();
+
+// We own a partition and rebalance listener is registered
+when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(mock(ConsumerRebalanceListener.class)));
+doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+// Trigger leave group
+CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+
+// Rebalance listener not yet triggered
+final ArgumentCaptor<ConsumerRebalanceListenerCallbackNeededEvent> consumerRebalanceListener =
+ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackNeededEvent.class);
+verify(backgroundEventHandler).add(consumerRebalanceListener.capture());
+final ConsumerRebalanceListenerCallbackNeededEvent callbackEvent = consumerRebalanceListener.getValue();
+assertFalse(leaveResult1.isDone());
+assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
+
+// Clear the assignment
+when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false);
+
+// Complete the callback
+callbackEvent.future().complete(null);
+
+// Completed the listener

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15361:
URL: https://github.com/apache/kafka/pull/15361#discussion_r1507618245


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1664,18 +1664,17 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
 
 EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes();
 stateManager.checkpoint();
-EasyMock.expectLastCall().once();
+EasyMock.expectLastCall().times(2);
 EasyMock.expect(stateManager.changelogOffsets())
 .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
 .andReturn(singletonMap(changelogPartition, 10L))
 .andReturn(singletonMap(changelogPartition, 20L));
-EasyMock.expectLastCall();
 EasyMock.replay(stateManager, recordCollector);
 
 task = createStatefulTask(createConfig("100"), true);
 
 task.initializeIfNeeded();
-task.completeRestoration(noOpResetter -> { });
+task.completeRestoration(noOpResetter -> { }); // should checkpoint

Review Comment:
   These changes do not seem necessary. Could you please revert them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15440:
URL: https://github.com/apache/kafka/pull/15440#discussion_r1507610459


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
 
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
 }
 
+@Test
+public void testLeaveGroupWhenAssignmentEmpty() {
+String topicName = "topic1";
+TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+MembershipManager membershipManager = createMemberInStableState();
+
+// We own a partition and rebalance listener is registered
+when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(mock(ConsumerRebalanceListener.class)));
+doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+// Trigger leave group
+CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+
+// Rebalance listener not yet triggered
+final ArgumentCaptor<ConsumerRebalanceListenerCallbackNeededEvent> consumerRebalanceListener =
+ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackNeededEvent.class);
+verify(backgroundEventHandler).add(consumerRebalanceListener.capture());
+final ConsumerRebalanceListenerCallbackNeededEvent callbackEvent = consumerRebalanceListener.getValue();
+assertFalse(leaveResult1.isDone());
+assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
+
+// Clear the assignment
+when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false);
+
+// Complete the callback
+callbackEvent.future().complete(null);
+
+// Completed the listener

Review Comment:
   This comment does not make much sense since the leave future is still verified to be not completed. 



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
 
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
 }
 
+@Test
+public void testLeaveGroupWhenAssignmentEmpty() {

Review Comment:
   Is there maybe a more direct way to test the bug?
   I had a hard time to understand this unit test.
   Nevermind if there is no more direct way. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16314) Add the new ABORTABLE_ERROR

2024-02-29 Thread zhangzhisheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzhisheng updated KAFKA-16314:
--
Attachment: image-2024-02-29-21-43-51-000.png

> Add the new ABORTABLE_ERROR
> ---
>
> Key: KAFKA-16314
> URL: https://issues.apache.org/jira/browse/KAFKA-16314
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sanskar Jhajharia
>Assignee: Sanskar Jhajharia
>Priority: Major
> Attachments: image-2024-02-29-21-43-51-000.png
>
>
> As mentioned in the KIP, we would bump the ProduceRequest and ProduceResponse 
> to indicate that the server now returns a new ABORTABLE_ERROR. This error 
> would essentially require the client to abort the current transaction and 
> continue (without a need to restart the client).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16314) Add the new ABORTABLE_ERROR

2024-02-29 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822138#comment-17822138
 ] 

zhangzhisheng commented on KAFKA-16314:
---

I think this ability is needed

> Add the new ABORTABLE_ERROR
> ---
>
> Key: KAFKA-16314
> URL: https://issues.apache.org/jira/browse/KAFKA-16314
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sanskar Jhajharia
>Assignee: Sanskar Jhajharia
>Priority: Major
> Attachments: image-2024-02-29-21-43-51-000.png
>
>
> As mentioned in the KIP, we would bump the ProduceRequest and ProduceResponse 
> to indicate that the server now returns a new ABORTABLE_ERROR. This error 
> would essentially require the client to abort the current transaction and 
> continue (without a need to restart the client).
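
As a sketch of what client-side handling could look like, with the exception type assumed from the KIP (it is not a released API) and the surrounding producer setup omitted:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical handling of the proposed abortable error in a transactional
    // producer: abort the current transaction and continue, without restarting
    // the client.
    void sendInTransaction(KafkaProducer<String, String> producer,
                           ProducerRecord<String, String> record) {
        try {
            producer.beginTransaction();
            producer.send(record);
            producer.commitTransaction();
        } catch (TransactionAbortableException e) { // assumed name from the KIP
            producer.abortTransaction(); // abort, then the caller may retry
        }
    }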



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest

2024-02-29 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822135#comment-17822135
 ] 

zhangzhisheng commented on KAFKA-12261:
---

It is like this, but in my understanding, this is normal.

> Splitting partition causes message loss for consumers with 
> auto.offset.reset=latest
> ---
>
> Key: KAFKA-12261
> URL: https://issues.apache.org/jira/browse/KAFKA-12261
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
> Fix For: 3.6.0
>
>
> As of now, auto.offset.reset of ConsumerConfig is "latest" by default.
>  
> This could be a pitfall that causes message delivery loss when we split a 
> topic's partitions like below:
> Say we have a topic-X which has only 1 partition.
>  # split topic-X into 2 partitions by kafka-topics.sh --alter --topic topic-X 
> --partitions 2 (topic-X-1 is added)
>  # the producer learns that new partitions were added by refreshing metadata 
> and starts to produce to topic-X-1
>  # a bit later, the consumer learns that new partitions were added, triggers a 
> consumer rebalance, and starts consuming topic-X-1
>  ** upon starting consumption, it resets its offset to the log-end-offset
> If the producer sent several records before 3, they might not be delivered to 
> the consumer.
>  
>  
> This behavior isn't preferable in most cases, so it should be documented in 
> AUTO_OFFSET_RESET_DOC at least.
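
A minimal sketch of the usual mitigation for consumers that must see records produced to freshly added partitions (the configuration key is real; the snippet is a fragment, not a full client setup):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // Newly assigned partitions start from their beginning instead of log-end,
    // so records produced between the split and the rebalance are not skipped.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");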



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-02-29 Thread via GitHub


chiacyu commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1971140347

   Hi, @showuon 
   Please help review this PR.
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-02-29 Thread via GitHub


chiacyu opened a new pull request, #15444:
URL: https://github.com/apache/kafka/pull/15444

   Hi, all
   Changed the function to deal with the null pointer exception in a better way.
   Thanks all!
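   
   For context, a rough sketch of the null-safe pattern this kind of fix usually applies; the names and types here are stand-ins, not Kafka's actual signatures:

    import java.util.Map;
    import java.util.Optional;

    class SnapshotLookupSketch {
        private final Map<Long, Object> snapshots;

        SnapshotLookupSketch(Map<Long, Object> snapshots) {
            this.snapshots = snapshots;
        }

        // Topics created before v2.8 may have no snapshot at all, so the raw
        // lookup can return null; Optional forces callers to handle that case.
        Optional<Object> fetchSnapshot(long offset) {
            return Optional.ofNullable(snapshots.get(offset));
        }
    }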
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-02-29 Thread via GitHub


gaurav-narula commented on code in PR #15289:
URL: https://github.com/apache/kafka/pull/15289#discussion_r1507510343


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -137,7 +137,18 @@ object TestUtils extends Logging {
 val parentFile = new File(parent)
 parentFile.mkdirs()
 
-JTestUtils.tempDirectory(parentFile.toPath, null)
+val result = JTestUtils.tempDirectory(parentFile.toPath, null)
+
+parentFile.deleteOnExit()
+Exit.addShutdownHook("delete-temp-log-dir-on-exit", {
+  try {
+Utils.delete(parentFile)

Review Comment:
   Good point - turns out we can just use `parentFile.deleteOnExit()`. Updated in [394f2e3](https://github.com/apache/kafka/pull/15289/commits/394f2e30e3ac2110921f8e1ea2978c405194a11b)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]

2024-02-29 Thread via GitHub


AyoubOm commented on PR #15361:
URL: https://github.com/apache/kafka/pull/15361#issuecomment-1971038693

   > @AyoubOm Thanks for the updates!
   > 
   > Here my replies.
   
   Thanks @cadonna, I made updates accordingly


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [3/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-29 Thread via GitHub


nizhikov commented on PR #15365:
URL: https://github.com/apache/kafka/pull/15365#issuecomment-1971031598

   Hello, @rreddy-22 @dajac
   
   Could you please take a look at these changes?
   This PR moves the tests of `ConsoleGroupCommand` from the Authorizer and Sasl tests to `tools`.
   It is very small (+200, -100).
   
   The big PR where all the command code is rewritten in java: https://github.com/apache/kafka/pull/14471
   
   Right now, changes in ConsoleGroupCommand lead to conflicts in my PR which are hard to resolve.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 [2/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-29 Thread via GitHub


nizhikov commented on PR #15363:
URL: https://github.com/apache/kafka/pull/15363#issuecomment-1971028532

   Hello, @rreddy-22 @dajac 
   
   Could you please take a look at these changes?
   This PR moves `DescribeConsumerGroupTest` and `ResetConsumerGroupOffsetTest` as part of moving `ConsoleGroupCommand` to `tools`.
   
   Having it in trunk will reduce your work while improving `ConsoleGroupCommand` (no need to duplicate test changes in java and scala) and help move the command to `tools`.
   
   The big PR where all the command code is rewritten in java: #14471 
   
   Right now, changes in `ConsoleGroupCommand` lead to conflicts in my PR which are hard to resolve.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16190: Member should send full heartbeat when rejoining [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15401:
URL: https://github.com/apache/kafka/pull/15401#discussion_r1507409271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -524,10 +524,10 @@ public void testHeartbeatState() {
 assertEquals(memberId, data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
-assertEquals(-1, data.rebalanceTimeoutMs());

Review Comment:
   The ticket asks for an extension of this test: 
   
   > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState 
misses this exact case given that it does explicitly change the subscription 
when it gets fenced. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16190: Member should send full heartbeat when rejoining [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15401:
URL: https://github.com/apache/kafka/pull/15401#discussion_r1507409979


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -545,6 +545,15 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
+// Sent all fields if the member is joining/rejoining the group

Review Comment:
   nit: "Send"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]

2024-02-29 Thread via GitHub


cadonna commented on PR #15440:
URL: https://github.com/apache/kafka/pull/15440#issuecomment-1970897046

   @lucasbru Unit tests are failing in `MembershipManagerImplTest`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol config [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on code in PR #15330:
URL: https://github.com/apache/kafka/pull/15330#discussion_r1507381845


##
tests/kafkatest/tests/verifiable_consumer_test.py:
##
@@ -72,16 +86,59 @@ def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10):
err_msg="Timeout awaiting messages to be produced and acked")
 
 def await_consumed_messages(self, consumer, min_messages=1):
+timeout_sec = self.consumption_timeout_sec
 current_total = consumer.total_consumed()
-wait_until(lambda: consumer.total_consumed() >= current_total + min_messages,
-   timeout_sec=self.consumption_timeout_sec,
-   err_msg="Timed out waiting for consumption")
+expected = current_total + min_messages
+
+def _condition():
+return consumer.total_consumed() >= expected
+
+def _err_msg():
+actual = consumer.total_consumed()
+return "%d messages received within the timeout of %d seconds, expected %d" % (actual, timeout_sec, expected)
+
+wait_until(lambda: _condition(), timeout_sec=timeout_sec, err_msg=_err_msg())
+
+def await_members_stopped(self, consumer, num_consumers, timeout_sec):
+self._await_members_in_state(consumer, num_consumers, "stopped", [ConsumerState.Dead], timeout_sec)
 
 def await_members(self, consumer, num_consumers):
 # Wait until all members have joined the group
-wait_until(lambda: len(consumer.joined_nodes()) == num_consumers,
-   timeout_sec=self.session_timeout_sec*2,
-   err_msg="Consumers failed to join in a reasonable amount of time")
-
+states = [ConsumerState.Joined]
+
+if consumer_group.is_consumer_group_protocol_enabled(consumer.group_protocol):
+states.extend([ConsumerState.Started, ConsumerState.Rebalancing])

Review Comment:
   Why do we have to do this? Are we still checking the same thing if we 
consider "started" as "joined"?



##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -201,7 +200,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor
 bounce_mode=["all", "rolling"],
 num_bounces=[5],
 metadata_quorum=[quorum.isolated_kraft],
-use_new_coordinator=[True, False]
+use_new_coordinator=[True]

Review Comment:
   Why no group protocol parameter here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]

2024-02-29 Thread via GitHub


cadonna merged PR #15438:
URL: https://github.com/apache/kafka/pull/15438


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]

2024-02-29 Thread via GitHub


cadonna commented on PR #15438:
URL: https://github.com/apache/kafka/pull/15438#issuecomment-1970829159

   The test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15438:
URL: https://github.com/apache/kafka/pull/15438#discussion_r1507345366


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -32,36 +36,42 @@ public enum Type {
 
 private final Type type;
 
+/**
+ * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and
+ * {@link #equals(Object)} and can be used in log messages when debugging.
+ */
+private final Uuid id;

Review Comment:
   In any case, we can add the counter or sequence number later. It does not 
have to block this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15438:
URL: https://github.com/apache/kafka/pull/15438#discussion_r1507320914


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -32,36 +36,42 @@ public enum Type {
 
 private final Type type;
 
+/**
+ * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and
+ * {@link #equals(Object)} and can be used in log messages when debugging.
+ */
+private final Uuid id;
+
 protected ApplicationEvent(Type type) {
 this.type = Objects.requireNonNull(type);
+this.id = Uuid.randomUuid();
 }
 
 public Type type() {
 return type;
 }
 
-@Override
-public boolean equals(Object o) {
-if (this == o) return true;
-if (o == null || getClass() != o.getClass()) return false;
-
-ApplicationEvent that = (ApplicationEvent) o;
+public Uuid id() {
+return id;
+}
 
-return type == that.type;
+@Override
+public final boolean equals(Object o) {
+return this == o;
 }
 
 @Override
-public int hashCode() {
-return type.hashCode();
+public final int hashCode() {
+return Objects.hash(type, id);
 }
 
 protected String toStringBase() {
-return "type=" + type;
+return "type=" + type + ", id=" + id;

Review Comment:
   I was more on the performance reason side, but I also assumed that the 
performance benefits are negligible. I proposed it more to be on the safe side, 
thus the "nit" prefix. I am fine either way. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15438:
URL: https://github.com/apache/kafka/pull/15438#discussion_r1507316466


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -32,36 +36,42 @@ public enum Type {
 
 private final Type type;
 
+/**
+ * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and
+ * {@link #equals(Object)} and can be used in log messages when debugging.
+ */
+private final Uuid id;

Review Comment:
   > I do like the idea of being able to determine the ordering of the events, 
though.
   
   Can we maybe have both -- globally unique IDs and a counter? 
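   
   For illustration, a minimal generic sketch of the "both" idea (this is not the PR's actual code; the class name is made up):

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.common.Uuid;

    // A globally unique id for grep-ability in logs, plus a per-process
    // sequence number that makes the ordering of events recoverable.
    public abstract class SequencedEvent {
        private static final AtomicLong SEQUENCE = new AtomicLong();

        private final Uuid id = Uuid.randomUuid();                      // unique across consumers
        private final long sequenceNumber = SEQUENCE.incrementAndGet(); // orders events in one process

        public Uuid id() { return id; }
        public long sequenceNumber() { return sequenceNumber; }

        @Override
        public String toString() {
            return "id=" + id + ", seq=" + sequenceNumber;
        }
    }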



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]

2024-02-29 Thread via GitHub


cadonna commented on code in PR #15438:
URL: https://github.com/apache/kafka/pull/15438#discussion_r1507315536


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -32,36 +36,42 @@ public enum Type {
 
 private final Type type;
 
+/**
+ * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and
+ * {@link #equals(Object)} and can be used in log messages when debugging.
+ */
+private final Uuid id;

Review Comment:
   > I wanted the IDs to be "globally unique", which isn't going to happen if 
the client logs include multiple consumers. Perhaps that's an over-optimization?
   
   I hope the log context will tell you which consumer the events come from. However, I also agree that searching for globally unique IDs in logs is easier than searching for counters. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]

2024-02-29 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1507296782


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
 }
 }
 
+/**
+ * A group can be upgraded offline if it's a classic group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline upgrade is valid.
+ */
+private boolean validateOfflineUpgrade(String groupId) {
+Group group = groups.get(groupId);
+
+if (group == null || group.type() == CONSUMER) {
+return false;
+}
+
+ClassicGroup classicGroup = (ClassicGroup) group;
+if (!classicGroup.isEmpty()) {
+return false;
+} else {
+return true;
+}
+}
+
+/**
+ * Upgrade the empty classic group to a consumer group if it's valid.
+ *
+ * @param groupId   The group id to be updated.
+ * @param records   The list of records to delete the classic group and create the consumer group.
+ * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group.
+ */
+public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) {

Review Comment:
   In this particular case where the group is empty and a new member joins, my 
understanding is that we only need to delete the previous group. The new group 
will be automatically created with the new member. Am I missing something?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
 }
 }
 
+/**
+ * A group can be upgraded offline if it's a classic group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline upgrade is valid.
+ */
+private boolean validateOfflineUpgrade(String groupId) {
+Group group = groups.get(groupId);
+
+if (group == null || group.type() == CONSUMER) {
+return false;
+}
+
+ClassicGroup classicGroup = (ClassicGroup) group;
+if (!classicGroup.isEmpty()) {
+return false;
+} else {
+return true;
+}
+}
+
+/**
+ * Upgrade the empty classic group to a consumer group if it's valid.
+ *
+ * @param groupId   The group id to be updated.
+ * @param records   The list of records to delete the classic group and create the consumer group.
+ * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group.
+ */
+public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) {
+if (validateOfflineUpgrade(groupId)) {
+final long currentTimeMs = time.milliseconds();
+ClassicGroup classicGroup = getOrMaybeCreateClassicGroup(groupId, 
false);
+int groupEpoch = classicGroup.generationId();
+
+// Replace the classic group with a new consumer group.
+ConsumerGroup consumerGroup = 
getOrMaybeCreateConsumerGroup(groupId, true);
+// We don't create the tombstone because the replay will remove 
the newly created consumer group.

Review Comment:
   I think that we must write a tombstone for the old group. As you said, the 
map will be updated based on the records with the new group. However, we also 
need to compact the old record for the group and the only way to do it is to 
write a tombstone. 
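   
   For context, the compaction mechanics this relies on, sketched with the plain producer API on a stand-in compacted topic (the topic name and key are illustrative; the coordinator writes its own record types, not this):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // On a compacted topic, a record with a key and a *null* value is a
    // tombstone: compaction eventually drops all earlier records with that key.
    void writeTombstone(KafkaProducer<byte[], byte[]> producer, byte[] groupKey) {
        producer.send(new ProducerRecord<>("compacted-group-metadata", groupKey, null));
    }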



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]

2024-02-29 Thread via GitHub


dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1507302228


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -474,6 +474,8 @@ public CoordinatorResult commitOffset(
 final List<Record> records = new ArrayList<>();
 final long currentTimeMs = time.milliseconds();
 final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);
+groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true);
+final int initialRecordsSize = records.size();

Review Comment:
   Why are we doing this here? I am not sure to follow.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
 }
 }
 
+/**
+ * A group can be upgraded offline if it's a classic group and empty.
+ *
+ * @param groupId The group to be validated.
+ * @return true if the offline upgrade is valid.
+ */
+private boolean validateOfflineUpgrade(String groupId) {
+Group group = groups.get(groupId);
+
+if (group == null || group.type() == CONSUMER) {
+return false;
+}
+
+ClassicGroup classicGroup = (ClassicGroup) group;
+if (!classicGroup.isEmpty()) {
+return false;
+} else {
+return true;
+}
+}
+
+/**
+ * Upgrade the empty classic group to a consumer group if it's valid.
+ *
+ * @param groupId   The group id to be updated.
+ * @param records   The list of records to delete the classic group and create the consumer group.
+ * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group.
+ */
+public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) {
+if (validateOfflineUpgrade(groupId)) {
+final long currentTimeMs = time.milliseconds();
+ClassicGroup classicGroup = getOrMaybeCreateClassicGroup(groupId, false);
+int groupEpoch = classicGroup.generationId();
+
+// Replace the classic group with a new consumer group.
+ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
+// We don't create the tombstone because the replay will remove the newly created consumer group.
+removeGroup(groupId);
+groups.put(groupId, consumerGroup);
+metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
+
+if (!isSimpleGroup) {
+records.add(newGroupSubscriptionMetadataRecord(
+groupId,
+consumerGroup.computeSubscriptionMetadata(classicGroup.subscribedTopics(), metadataImage.topics(), metadataImage.cluster())
+));

Review Comment:
   This does not seem necessary. If the group is empty, the subscribedTopics 
will also be empty. As I said earlier, we can let the handling of the new 
member create the new subscription.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]

2024-02-29 Thread via GitHub


dajac commented on code in PR #15432:
URL: https://github.com/apache/kafka/pull/15432#discussion_r1507247226


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1695,8 +1683,8 @@ public void scheduleUnloadOperation(
 scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + 
partitionEpoch + ")", tp, () -> {
 CoordinatorContext context = coordinators.get(tp);
 if (context != null) {
+context.lock.lock();

Review Comment:
   I moved it for consistency, but it also seems better to lock before entering the try..catch because unlocking when `lock()` failed would be wrong.
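   
   For reference, the locking idiom being discussed, as a minimal generic sketch (not the CoordinatorRuntime code itself):

    import java.util.concurrent.locks.ReentrantLock;

    ReentrantLock lock = new ReentrantLock();

    lock.lock();       // acquire *before* the try block: if this throws, the
    try {              // finally below never runs against a lock we never held
        // critical section
    } finally {
        lock.unlock(); // only reached when lock() succeeded
    }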



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]

2024-02-29 Thread via GitHub


dajac commented on code in PR #15432:
URL: https://github.com/apache/kafka/pull/15432#discussion_r1507246125


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1731,15 +1735,19 @@ public void onNewMetadataImage(
 // Push an event for each coordinator.
 coordinators.keySet().forEach(tp -> {
 scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + 
newImage.offset() + ")", tp, () -> {
-withContextOrThrow(tp, context -> {
-if (context.state == CoordinatorState.ACTIVE) {
+CoordinatorContext context = coordinators.get(tp);
+if (context != null && context.state == 
CoordinatorState.ACTIVE) {

Review Comment:
   Oh. Nice catch. This is a bug. Let me fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15462) Add group type filter to the admin client

2024-02-29 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15462.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Add group type filter to the admin client
> -
>
> Key: KAFKA-15462
> URL: https://issues.apache.org/jira/browse/KAFKA-15462
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Ritika Reddy
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-29 Thread via GitHub


dajac merged PR #15150:
URL: https://github.com/apache/kafka/pull/15150


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org