[GitHub] [kafka] abhijeetk88 opened a new pull request, #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assi…

2023-07-28 Thread via GitHub


abhijeetk88 opened a new pull request, #14127:
URL: https://github.com/apache/kafka/pull/14127

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-07-28 Thread via GitHub


satishd commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1278245511


##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -62,13 +63,17 @@ class LogConfigTest {
 kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2")
 kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2")
 kafkaProps.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0")
+kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "30")

Review Comment:
   As these values are long types, can we have a value greater than Integer.MAX_VALUE?



##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -713,6 +714,99 @@ class DynamicBrokerConfigTest {
 config.updateCurrentConfig(new KafkaConfig(props))
 
assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))
   }
+
+  @Test
+  def testDynamicLogLocalRetentionMsConfig(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+props.put(KafkaConfig.LogRetentionTimeMillisProp, "1000")

Review Comment:
   As these values are long types, can we have a value greater than Integer.MAX_VALUE? 
The same comment applies to the other retention properties in this class.



##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -62,13 +63,17 @@ class LogConfigTest {
 kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2")
 kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2")
 kafkaProps.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0")
+kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "30")
+kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")

Review Comment:
   As these values are long types, can we have a value greater than Integer.MAX_VALUE?
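
   A hedged illustration of the point: these retention configs are long-typed, so a test value above `Integer.MAX_VALUE` would also exercise the 64-bit range. A minimal sketch; the `Properties`-based setup below is illustrative, not the test's actual code (the string keys mirror RemoteLogManagerConfig's LOG_LOCAL_RETENTION_*_PROP constants):

   ```java
   import java.util.Properties;

   public class LongConfigSketch {
       public static void main(String[] args) {
           // Smallest long value that cannot fit in an int: 2147483648
           long aboveIntMax = (long) Integer.MAX_VALUE + 1;
           Properties kafkaProps = new Properties();
           kafkaProps.put("log.local.retention.ms", String.valueOf(aboveIntMax));
           kafkaProps.put("log.local.retention.bytes", String.valueOf(aboveIntMax));
           System.out.println(kafkaProps.getProperty("log.local.retention.ms")); // 2147483648
       }
   }
   ```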






[GitHub] [kafka] github-actions[bot] commented on pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2023-07-28 Thread via GitHub


github-actions[bot] commented on PR #12781:
URL: https://github.com/apache/kafka/pull/12781#issuecomment-1656539350

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Comment Edited] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.

2023-07-28 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748778#comment-17748778
 ] 

Chris Egerton edited comment on KAFKA-15252 at 7/29/23 3:12 AM:


I think this is caused by KAFKA-15090. [~nikita.krasnov] do you think we can 
close this as a duplicate, or is it a different issue?


was (Author: chrisegerton):
I think this is caused by KAFKA-15090. [~nikita.krasnov] do you think we can 
close this as a duplicate, or is this a different issue?

> Task is not stopped until the poll interval passes in case of task restarting.
> --
>
> Key: KAFKA-15252
> URL: https://issues.apache.org/jira/browse/KAFKA-15252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nikita
>Priority: Major
>
> We face a problem with restarting tasks; sometimes it leads to a resource 
> leak. 
> We used the JDBC source connector and noticed an increasing count of open 
> sessions on the Vertica side, but this problem applies to all databases and 
> possibly to all source connectors.
> Our case is the following: 
> 1) Run the JDBC source connector (io.confluent.connect.jdbc.JdbcSourceConnector) 
> and set poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (the 
> latter is a property on the Kafka Connect side; we set 1)
> 2) Send POST /connectors//tasks//restart
> ER: the session count is the same as before the restart
> AR: the session count increases
> The main problem is that when the 
> org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection)
> method is called, it doesn't stop the source task itself. 
> The source task stops only if the polling process stops on the source task side. 
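
A hedged sketch of the reproduction setup described above, for concreteness. The numeric values are illustrative placeholders (the report's exact values are elided), while the property names are the standard Connect worker and JdbcSourceConnector configs:

{code}
# Kafka Connect worker config: a short graceful-shutdown window
task.shutdown.graceful.timeout.ms=10000

# Connector config: a poll interval far longer than the shutdown timeout, so a
# restarted task keeps polling (and holding DB sessions) long after the worker
# has given up waiting for it to stop.
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=86400000
{code}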



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.

2023-07-28 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748778#comment-17748778
 ] 

Chris Egerton commented on KAFKA-15252:
---

I think this is caused by KAFKA-15090. [~nikita.krasnov] do you think we can 
close this as a duplicate, or is this a different issue?

> Task is not stopped until the poll interval passes in case of task restarting.
> --
>
> Key: KAFKA-15252
> URL: https://issues.apache.org/jira/browse/KAFKA-15252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nikita
>Priority: Major
>
> We face a problem with restarting tasks; sometimes it leads to a resource 
> leak. 
> We used the JDBC source connector and noticed an increasing count of open 
> sessions on the Vertica side, but this problem applies to all databases and 
> possibly to all source connectors.
> Our case is the following: 
> 1) Run the JDBC source connector (io.confluent.connect.jdbc.JdbcSourceConnector) 
> and set poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (the 
> latter is a property on the Kafka Connect side; we set 1)
> 2) Send POST /connectors//tasks//restart
> ER: the session count is the same as before the restart
> AR: the session count increases
> The main problem is that when the 
> org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection)
> method is called, it doesn't stop the source task itself. 
> The source task stops only if the polling process stops on the source task side. 





[GitHub] [kafka] C0urante commented on pull request #14101: Source task stop call was added to force stopping execution.

2023-07-28 Thread via GitHub


C0urante commented on PR #14101:
URL: https://github.com/apache/kafka/pull/14101#issuecomment-165657

   There's also 
[KAFKA-15090](https://issues.apache.org/jira/browse/KAFKA-15090), which gives 
some context on why we stopped invoking `SourceTask::stop` on the herder thread 
(which is the thread that would be invoking that method in this PR). I've been 
exploring some potential fixes and will publish a PR next week.





[GitHub] [kafka] ahuang98 commented on pull request #14109: [MINOR] Addressing NPE when broker's initialCatchUpFuture fails

2023-07-28 Thread via GitHub


ahuang98 commented on PR #14109:
URL: https://github.com/apache/kafka/pull/14109#issuecomment-1656471910

   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14109/2/pipeline/12
 looks pretty good but I'll retrigger another build





[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-07-28 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1278072179


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1488,9 +1500,10 @@ public void run() {
 } else if (!heartbeat.shouldHeartbeat(now)) {
     // poll again after waiting for the retry backoff in case the heartbeat failed or the
     // coordinator disconnected
-    AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
+    AbstractCoordinator.this.wait(retryBackoff.backoff(attempts++));
 } else {
     heartbeat.sentHeartbeat(now);
+    attempts = 0L;

Review Comment:
   I think the common case where exponential backoff could be helpful is when, during 
a heartbeat failure, the coordinator has changed but it takes some time to discover 
the new coordinator. In that case, the current code will do the following in a loop:
   
   ```
   sendHeartbeat
   get NotCoordinator error 
   findCoordinator
   wait for retryBackoff
   ```
   With the new change, since attempts is reset on every heartbeat request, we will 
do the same loop as the above with no exponential backoff in between.
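
   To make the concern concrete, here is a hedged sketch (illustrative names, not Kafka's actual `ExponentialBackoff` API) of why resetting `attempts` before each retry degenerates into a constant backoff:

   ```java
   public class BackoffSketch {
       // Simplified exponential backoff: base * 2^attempts, capped at max.
       static long backoffMs(long baseMs, long maxMs, int attempts) {
           return Math.min(maxMs, baseMs * (1L << attempts));
       }

       public static void main(String[] args) {
           // Growing attempts back off exponentially: 100, 200, 400, 800, 1000.
           for (int attempts = 0; attempts < 5; attempts++)
               System.out.println(backoffMs(100, 1000, attempts));
           // If attempts is instead reset to 0 after every heartbeat, as in the loop
           // described above, each wait is backoffMs(100, 1000, 0) == 100 ms:
           // a constant backoff, not an exponential one.
       }
   }
   ```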
   



##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -203,18 +224,43 @@ public class CommonClientConfigs {
 * @return  The new values which have been set as described in postProcessParsedConfig.
 */
 public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
-                                                                     Map<String, Object> parsedValues) {
+                                                                      Map<String, Object> parsedValues) {
     HashMap<String, Object> rval = new HashMap<>();
     Map<String, Object> originalConfig = config.originals();
     if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
         originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-        log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
+        log.warn("Disabling exponential reconnect backoff because {} is set, but {} is not.",
             RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
         rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
     }
     return rval;
 }
 
+/**
+ * Log warning if the exponential backoff is disabled due to initial backoff value is greater than max backoff value.
+ *
+ * @param config    The config object.
+ */
+public static void warnDisablingExponentialBackoff(AbstractConfig config) {
+    long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
+    long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG);
+    if (retryBackoffMs > retryBackoffMaxMs) {
+        log.warn("Configuration '{}' with value '{}' is greater than configuration '{}' with value '{}'. " +
+            "A static backoff with value '{}' will be applied.",
+            RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
+            RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, retryBackoffMs);

Review Comment:
   Should the last param be `retryBackoffMaxMs`? Ditto for 
`connectionSetupTimeoutMs` below.
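
   If I read the question right, the suggested fix is for the last placeholder argument to match the value that is actually applied. A self-contained, hedged sketch (the SLF4J logger, class name, and config strings are illustrative, and "the applied static backoff is the max value" is my reading of the comment, not confirmed by the thread):

   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class WarnSketch {
       private static final Logger log = LoggerFactory.getLogger(WarnSketch.class);

       public static void main(String[] args) {
           long retryBackoffMs = 2000;    // initial backoff, misconfigured above the max
           long retryBackoffMaxMs = 1000; // max backoff
           // Final argument is retryBackoffMaxMs, per the review comment.
           log.warn("Configuration '{}' with value '{}' is greater than configuration '{}' with value '{}'. " +
                   "A static backoff with value '{}' will be applied.",
               "retry.backoff.ms", retryBackoffMs,
               "retry.backoff.max.ms", retryBackoffMaxMs, retryBackoffMaxMs);
       }
   }
   ```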



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -278,11 +291,13 @@ private void validatePositionsAsync(Map partition
             offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
 
             future.addListener(new RequestFutureListener<OffsetForEpochResult>() {
+                private long attempts = 0L;
                 @Override
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(),

Review Comment:
   Same question as the above. Does this really do exponential backoff since 
attempts is 0 for every new request?
   
   



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -225,14 +231,21 @@ private void resetPositionsAsync(Map partitionResetTimesta
 
                 RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
                 future.addListener(new RequestFutureListener<ListOffsetResult>() {
+                    long attempts = 0L;
                     @Override
                     public void onSuccess(ListOffsetResult result) {
-

[jira] [Assigned] (KAFKA-15271) HistoricalIterator can expose elements that are too new

2023-07-28 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-15271:


Assignee: Colin McCabe

> HistoricalIterator can expose elements that are too new
> ---
>
> Key: KAFKA-15271
> URL: https://issues.apache.org/jira/browse/KAFKA-15271
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: Colin McCabe
>Priority: Major
>
> Example:
> {code:java}
> @Test
> public void bug() {
>     SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
>     // Topic -> Partition -> Offset
>     TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
>         new TimelineHashMap<>(snapshotRegistry, 0);
>     snapshotRegistry.getOrCreateSnapshot(0);
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(0, 100L);
>     snapshotRegistry.getOrCreateSnapshot(1);
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(1, 110L);
>     snapshotRegistry.getOrCreateSnapshot(2);
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(1, 111L);
>     assertNull(offsets.get("foo", 1).get(1, 1));
>     offsets.entrySet(1).forEach(topicEntry -> {
>         System.out.println(topicEntry.getKey());
>         topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
>             System.out.println(partitionEntry.getKey() + " : " + partitionEntry.getValue());
>         });
>     });
>     /*
>     The above code prints:
>     foo
>     0 : 100
>     1 : 110
>     but should rather print:
>     foo
>     0 : 100
>     */
> } {code}
> It yields the expected result when the third put is removed. `get(key, 
> epoch)` is always correct as well. It seems that `entrySet` has an issue.  





[GitHub] [kafka] philipnee commented on pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-28 Thread via GitHub


philipnee commented on PR #14123:
URL: https://github.com/apache/kafka/pull/14123#issuecomment-1656346526

   Umm, build failures for JDK 8 plus some unrelated test failures:
   ```
   Build / JDK 20 and Scala 2.13 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   1m 31s
   Build / JDK 11 and Scala 2.13 / testGracefulClose() – 
org.apache.kafka.clients.consumer.KafkaConsumerTest
   2s
   Build / JDK 11 and Scala 2.13 / testCacheEntryExpiry() – 
kafka.log.remote.RemoteIndexCacheTest
   <1s
   Build / JDK 17 and Scala 2.13 / testGracefulClose() – 
org.apache.kafka.clients.consumer.KafkaConsumerTest
   2s
   Build / JDK 17 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   12s
   ```
   





[GitHub] [kafka] philipnee commented on pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-28 Thread via GitHub


philipnee commented on PR #14123:
URL: https://github.com/apache/kafka/pull/14123#issuecomment-1656345210

   We don't. Do you know when it was introduced? Is it in your in-flight PR?





[GitHub] [kafka] jeffkbkim commented on pull request #14124: Kafka 14509

2023-07-28 Thread via GitHub


jeffkbkim commented on PR #14124:
URL: https://github.com/apache/kafka/pull/14124#issuecomment-1656328786

   Thanks, I will take a look next week.





[GitHub] [kafka] dajac commented on pull request #14124: Kafka 14509

2023-07-28 Thread via GitHub


dajac commented on PR #14124:
URL: https://github.com/apache/kafka/pull/14124#issuecomment-1656325056

   @riedelmax Thanks for the PR. I am on vacation for the next three weeks, so I 
won’t be able to look at this one. @jeffkbkim Could you help @riedelmax?
   
   As a first step, @riedelmax could you please update the title and the 
description of the PR?





[GitHub] [kafka] mumrah opened a new pull request, #14126: MINOR Fix a Scala 2.12 checkstyle issue

2023-07-28 Thread via GitHub


mumrah opened a new pull request, #14126:
URL: https://github.com/apache/kafka/pull/14126

   This was introduced in #14115. I added this as part of a small tweak after 
the last Jenkins job. I ran `./gradlew check -x test` before committing, but 
apparently the issue was only present in Scala 2.12.





[jira] [Updated] (KAFKA-15271) HistoricalIterator can expose elements that are too new

2023-07-28 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15271:
-
Summary: HistoricalIterator can expose elements that are too new  (was: 
TimelineHashMap.entrySet yields unexpected results with nested TimelineHashMap)

> HistoricalIterator can expose elements that are too new
> ---
>
> Key: KAFKA-15271
> URL: https://issues.apache.org/jira/browse/KAFKA-15271
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> Example:
> {code:java}
> @Test
> public void bug() {
>     SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
>     // Topic -> Partition -> Offset
>     TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
>         new TimelineHashMap<>(snapshotRegistry, 0);
>     snapshotRegistry.getOrCreateSnapshot(0);
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(0, 100L);
>     snapshotRegistry.getOrCreateSnapshot(1);
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(1, 110L);
>     snapshotRegistry.getOrCreateSnapshot(2);
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(1, 111L);
>     assertNull(offsets.get("foo", 1).get(1, 1));
>     offsets.entrySet(1).forEach(topicEntry -> {
>         System.out.println(topicEntry.getKey());
>         topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
>             System.out.println(partitionEntry.getKey() + " : " + partitionEntry.getValue());
>         });
>     });
>     /*
>     The above code prints:
>     foo
>     0 : 100
>     1 : 110
>     but should rather print:
>     foo
>     0 : 100
>     */
> } {code}
> It yields the expected result when the third put is removed. `get(key, 
> epoch)` is always correct as well. It seems that `entrySet` has an issue.  





[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-28 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748724#comment-17748724
 ] 

Max Riedel edited comment on KAFKA-14509 at 7/28/23 7:51 PM:
-

[~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2)
https://github.com/apache/kafka/pull/14124

Please pay special attention to ConsumerGroupDescribeResponse.json as this is 
now different from the definition in KIP-848


was (Author: JIRAUSER300902):
[~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2)
[https://github.com/apache/kafka/pull/14124

Please pay special attention to ConsumerGroupDescribeResponse.json as this is 
now different from the definition in 
KIP-848|https://github.com/apache/kafka/pull/14124]

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.
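
For readers unfamiliar with step 1, Kafka request schemas are JSON files under clients/src/main/resources/common/message. A hedged skeleton in the style of ListGroupsRequest.json; the apiKey, versions, and fields below are placeholders, not the schema from the KIP-848 PR:

{code}
{
  "apiKey": 69,
  "type": "request",
  "name": "ConsumerGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+",
      "about": "The ids of the groups to describe." }
  ]
}
{code}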





[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-28 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748724#comment-17748724
 ] 

Max Riedel edited comment on KAFKA-14509 at 7/28/23 7:50 PM:
-

[~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2)
[https://github.com/apache/kafka/pull/14124

Please pay special attention to ConsumerGroupDescribeResponse.json as this is 
now different from the definition in 
KIP-848|https://github.com/apache/kafka/pull/14124]


was (Author: JIRAUSER300902):
[~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2)
https://github.com/apache/kafka/pull/14124

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.





[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-28 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748724#comment-17748724
 ] 

Max Riedel commented on KAFKA-14509:


[~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2)
https://github.com/apache/kafka/pull/14124

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.





[GitHub] [kafka] riedelmax opened a new pull request, #14124: Kafka 14509

2023-07-28 Thread via GitHub


riedelmax opened a new pull request, #14124:
URL: https://github.com/apache/kafka/pull/14124

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] gharris1727 commented on pull request #14101: Source task stop call was added to force stopping execution.

2023-07-28 Thread via GitHub


gharris1727 commented on PR #14101:
URL: https://github.com/apache/kafka/pull/14101#issuecomment-1656189960

   Hey thanks @akitoshka for giving some attention to this issue.
   
   Unfortunately the fix as-is can't be merged, for the reasons that @yashmayya 
and @vamossagar12 raised above. While the particular plugin behavior (resource 
leaks) is out-of-scope for the framework to fix, I do think it would be 
appropriate for the framework to indicate to plugins that they have been 
cancelled, so that they can clean themselves up.
   
   There is a relevant ticket here: 
https://issues.apache.org/jira/browse/KAFKA-14725 which covers some of the 
suggested fixes, and I'm happy to review a PR for that.
   
   Thanks again for your contribution!





[jira] [Resolved] (KAFKA-15263) KRaftMigrationDriver can run the migration twice

2023-07-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15263.
--
Resolution: Fixed

> KRaftMigrationDriver can run the migration twice
> 
>
> Key: KAFKA-15263
> URL: https://issues.apache.org/jira/browse/KAFKA-15263
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> There is a narrow race condition in KRaftMigrationDriver where a PollEvent 
> can run that sees the internal state as ZK_MIGRATION and is immediately 
> followed by another poll event (due to an external call to {{wakeup()}}) 
> that results in two MigrateMetadataEvents being enqueued. 
> Since MigrateMetadataEvent lacks a check on the internal state, this causes 
> the metadata migration to occur twice. 
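
A hedged sketch of the shape of the fix (merged via PR #14115), not the actual patch; the enum values and the event body below are illustrative:

{code:java}
import java.util.function.Supplier;

// Illustrative only: each event re-reads the driver's state when it runs, so a
// duplicate MigrateMetadataEvent enqueued by a stray wakeup() becomes a no-op.
enum MigrationDriverState { INACTIVE, ZK_MIGRATION, DUAL_WRITE }

class MigrateMetadataEvent implements Runnable {
    private final Supplier<MigrationDriverState> currentState;

    MigrateMetadataEvent(Supplier<MigrationDriverState> currentState) {
        this.currentState = currentState;
    }

    @Override
    public void run() {
        if (currentState.get() != MigrationDriverState.ZK_MIGRATION) {
            return; // stale duplicate; the migration already ran or the state moved on
        }
        // ... perform the metadata migration exactly once, then advance the state ...
    }
}
{code}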





[GitHub] [kafka] gharris1727 merged pull request #14091: MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest

2023-07-28 Thread via GitHub


gharris1727 merged PR #14091:
URL: https://github.com/apache/kafka/pull/14091





[GitHub] [kafka] gharris1727 commented on pull request #14091: MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest

2023-07-28 Thread via GitHub


gharris1727 commented on PR #14091:
URL: https://github.com/apache/kafka/pull/14091#issuecomment-1656157411

   Flaky test failures appear unrelated, and the JDK8 build succeeded with no 
failures!





[jira] [Updated] (KAFKA-15263) KRaftMigrationDriver can run the migration twice

2023-07-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15263:
-
Affects Version/s: 3.5.1
   3.5.0

> KRaftMigrationDriver can run the migration twice
> 
>
> Key: KAFKA-15263
> URL: https://issues.apache.org/jira/browse/KAFKA-15263
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> There is a narrow race condition in KRaftMigrationDriver where a PollEvent 
> can run that sees the internal state as ZK_MIGRATION and is immediately 
> followed by another poll event (due to an external call to {{wakeup()}}) 
> that results in two MigrateMetadataEvents being enqueued. 
> Since MigrateMetadataEvent lacks a check on the internal state, this causes 
> the metadata migration to occur twice. 





[GitHub] [kafka] jolshan merged pull request #14117: MINOR: Code cleanups in group-coordinator module

2023-07-28 Thread via GitHub


jolshan merged PR #14117:
URL: https://github.com/apache/kafka/pull/14117





[jira] [Updated] (KAFKA-15263) KRaftMigrationDriver can run the migration twice

2023-07-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15263:
-
Fix Version/s: 3.6.0
   3.5.2

> KRaftMigrationDriver can run the migration twice
> 
>
> Key: KAFKA-15263
> URL: https://issues.apache.org/jira/browse/KAFKA-15263
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> There is a narrow race condition in KRaftMigrationDriver where a PollEvent 
> can run that sees the internal state as ZK_MIGRATION and is immediately 
> followed by another poll event (due to an external call to {{wakeup()}}) 
> that results in two MigrateMetadataEvents being enqueued. 
> Since MigrateMetadataEvent lacks a check on the internal state, this causes 
> the metadata migration to occur twice. 





[jira] [Resolved] (KAFKA-14702) Extend server side assignor to support rack aware replica placement

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14702.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> Extend server side assignor to support rack aware replica placement
> ---
>
> Key: KAFKA-14702
> URL: https://issues.apache.org/jira/browse/KAFKA-14702
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Ritika Muduganti
>Priority: Major
> Fix For: 3.6.0
>
>






[GitHub] [kafka] dajac commented on pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-28 Thread via GitHub


dajac commented on PR #14099:
URL: https://github.com/apache/kafka/pull/14099#issuecomment-1656057225

   The build for JDK 17 is stuck... The three others look good. I have seen a 
few builds stuck this week in other PRs, so this is clearly not related to the 
changes made in this one. Moreover, we had successful builds previously and we 
addressed nits in the meantime. Therefore, I will merge this PR.





[GitHub] [kafka] philipnee commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-28 Thread via GitHub


philipnee commented on code in PR #14123:
URL: https://github.com/apache/kafka/pull/14123#discussion_r1277840446


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -153,12 +153,14 @@ public void testCommitAsync_UserSuppliedCallback() {
     @SuppressWarnings("unchecked")
     public void testCommitted() {
         Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
-        mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-            when(mock.complete(any())).thenReturn(new HashMap<>());
-        });
-        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-        assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1)));
-        verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction = mockConstruction(OffsetFetchApplicationEvent.class,

Review Comment:
   try-with-resources ensures the thread-local mockConstruction is closed, so that 
other tests continue to function properly.
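
   For context, a hedged sketch of the scoping behavior (the nested class is a stand-in, not the real OffsetFetchApplicationEvent): Mockito's construction mocking is active only until close(), which try-with-resources guarantees even when an assertion throws.

   ```java
   import static org.mockito.Mockito.mockConstruction;

   import org.mockito.MockedConstruction;

   class MockScopeSketch {
       static class Fetcher {            // stand-in for the mocked type
           String fetch() { return "real"; }
       }

       void example() {
           try (MockedConstruction<Fetcher> mocked = mockConstruction(Fetcher.class)) {
               Fetcher f = new Fetcher();    // intercepted: this instance is a Mockito mock
               assert f.fetch() == null;     // unstubbed mock methods return null
           }                                 // scope closed here, even if an assertion fails
           assert "real".equals(new Fetcher().fetch()); // constructions are real again
       }
   }
   ```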






[GitHub] [kafka] philipnee commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-28 Thread via GitHub


philipnee commented on code in PR #14123:
URL: https://github.com/apache/kafka/pull/14123#discussion_r1277835494


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java:
##
@@ -19,25 +19,33 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class OffsetFetchApplicationEvent extends ApplicationEvent {
-    final public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
-    public final Set<TopicPartition> partitions;
+    private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    private final Set<TopicPartition> partitions;
 
     public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
         super(Type.FETCH_COMMITTED_OFFSET);
         this.partitions = partitions;
         this.future = new CompletableFuture<>();
     }
 
-    public Map<TopicPartition, OffsetAndMetadata> complete(final Duration duration) throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   Removed because this method was rather pointless






[GitHub] [kafka] philipnee opened a new pull request, #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout

2023-07-28 Thread via GitHub


philipnee opened a new pull request, #14123:
URL: https://github.com/apache/kafka/pull/14123

   **Summary**
   I discovered that the committed() API timed out during the integration test. After 
investigation, this is because the future was not completed in the 
ApplicationEventProcessor. I also added `toString` methods to the event classes 
for debugging purposes.
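
   A hedged sketch of the failure mode (names illustrative, not the actual consumer code): if the processor never completes the event's future, a blocking get with a timeout can only time out.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class FutureTimeoutSketch {
       public static void main(String[] args) throws Exception {
           CompletableFuture<String> future = new CompletableFuture<>();
           try {
               future.get(100, TimeUnit.MILLISECONDS); // nobody completes it...
           } catch (TimeoutException e) {
               System.out.println("timed out, as committed() did");
           }
           future.complete("offsets"); // the fix: the event processor completes the future
       }
   }
   ```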





[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277811461


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Right. Let me do that.






[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-07-28 Thread via GitHub


dajac commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277803779


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   In my opinion, if you want to do something like that, it would be better to 
do it for all callbacks then. Adding just one comment there is not an ideal end 
state.






[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277800358


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Yeah, we refactored these examples a while back, keeping the original logic, 
but improving error handling and output. We also added some comments, which may 
be useful to people new to Kafka. Adding this comment may be along these lines.






[GitHub] [kafka] mumrah merged pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event

2023-07-28 Thread via GitHub


mumrah merged PR #14115:
URL: https://github.com/apache/kafka/pull/14115





[GitHub] [kafka] mumrah commented on pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event

2023-07-28 Thread via GitHub


mumrah commented on PR #14115:
URL: https://github.com/apache/kafka/pull/14115#issuecomment-1656019490

   Test failures look unrelated 
   
   [screenshot] https://github.com/apache/kafka/assets/55116/a8e1f6db-57ef-4450-b595-a54c164db524
   





[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-07-28 Thread via GitHub


dajac commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277792665


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   I am sorry but I don't understand what you are trying to achieve here. Your 
explanation is correct and the javadoc of `onPartitionsLost` also explains 
this. I don't see the need for adding an extra comment just in this method in 
this example. Is your goal to improve the example?






[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-07-28 Thread via GitHub


dajac commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277792665


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   I am sorry but I don't understand what you are trying to achieve here. Your 
explanation is correct and the javadoc of `onPartitionsLost` also explains 
this. I don't see the need for adding an extra comment just in this method in 
this example. Is your goal to improve the example?






[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From the javadoc we have:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
       // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   The default onPartitionsLost implementation is there to cover the case where 
partitions are reassigned before we have a chance to revoke them gracefully 
(i.e. in case of session timeout). Maybe we can leave the override adding an 
appropriate comment. Wdyt?
   






[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From the javadoc we have:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
       // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   The default onPartitionsLost implementation is there to cover the case where 
partitions are reassigned before we have a chance to revoke them gracefully 
(i.e. in case of session timeout). Maybe we can leave the override adding an 
appropriate comment. Wdyt?
   
   Something like:
   
   ```java
   // this is called when partitions are reassigned before we had a chance to revoke them gracefully
   // we can't commit pending offsets because these partitions are probably owned by other consumers already
   // nevertheless, we may need to do some other cleanup
   ```






[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From the javadoc we have:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
       // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   The default onPartitionsLost implementation is there to cover the case where 
partitions are reassigned before we have a chance to revoke them gracefully 
(i.e. in case of session timeout). Maybe we can leave the override adding an 
appropriate comment. Wdyt?
   
   Something like:
   
   ```java
   // this is called when partitions are reassigned before we had a chance to revoke them gracefully,
   // we can't commit pending offsets because these partitions are probably owned by other consumers already
   // nevertheless, we may need to do some other cleanup
   ```







[jira] [Commented] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response

2023-07-28 Thread jianbin.chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748684#comment-17748684
 ] 

jianbin.chen commented on KAFKA-14257:
--

[~cokutan] You'll want to use the same meta.properties, rather than generating 
one for each broker.
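
In KRaft mode that means formatting every node's log directories with the same cluster ID, for example (a sketch of the standard commands; the config path is a placeholder):
{code}
# generate a single cluster id once
bin/kafka-storage.sh random-uuid

# run on every broker, with the same id printed above
bin/kafka-storage.sh format -t <uuid-from-above> -c config/kraft/server.properties
{code}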

> Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
> -
>
> Key: KAFKA-14257
> URL: https://issues.apache.org/jira/browse/KAFKA-14257
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.3
>Reporter: jianbin.chen
>Priority: Major
>
> Please help me see why the error message is output indefinitely
> broker1:
> {code:java}
> process.roles=broker,controller
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=1
> listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093
> inter.broker.listener.name=PLAINTEXT
> advertised.listeners=PLAINTEXT://192.168.6.57:9092
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker2
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=2
> listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs{code}
> broker3
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> node.id=3
> listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> error msg:
> {code:java}
> [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error 
> INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=388, 
> data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) 
> (org.apache.kafka.raft.KafkaRaftClient){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-28 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen updated KAFKA-15264:
-
Description: 
I was preparing to upgrade from 1.1.0 to 3.5.1 in KRaft mode (new cluster 
deployment). In a recent comparison test, I found an obvious throughput gap when 
using the following stress test command:

 
{code:java}
./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
--record-size 1024 --throughput -1 --producer-props bootstrap.servers=xxx: 
acks=1
419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg latency, 
588.0 ms max latency.
555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
latency, 460.0 ms max latency.
552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
latency, 1120.0 ms max latency.
552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
latency, 1097.0 ms max latency.
538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
latency, 610.0 ms max latency.
511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg latency, 
1892.0 ms max latency.
511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg latency, 
3000.0 ms max latency.
519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
latency, 1781.0 ms max latency.
513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
latency, 2590.0 ms max latency.
463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg latency, 
1463.0 ms max latency.
494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg latency, 
2362.0 ms max latency.
506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg latency, 
2986.0 ms max latency.
393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg latency, 
2958.0 ms max latency.
426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg latency, 
1959.0 ms max latency.
412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg latency, 
1995.0 ms max latency.
370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg latency, 
1496.0 ms max latency.
391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg latency, 
2446.0 ms max latency.
355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg latency, 
2715.0 ms max latency.
385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg latency, 
2702.0 ms max latency.
381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg latency, 
1846.0 ms max latency.
67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg latency, 
1414.0 ms max latency.
376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg latency, 
1897.0 ms max latency.
354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg latency, 
1601.0 ms max latency.
353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg latency, 
1563.0 ms max latency.
321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg latency, 
1975.0 ms max latency.
404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg latency, 
1753.0 ms max latency.
384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg latency, 
1833.0 ms max latency.
387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg latency, 
1927.0 ms max latency.
343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg latency, 
1685.0 ms max latency.
00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg latency, 
2146.0 ms max latency.
361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg latency, 
2125.0 ms max latency.
357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg latency, 
1502.0 ms max latency.
340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg latency, 
1932.0 ms max latency.
390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg latency, 
1807.0 ms max latency.
352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg latency, 
1892.0 ms max latency.
354526 records sent, 70905.2 records/sec (69.24 MB/sec), 429.6 ms avg latency, 
2128.0 ms max latency.
356670 records sent, 71305.5 records/sec (69.63 MB/sec), 408.9 ms avg latency, 
1329.0 ms max latency.
309204 records sent, 60687.7 records/sec (59.27 MB/sec), 438.6 ms avg latency, 
2566.0 ms max latency.
366715 records sent, 72316.1 records/sec (70.62 MB/sec), 474.5 ms avg latency, 
2169.0 ms max latency.
375174 records sent, 75034.8 records/sec (73.28 MB/sec), 429.9 ms avg latency, 
1722.0 ms max latency.
359400 records sent, 70346.4 records/sec (68.70 MB/sec), 432.1 ms avg latency, 
1961.0 ms max latency.
312276 records sent, 62430.2 records/sec (60.97 MB/sec), 477.4 ms avg latency, 
2006.0 ms max latency.
361875 records sent, 72360.5 records/sec (70.66 MB/sec), 

[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 Utils.printOut("Assigned partitions: %s", partitions);
 }
-
-@Override
-public void onPartitionsLost(Collection<TopicPartition> partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From the javadoc we have:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
   // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   The default onPartitionsLost implementation is there to cover the case where 
partitions are reassigned before we have a chance to revoke them gracefully 
(i.e. in case of session timeout). Maybe we can leave the override adding an 
appropriate comment. Wdyt?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 Utils.printOut("Assigned partitions: %s", partitions);
 }
-
-@Override
-public void onPartitionsLost(Collection<TopicPartition> partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From the javadoc we have:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
   // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   The default onPartitionsLost implementation is there to cover the case where 
partitions are reassigned before we have a chance to revoke them gracefully 
(i.e. in case of session timeout). Maybe we can leave it, adding an appropriate 
comment. Wdyt?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 Utils.printOut("Assigned partitions: %s", partitions);
 }
-
-@Override
-public void onPartitionsLost(Collection<TopicPartition> partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From the javadoc we have:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
   // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   I guess the default onPartitionsLost implementation is there to cover some 
corner case where onPartitionsRevoked may not be triggered, but we may still 
need to do some other cleanup. Is that correct? Maybe we can leave it adding an 
appropriate comment. Wdyt?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #14122: MINOR: GroupMetadataManagerTest.java style fix

2023-07-28 Thread via GitHub


jeffkbkim opened a new pull request, #14122:
URL: https://github.com/apache/kafka/pull/14122

   This patch makes the styling consistent inside GroupMetadataManagerTest.java.
   
   Also, it adds `JoinResult` to simplify the JoinGroup API responses in the 
tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 Utils.printOut("Assigned partitions: %s", partitions);
 }
-
-@Override
-public void onPartitionsLost(Collection<TopicPartition> partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   Hi @dajac, the default implementation of onPartitionsLost calls 
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit 
pending offsets before losing the partition ownership). This also means that 
"revoked" is logged instead of "lost".
   
   
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
   
   For this reason, I thought we could simply use the default implementation 
without overriding, but now I see that it is too late to save the offsets when 
onPartitionsLost is called, since these partitions are probably owned by other 
consumers already.
   
   From ConsumerRebalanceListener javadoc:
   
   ```java
   public void onPartitionsLost(Collection<TopicPartition> partitions) {
   // do not need to save the offsets since these partitions are probably owned by other consumers already
   }
   ```
   
   I guess the default onPartitionsLost implementation is there to cover some 
corner case where onPartitionsRevoked may not be triggered, but we may still 
need to do some other cleanup. Is that correct? Maybe we can leave it adding an 
appropriate comment. Wdyt?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao merged pull request #14086: MINOR: Test assign() and assignment() in the integration test

2023-07-28 Thread via GitHub


junrao merged PR #14086:
URL: https://github.com/apache/kafka/pull/14086


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14117: MINOR: Code cleanups in group-coordinator module

2023-07-28 Thread via GitHub


jolshan commented on code in PR #14117:
URL: https://github.com/apache/kafka/pull/14117#discussion_r1277712647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1014,7 +1013,7 @@ public void onHighWatermarkUpdated(
  * @param processor The event processor.
  * @param partitionWriter   The partition writer.
  * @param loaderThe coordinator loader.
- * @param coordinatorBuilderSupplierThe coordinator builder.
+ * @param coordinatorShardBuilderSupplierThe coordinator builder.

Review Comment:
   nit: this changed the spacing of the params. Do we want to line them up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


dajac commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277683057


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 Utils.printOut("Assigned partitions: %s", partitions);
 }
-
-@Override
-public void onPartitionsLost(Collection<TopicPartition> partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
   I don't really understand the motivation of this change. It seems to me that 
we are losing the log if we do so. Could you elaborate more on the motivation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15271) TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-15271:

Description: 
Example:
{code:java}
@Test
public void bug() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
// Topic -> Partition -> Offset
TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
new TimelineHashMap<>(snapshotRegistry, 0);

snapshotRegistry.getOrCreateSnapshot(0);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(0, 100L);

snapshotRegistry.getOrCreateSnapshot(1);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(1, 110L);

snapshotRegistry.getOrCreateSnapshot(2);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(1, 111L);

assertNull(offsets.get("foo", 1).get(1, 1));

offsets.entrySet(1).forEach(topicEntry -> {
System.out.println(topicEntry.getKey());
topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
System.out.println(partitionEntry.getKey() + " : " + 
partitionEntry.getValue());
});
});

/*
The above code prints:
foo
0 : 100
1 : 110

but should rather print:
foo
0 : 100
 */
} {code}
It yields the expected result when the third put is removed. `get(key, epoch)` 
is always correct as well. It seems that `entrySet` has an issue.  

  was:
Example:
{code:java}
@Test
public void bug() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
// Topic -> Partition -> Offset
TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
new TimelineHashMap<>(snapshotRegistry, 0);

snapshotRegistry.getOrCreateSnapshot(0);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(0, 100L);

snapshotRegistry.getOrCreateSnapshot(1);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(1, 110L);

snapshotRegistry.getOrCreateSnapshot(2);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(1, 111L);

assertNull(offsets.get("foo", 1).get(1, 1));

offsets.entrySet(1).forEach(topicEntry -> {
System.out.println(topicEntry.getKey());
topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
System.out.println(partitionEntry.getKey() + " : " + 
partitionEntry.getValue());
});
});

/*
The above code prints:
foo
0 : 100
1 : 110

but should rather print:
foo
0 : 100
 */
} {code}
It yields the expected result when the third put is removed. 


> TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap
> -
>
> Key: KAFKA-15271
> URL: https://issues.apache.org/jira/browse/KAFKA-15271
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> Example:
> {code:java}
> @Test
> public void bug() {
> SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
> LogContext());
> // Topic -> Partition -> Offset
> TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
> new TimelineHashMap<>(snapshotRegistry, 0);
> snapshotRegistry.getOrCreateSnapshot(0);
> offsets
> .computeIfAbsent("foo", __ -> new 
> TimelineHashMap<>(snapshotRegistry, 0))
> .put(0, 100L);
> snapshotRegistry.getOrCreateSnapshot(1);
> offsets
> .computeIfAbsent("foo", __ -> new 
> TimelineHashMap<>(snapshotRegistry, 0))
> .put(1, 110L);
> snapshotRegistry.getOrCreateSnapshot(2);
> offsets
> .computeIfAbsent("foo", __ -> new 
> TimelineHashMap<>(snapshotRegistry, 0))
> .put(1, 111L);
> assertNull(offsets.get("foo", 1).get(1, 1));
> offsets.entrySet(1).forEach(topicEntry -> {
> System.out.println(topicEntry.getKey());
> topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
> System.out.println(partitionEntry.getKey() + " : " + 
> partitionEntry.getValue());
> });
> });
> /*
> The above code prints:
> foo
> 0 : 100
> 1 : 110
> but should rather print:
> foo
> 0 : 100
>  */
> } {code}

[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-28 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1655850239

   Zeroing in on ControllerIntegrationTest#testControllerMoveOnTopicCreation:
   
   ```
   ...
 val event = appender.getMessages.find(e => e.getLevel == Level.INFO
   && e.getThrowableInformation != null
   && 
e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
 assertTrue(event.isDefined) <- THIS IS THE ASSERTION WHICH FAILS
   ...
   ```
   
   However from the test logs:
   ```
   16:03:02.258 [controller-event-thread] INFO kafka.controller.KafkaController 
- [Controller id=0] Controller moved to another broker when processing 
TopicChange.
   org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 1
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15271) TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap

2023-07-28 Thread David Jacot (Jira)
David Jacot created KAFKA-15271:
---

 Summary: TimelineHashMap.entrySet yield unexpected results with 
nested TimelineHashMap
 Key: KAFKA-15271
 URL: https://issues.apache.org/jira/browse/KAFKA-15271
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot


Example:
{code:java}
@Test
public void bug() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
// Topic -> Partition -> Offset
TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
new TimelineHashMap<>(snapshotRegistry, 0);

snapshotRegistry.getOrCreateSnapshot(0);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(0, 100L);

snapshotRegistry.getOrCreateSnapshot(1);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(1, 110L);

snapshotRegistry.getOrCreateSnapshot(2);

offsets
.computeIfAbsent("foo", __ -> new 
TimelineHashMap<>(snapshotRegistry, 0))
.put(1, 111L);

assertNull(offsets.get("foo", 1).get(1, 1));

offsets.entrySet(1).forEach(topicEntry -> {
System.out.println(topicEntry.getKey());
topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
System.out.println(partitionEntry.getKey() + " : " + 
partitionEntry.getValue());
});
});

/*
The above code prints:
foo
0 : 100
1 : 110

but should rather print:
foo
0 : 100
 */
} {code}
It yields the expected result when the third put is removed. 
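
For clarity, the read-at-epoch semantics the report relies on (a sketch of the expectation, not part of the original code):
{code:java}
// reads at epoch 1 must only see writes made before snapshot 1 was taken
assertEquals(100L, offsets.get("foo", 1).get(0, 1)); // visible: written before snapshot 1
assertNull(offsets.get("foo", 1).get(1, 1));         // not visible: written after snapshot 1
// so entrySet(1) should only yield the entry 0 : 100 under "foo"
{code}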



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case

2023-07-28 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-15270:
--

Assignee: Lianet Magrans

> Integration tests for AsyncConsumer simple consume case
> ---
>
> Key: KAFKA-15270
> URL: https://issues.apache.org/jira/browse/KAFKA-15270
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> This task involves writing integration tests for covering the simple consume 
> functionality of the AsyncConsumer. This should include validation of the 
> assign, fetch and positions logic.
> Not covering any committed offset functionality as part of this task. 
> Integration tests should have a similar form as the existing 
> PlaintextConsumerTest, but scoped to the simple consume flow. 
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case

2023-07-28 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15270:
--

 Summary: Integration tests for AsyncConsumer simple consume case
 Key: KAFKA-15270
 URL: https://issues.apache.org/jira/browse/KAFKA-15270
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


This task involves writing integration tests for covering the simple consume 
functionality of the AsyncConsumer. This should include validation of the 
assign, fetch and positions logic.

Not covering any committed offset functionality as part of this task. 

Integration tests should have a similar form as the existing 
PlaintextConsumerTest, but scoped to the simple consume flow. 
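
A minimal sketch of the consume flow such a test would validate (consumer construction and topic setup omitted; names are placeholders):
{code:java}
// sketch: assumes a KafkaConsumer<byte[], byte[]> consumer and an existing topic with records
TopicPartition tp = new TopicPartition("topic", 0);
consumer.assign(List.of(tp));
consumer.seekToBeginning(List.of(tp));
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
// having fetched from offset 0, the position should equal the number of records returned
assertEquals(records.count(), consumer.position(tp));
{code}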

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-28 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1655821383

   At least on a first look it appears something changed/is wrong with the 
loggers:
   ```
   [2023-02-22T17:35:34.655Z] Gradle Test Run :core:unitTest > Gradle Test 
Executor 136 > ConfigAdminManagerTest > 
testPreprocessIncrementalWithLoggerChanges() FAILED
   [2023-02-22T17:35:34.655Z] org.opentest4j.AssertionFailedError: 
expected: <{AlterConfigsResource(resourceType=8, resourceName='1', 
configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', 
configOperation=0, value='INFO')])=ApiError(error=NONE, message=null)}> but 
was: <{AlterConfigsResource(resourceType=8, resourceName='1', 
configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', 
configOperation=0, value='INFO')])=ApiError(error=INVALID_CONFIG, 
message=Logger kafka.server.ConfigAdminManagerTest does not exist!)}>
   [2023-02-22T17:35:34.655Z] at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   [2023-02-22T17:35:34.655Z] at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   [2023-02-22T17:35:34.655Z] at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   [2023-02-22T17:35:34.655Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
   [2023-02-22T17:35:34.655Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
   [2023-02-22T17:35:34.655Z] at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
   [2023-02-22T17:35:34.656Z] at 
app//kafka.server.ConfigAdminManagerTest.testPreprocessIncrementalWithLoggerChanges(ConfigAdminManagerTest.scala:352)
   
   ...
   
   [2023-02-22T17:35:34.656Z] Gradle Test Run :core:unitTest > Gradle Test 
Executor 136 > ConfigAdminManagerTest > testValidateLogLevelConfigs() FAILED
   [2023-02-22T17:35:34.656Z] 
org.apache.kafka.common.errors.InvalidConfigurationException: Logger 
kafka.server.ConfigAdminManagerTest does not exist!
   
   ...
   
   [2023-02-22T17:36:10.045Z] Gradle Test Run :core:unitTest > Gradle Test 
Executor 136 > LoggingTest > testLoggerLevelIsResolved() FAILED
   [2023-02-22T17:36:10.045Z] org.opentest4j.AssertionFailedError: 
expected:  but was: 
   [2023-02-22T17:36:10.045Z] at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   [2023-02-22T17:36:10.045Z] at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   [2023-02-22T17:36:10.045Z] at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   [2023-02-22T17:36:10.045Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
   [2023-02-22T17:36:10.045Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
   [2023-02-22T17:36:10.045Z] at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
   [2023-02-22T17:36:10.045Z] at 
app//kafka.utils.LoggingTest.testLoggerLevelIsResolved(LoggingTest.scala:81)
   
   ...
   
   [2023-02-22T17:37:30.427Z] Gradle Test Run :core:unitTest > Gradle Test 
Executor 173 > ConfigAdminManagerTest > 
testPreprocessIncrementalWithLoggerChanges() FAILED
   [2023-02-22T17:37:30.427Z] org.opentest4j.AssertionFailedError: 
expected: <{AlterConfigsResource(resourceType=8, resourceName='1', 
configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', 
configOperation=0, value='INFO')])=ApiError(error=NONE, message=null)}> but 
was: <{AlterConfigsResource(resourceType=8, resourceName='1', 
configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', 
configOperation=0, value='INFO')])=ApiError(error=INVALID_CONFIG, 
message=Logger kafka.server.ConfigAdminManagerTest does not exist!)}>
   [2023-02-22T17:37:30.427Z] at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   [2023-02-22T17:37:30.427Z] at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   [2023-02-22T17:37:30.427Z] at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   [2023-02-22T17:37:30.427Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
   [2023-02-22T17:37:30.427Z] at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
   [2023-02-22T17:37:30.427Z] at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
   [2023-02-22T17:37:30.427Z] at 
app//kafka.server.ConfigAdminManagerTest.testPreprocessIncrementalWithLoggerChanges(ConfigAdminManagerTest.scala:352)
   
   ...
   
   [2023-02-22T17:37:30.427Z] Gradle Test Run :core:unitTest > Gradle Test 
Ex

[jira] [Created] (KAFKA-15269) Clean up the RaftClient interface

2023-07-28 Thread Jira
José Armando García Sancio created KAFKA-15269:
--

 Summary: Clean up the RaftClient interface
 Key: KAFKA-15269
 URL: https://issues.apache.org/jira/browse/KAFKA-15269
 Project: Kafka
  Issue Type: Task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


Make the following changes to the {{RaftClient}} interface and implementation

Remove {{scheduleAtomicAppend}}; the controller doesn't use {{scheduleAppend}} 
so we can revert to the original semantic.

{{logEndOffset}} is misleading when called on the leader since it doesn't 
include records already appended to the {{BatchAccumulator}} that have not yet 
been written to the log. Rename it to {{endOffset}} in the process.
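
A rough sketch of the resulting interface shape (signatures here are assumptions based on the description above, not the final API):
{code:java}
import java.util.List;

public interface RaftClient<T> {
    // scheduleAtomicAppend is removed; scheduleAppend reverts to the original semantic
    long scheduleAppend(int epoch, List<T> records);

    // renamed from logEndOffset; on the leader this should also reflect records
    // buffered in the BatchAccumulator that have not yet been written to the log
    long endOffset();
}
{code}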



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-28 Thread via GitHub


clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1655753598

   At least the below tests have started to continuously fail. I am still 
looking as to why:
   ```
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
ControllerIntegrationTest > testControllerMoveOnPreferredReplicaElection() 
FAILED
    org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
   at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
   at 
app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnPreferredReplicaElection(ControllerIntegrationTest.scala:1885)
   
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
ControllerIntegrationTest > testControllerMoveOnTopicCreation() FAILED
    org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
   at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
   at 
app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnTopicCreation(ControllerIntegrationTest.scala:1885)
   
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
ControllerIntegrationTest > testControllerMoveOnPartitionReassignment() FAILED
    org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
   at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
   at 
app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnPartitionReassignment(ControllerIntegrationTest.scala:1885)
   
   Gradle Test Run :core:test > Gradle Test Executor 3 > 
ControllerIntegrationTest > testControllerMoveOnTopicDeletion() FAILED
    org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
   at 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
   at 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
   at 
app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnTopicDeletion(ControllerIntegrationTest.scala:1885)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15089) Consolidate all the group coordinator configs

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-15089:
---

Assignee: (was: David Jacot)

> Consolidate all the group coordinator configs
> -
>
> Key: KAFKA-15089
> URL: https://issues.apache.org/jira/browse/KAFKA-15089
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> The group coordinator configurations are defined in KafkaConfig at the 
> moment. As KafkaConfig is defined in the core module, we can't use it in the 
> new Java modules to pass the configurations along.
> A suggestion here is to centralize all the configurations of a module in the 
> module itself, similar to what we have done for RemoteLogManagerConfig and 
> RaftConfig. We also need a mechanism to add all the properties defined in the 
> module to the KafkaConfig's ConfigDef.
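
A hypothetical sketch of the suggested pattern, modeled on RemoteLogManagerConfig (the class name and sample property below are illustrative only):
{code:java}
import org.apache.kafka.common.config.ConfigDef;

public final class GroupCoordinatorConfig {
    public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size";
    public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;

    // the module defines its own properties and registers them into the broker's ConfigDef
    public static ConfigDef addGroupCoordinatorConfigs(ConfigDef configDef) {
        return configDef.define(
            GROUP_MAX_SIZE_CONFIG,
            ConfigDef.Type.INT,
            GROUP_MAX_SIZE_DEFAULT,
            ConfigDef.Importance.MEDIUM,
            "The maximum number of consumers that a single consumer group can accommodate.");
    }
}
{code}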



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15225) Define constants for record types

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-15225:
---

Assignee: (was: David Jacot)

> Define constants for record types
> -
>
> Key: KAFKA-15225
> URL: https://issues.apache.org/jira/browse/KAFKA-15225
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> Define constants for all the record types. Ideally, this should be defined in 
> the record definitions and the constants should be auto-generated (e.g. like 
> ApiKeys).
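
For illustration, the kind of hand-maintained mapping this would replace with generated code, similar in spirit to ApiKeys (the names and ids below are placeholders, not the real values):
{code:java}
public enum CoordinatorRecordType {
    OFFSET_COMMIT((short) 0),
    GROUP_METADATA((short) 1);

    public final short id;

    CoordinatorRecordType(short id) {
        this.id = id;
    }
}
{code}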



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15061) CoordinatorPartitionWriter should reuse buffer

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-15061:
---

Assignee: (was: David Jacot)

> CoordinatorPartitionWriter should reuse buffer
> --
>
> Key: KAFKA-15061
> URL: https://issues.apache.org/jira/browse/KAFKA-15061
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15225) Define constants for record types

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-15225:

Priority: Minor  (was: Major)

> Define constants for record types
> -
>
> Key: KAFKA-15225
> URL: https://issues.apache.org/jira/browse/KAFKA-15225
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Minor
>
> Define constants for all the record types. Ideally, this should be defined in 
> the record definitions and the constants should be auto-generated (e.g. like 
> ApiKeys).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15268) Consider replacing Subscription Metadata by a hash

2023-07-28 Thread David Jacot (Jira)
David Jacot created KAFKA-15268:
---

 Summary: Consider replacing Subscription Metadata by a hash
 Key: KAFKA-15268
 URL: https://issues.apache.org/jira/browse/KAFKA-15268
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot


With the addition of racks, the subscription metadata record is getting large, 
too large in my opinion. We should consider replacing it with a hash. The 
subscription metadata is mainly used to detect changes in metadata, and a hash 
would provide similar functionality.
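
A minimal sketch of the idea, hashing the inputs that drive the subscription metadata instead of persisting them in full (the helper is hypothetical, and a real implementation would likely want a stronger hash than Objects.hash):
{code:java}
import java.util.List;
import java.util.Objects;

public final class SubscriptionMetadataHash {
    // if the hash differs between two computations, the subscription metadata changed
    public static int of(List<String> subscribedTopicNames, List<String> memberRacks) {
        return Objects.hash(subscribedTopicNames, memberRacks);
    }
}
{code}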



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-28 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748614#comment-17748614
 ] 

David Jacot commented on KAFKA-14509:
-

[~riedelmax] I left a few comments in your comment.

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-28 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748613#comment-17748613
 ] 

David Jacot commented on KAFKA-14509:
-

[~riedelmax] There are no error codes in requests so I am not sure what you are 
referring to. Could you elaborate?

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14501) Implement Heartbeat API

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14501.
-
Fix Version/s: 3.6.0
 Reviewer: David Jacot
   Resolution: Fixed

> Implement Heartbeat API
> ---
>
> Key: KAFKA-14501
> URL: https://issues.apache.org/jira/browse/KAFKA-14501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.6.0
>
>
> Implement Heartbeat API in the new Group Coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac merged pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-28 Thread via GitHub


dajac merged PR #14056:
URL: https://github.com/apache/kafka/pull/14056


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri opened a new pull request, #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation

2023-07-28 Thread via GitHub


fvaleri opened a new pull request, #14121:
URL: https://github.com/apache/kafka/pull/14121

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-28 Thread via GitHub


dajac commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1277524963


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -843,15 +886,19 @@ public JoinGroupResponseData 
setupGroupWithPendingMember(GenericGroup group) thr
 // Start the join for the second member
 CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>();
 CoordinatorResult result = sendGenericGroupJoin(
-joinRequest.setMemberId(UNKNOWN_MEMBER_ID),
+joinRequest
+.setMemberId(UNKNOWN_MEMBER_ID),
 followerJoinFuture
 );
 
 assertTrue(result.records().isEmpty());
 assertFalse(followerJoinFuture.isDone());
 
 CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>();
-result = sendGenericGroupJoin(joinRequest.setMemberId(leaderId), 
leaderJoinFuture);
+result = sendGenericGroupJoin(
+joinRequest
+.setMemberId(leaderJoinResponse.memberId()),

Review Comment:
   nit: When there is only one setter, it can stay on the same line. There are 
many cases in this file.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5837,15 +6007,15 @@ public void testNewMemberTimeoutCompletion() throws 
Exception {
 .build();
 GenericGroup group = context.createGenericGroup("group-id");
 
-JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>();
+CoordinatorResult result = 
context.sendGenericGroupJoin(new JoinGroupRequestBuilder()

Review Comment:
   nit: I would be better to format it as follow:
   
   ```
   CoordinatorResult result = context.sendGenericGroupJoin(
   new JoinGroupRequestBuilder()
   .withGroupId("group-id")
   .withMemberId(UNKNOWN_MEMBER_ID)
   .withDefaultProtocolTypeAndProtocols()
   .withSessionTimeoutMs(context.genericGroupNewMemberJoinTimeoutMs 
+ 5000)
   .build(),
   joinFuture
   );
   
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -843,15 +886,19 @@ public JoinGroupResponseData 
setupGroupWithPendingMember(GenericGroup group) thr
 // Start the join for the second member
 CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>();
 CoordinatorResult result = sendGenericGroupJoin(
-joinRequest.setMemberId(UNKNOWN_MEMBER_ID),
+joinRequest
+.setMemberId(UNKNOWN_MEMBER_ID),
 followerJoinFuture
 );
 
 assertTrue(result.records().isEmpty());
 assertFalse(followerJoinFuture.isDone());
 
 CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>();
-result = sendGenericGroupJoin(joinRequest.setMemberId(leaderId), 
leaderJoinFuture);
+result = sendGenericGroupJoin(
+joinRequest
+.setMemberId(leaderJoinResponse.memberId()),
+leaderJoinFuture);

Review Comment:
   nit: I have noticed that we are not consistent about where we put the 
closing parenthesis.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5837,15 +6007,15 @@ public void testNewMemberTimeoutCompletion() throws 
Exception {
 .build();
 GenericGroup group = context.createGenericGroup("group-id");
 
-JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>();
+CoordinatorResult result = 
context.sendGenericGroupJoin(new JoinGroupRequestBuilder()

Review Comment:
   This code will go away when you change to using `JoinResult`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-07-28 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-15267:
--
Labels: tiered-storage  (was: )

> Cluster-wide disablement of Tiered Storage
> --
>
> Key: KAFKA-15267
> URL: https://issues.apache.org/jira/browse/KAFKA-15267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> h2. Summary
> KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
> controls whether all resources needed for Tiered Storage to function are 
> instantiated properly in Kafka. However, the interaction between remote data 
> and Kafka if that configuration is set to false while there are still topics 
> with {{remote.storage.enable}} set is undefined. {color:#ff8b00}*We would 
> like to give customers the ability to switch off Tiered Storage on a cluster 
> level and as such would need to define the behaviour.*{color}
> {{remote.log.storage.system.enable}} is a read-only configuration. This means 
> that it can only be changed by *modifying the server.properties* and 
> restarting brokers. As such, the {*}validity of values contained in it is 
> only checked at broker startup{*}.
> This JIRA proposes a few behaviours and a recommendation on a way forward.
> h2. Option 1: Change nothing
> Pros:
>  * No operation.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
> h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
> level and do not allow it to be disabled
> Always instantiate all resources for tiered storage. If no special ones are 
> selected use the default ones which come with Kafka.
> Pros:
>  * We solve the problem for moving between versions not allowing TS to be 
> disabled.
> Cons:
>  * We do not solve the problem of moving back to older (or newer) Kafka 
> versions not supporting TS.
>  * We haven’t quantified how many compute resources (CPU, memory) idle TS 
> components occupy.
>  * TS is a feature not required for running Kafka. As such, while it is still 
> under development we shouldn’t put it on the critical path of starting a 
> broker. In this way, a stray memory leak won’t impact anything on the 
> critical path of a broker.
>  * We are potentially swapping one problem for another. How does TS behave if 
> one decides to swap the TS plugin classes when data has already been written?
> h2. Option 3: Hide topics with tiering enabled
> Customers cannot interact with topics which have tiering enabled. They cannot 
> create new topics with the same names. Retention (and compaction?) do not 
> take effect on files already in local storage.
> Pros:
>  * We do not force data-deletion.
> Cons:
>  * This will be quite involved - the controller will need to know when a 
> broker’s server.properties have been altered, and the broker must not delete 
> logs it is not the leader or follower for.
> h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
> tiering enabled{color} - Recommended
> This option has 2 different sub-options. The first one is that TS cannot be 
> disabled on cluster-level if there are *any* tiering topics - in other words 
> all tiered topics need to be deleted. The second one is that TS cannot be 
> disabled on a cluster-level if there are *any* topics with *tiering enabled* 
> - they can have tiering disabled, but with a retention policy set to delete 
> or retain (as per 
> [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
>  A topic can have tiering disabled and remain on the cluster as long as there 
> is no *remote* data when TS is disabled cluster-wide.
> Pros:
>  * We force the customer to be very explicit in disabling tiering of topics 
> prior to disabling TS on the whole cluster.
> Cons:
>  * You have to make certain that all data in remote storage is deleted (just 
> disabling a tiered topic is not enough). How do you determine whether all 
> remote data has expired if the policy is retain? If the retain policy in 
> KIP-950 knows that there is data in remote storage, then this check should 
> also be able to figure it out.
> The common denominator is that there needs to be no *remote* data at the 
> point of disabling TS. As such, the most straightforward option is to refuse 
> to start brokers if there are topics with {{remote.storage.enabled}} set. 
> This in essence requires customers to clean up any tiered topics before 
> switching off TS, which is a fair ask. Should we wish to revise this later, 
> it should be possible.
> h2. Option 5: Make Kafka forget about all remote information
> Pros:
>  * Clean cut
> Cons:
>  * Data is lost the moment TS is disabled regardless of whether it is 
> reenabled later on, which might not be the behaviour expected by customers.

[jira] [Updated] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-07-28 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-15267:
--
Description: 
h2. Summary

KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
controls whether all resources needed for Tiered Storage to function are 
instantiated properly in Kafka. However, the interaction between remote data 
and Kafka if that configuration is set to false while there are still topics 
with {{remote.storage.enable}} set is undefined. {color:#ff8b00}*We would like 
to give customers the ability to switch off Tiered Storage on a cluster level 
and as such would need to define the behaviour.*{color}

{{remote.log.storage.system.enable}} is a read-only configuration. This means 
that it can only be changed by *modifying the server.properties* and restarting 
brokers. As such, the {*}validity of values contained in it is only checked at 
broker startup{*}.

This JIRA proposes a few behaviours and a recommendation on a way forward.
h2. Option 1: Change nothing

Pros:
 * No operation.

Cons:
 * We do not solve the problem of moving back to older (or newer) Kafka 
versions not supporting TS.

h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
level and do not allow it to be disabled

Always instantiate all resources for tiered storage. If no special ones are 
selected use the default ones which come with Kafka.

Pros:
 * We solve the problem for moving between versions not allowing TS to be 
disabled.

Cons:
 * We do not solve the problem of moving back to older (or newer) Kafka 
versions not supporting TS.
 * We haven’t quantified how many compute resources (CPU, memory) idle TS 
components occupy.
 * TS is a feature not required for running Kafka. As such, while it is still 
under development we shouldn’t put it on the critical path of starting a 
broker. In this way, a stray memory leak won’t impact anything on the critical 
path of a broker.
 * We are potentially swapping one problem for another. How does TS behave if 
one decides to swap the TS plugin classes when data has already been written?

h2. Option 3: Hide topics with tiering enabled

Customers cannot interact with topics which have tiering enabled. They cannot 
create new topics with the same names. Retention (and compaction?) do not take 
effect on files already in local storage.

Pros:
 * We do not force data-deletion.

Cons:
 * This will be quite involved - the controller will need to know when a 
broker’s server.properties have been altered, and the broker must not delete 
logs it is not the leader or follower for.

h2. {color:#00875a}Option 4: Do not start the broker if there are topics with 
tiering enabled{color} - Recommended

This option has 2 different sub-options. The first one is that TS cannot be 
disabled on cluster-level if there are *any* tiering topics - in other words 
all tiered topics need to be deleted. The second one is that TS cannot be 
disabled on a cluster-level if there are *any* topics with *tiering enabled* - 
they can have tiering disabled, but with a retention policy set to delete or 
retain (as per 
[KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
 A topic can have tiering disabled and remain on the cluster as long as there 
is no *remote* data when TS is disabled cluster-wide.

Pros:
 * We force the customer to be very explicit in disabling tiering of topics 
prior to disabling TS on the whole cluster.

Cons:
 * You have to make certain that all data in remote storage is deleted (just 
disabling a tiered topic is not enough). How do you determine whether all 
remote data has expired if the policy is retain? If the retain policy in 
KIP-950 knows that there is data in remote storage, then this check should 
also be able to figure it out.

The common denominator is that there needs to be no *remote* data at the point 
of disabling TS. As such, the most straightforward option is to refuse to start 
brokers if there are topics with {{remote.storage.enabled}} set. This in 
essence requires customers to clean up any tiered topics before switching off 
TS, which is a fair ask. Should we wish to revise this later, it should be 
possible.
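
For illustration, a minimal sketch of the startup check the recommended option implies ({{TieredStorageStartupCheck}} and its method are hypothetical names, not actual Kafka code):
{code:java}
import java.util.List;

// Hypothetical startup validation: refuse to boot when tiered storage is
// disabled cluster-wide but some topics still have remote.storage.enable set.
final class TieredStorageStartupCheck {

    static void validate(boolean systemRemoteStorageEnabled,
                         List<String> topicsWithRemoteStorageEnabled) {
        if (!systemRemoteStorageEnabled && !topicsWithRemoteStorageEnabled.isEmpty()) {
            throw new IllegalStateException(
                "remote.log.storage.system.enable=false, but these topics still have " +
                "remote.storage.enable set: " + topicsWithRemoteStorageEnabled +
                ". Delete or un-tier them before disabling tiered storage cluster-wide.");
        }
    }
}
{code}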
h2. Option 5: Make Kafka forget about all remote information

Pros:
 * Clean cut

Cons:
 * Data is lost the moment TS is disabled regardless of whether it is reenabled 
later on, which might not be the behaviour expected by customers.

  was:
h2. Summary

KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
controls whether all resources needed for Tiered Storage to function are 
instantiated properly in Kafka. However, the interaction between remote data 
and Kafka if that configuration is set to false while there are still topics 
with {{remote.storage.enable}} set is undefined. {color:#ff8b00}*We would like 
to give customers the ability to switch off Tiered Storage on a cluster level 
and as such would need to define the behaviour.*{color}

[jira] [Created] (KAFKA-15267) Cluster-wide disablement of Tiered Storage

2023-07-28 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-15267:
-

 Summary: Cluster-wide disablement of Tiered Storage
 Key: KAFKA-15267
 URL: https://issues.apache.org/jira/browse/KAFKA-15267
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov
Assignee: Christo Lolov


h2. Summary

KIP-405 defines the configuration {{remote.log.storage.system.enable}} which 
controls whether all resources needed for Tiered Storage to function are 
instantiated properly in Kafka. However, the interaction between remote data 
and Kafka if that configuration is set to false while there are still topics 
with {{remote.storage.enable}} set is undefined. {color:#ff8b00}*We would like 
to give customers the ability to switch off Tiered Storage on a cluster level 
and as such would need to define the behaviour.*{color}

{{remote.log.storage.system.enable}} is a read-only configuration. This means 
that it can only be changed by *modifying the server.properties* and restarting 
brokers. As such, the {*}validity of values contained in it is only checked at 
broker startup{*}.

This JIRA proposes a few behaviours and a recommendation on a way forward.
h2. Option 1: Change nothing

Pros:
 * No operation.

Cons:
 * We do not solve the problem of moving back to older (or newer) Kafka 
versions not supporting TS.

h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster 
level and do not allow it to be disabled

Always instantiate all resources for tiered storage. If no special ones are 
selected use the default ones which come with Kafka.

Pros:
 * We solve the problem for moving between versions not allowing TS to be 
disabled.

Cons:
 * We do not solve the problem of moving back to older (or newer) Kafka 
versions not supporting TS.
 * We haven’t quantified how many compute resources (CPU, memory) idle TS 
components occupy.
 * TS is a feature not required for running Kafka. As such, while it is still 
under development we shouldn’t put it on the critical path of starting a 
broker. In this way, a stray memory leak won’t impact anything on the critical 
path of a broker.
 * We are potentially swapping one problem for another. How does TS behave if 
one decides to swap the TS plugin classes when data has already been written?

h2. Option 3: Hide topics with tiering enabled

Customers cannot interact with topics which have tiering enabled. They cannot 
create new topics with the same names. Retention (and compaction?) do not take 
effect on files already in local storage.

Pros:
 * We do not force data-deletion.

Cons:
 * This will be quite involved - the controller will need to know when a 
broker’s server.properties have been altered, and the broker must not delete 
logs it is not the leader or follower for.

h2. {color:#e6e6e6}Option 4: Do not start the broker if there are topics with 
tiering enabled{color} - Recommended

This option has 2 different sub-options. The first one is that TS cannot be 
disabled on cluster-level if there are *any* tiering topics - in other words 
all tiered topics need to be deleted. The second one is that TS cannot be 
disabled on a cluster-level if there are *any* topics with *tiering enabled* - 
they can have tiering disabled, but with a retention policy set to delete or 
retain (as per 
[KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]).
 A topic can have tiering disabled and remain on the cluster as long as there 
is no *remote* data when TS is disabled cluster-wide.

Pros:
 * We force the customer to be very explicit in disabling tiering of topics 
prior to disabling TS on the whole cluster.

Cons:
 * You have to make certain that all data in remote storage is deleted (just 
disabling a tiered topic is not enough). How do you determine whether all 
remote data has expired if the policy is retain? If the retain policy in 
KIP-950 knows that there is data in remote storage, then this check should 
also be able to figure it out.

The common denominator is that there needs to be no *remote* data at the point 
of disabling TS. As such, the most straightforward option is to refuse to start 
brokers if there are topics with {{remote.storage.enabled}} set. This in 
essence requires customers to clean up any tiered topics before switching off 
TS, which is a fair ask. Should we wish to revise this later, it should be 
possible.
h2. Option 5: Make Kafka forget about all remote information

Pros:
 * Clean cut

Cons:
 * Data is lost the moment TS is disabled regardless of whether it is reenabled 
later on, which might not be the behaviour expected by customers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #14117: MINOR: Code cleanups in group-coordinator module

2023-07-28 Thread via GitHub


dajac commented on code in PR #14117:
URL: https://github.com/apache/kafka/pull/14117#discussion_r1277523528


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java:
##
@@ -17,14 +17,14 @@
 package org.apache.kafka.coordinator.group.runtime;
 
 /**
- * Supplies a {@link CoordinatorBuilder} to the {@link CoordinatorRuntime}.
+ * Supplies a {@link CoordinatorShardBuilder} to the {@link CoordinatorRuntime}.
  *
  * @param <S> The type of the coordinator.
  * @param <U> The record type.
  */
-public interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> {
+public interface CoordinatorBuilderSupplier<S extends CoordinatorShard<U>, U> {

Review Comment:
   yeah, that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14117: MINOR: Code cleanups in group-coordinator module

2023-07-28 Thread via GitHub


jeffkbkim commented on code in PR #14117:
URL: https://github.com/apache/kafka/pull/14117#discussion_r1277519026


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java:
##
@@ -17,14 +17,14 @@
 package org.apache.kafka.coordinator.group.runtime;
 
 /**
- * Supplies a {@link CoordinatorBuilder} to the {@link CoordinatorRuntime}.
+ * Supplies a {@link CoordinatorShardBuilder} to the {@link CoordinatorRuntime}.
  *
  * @param <S> The type of the coordinator.
  * @param <U> The record type.
  */
-public interface CoordinatorBuilderSupplier<S extends Coordinator<U>, U> {
+public interface CoordinatorBuilderSupplier<S extends CoordinatorShard<U>, U> {

Review Comment:
   should this be CoordinatorShardBuilderSupplier?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15246) CoordinatorContext should be protected by a lock

2023-07-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15246.
-
Fix Version/s: 3.6.0
 Reviewer: Justine Olshan
   Resolution: Fixed

> CoordinatorContext should be protected by a lock
> 
>
> Key: KAFKA-15246
> URL: https://issues.apache.org/jira/browse/KAFKA-15246
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac opened a new pull request, #14120: KAFKA-14499: [4/N] Implement OffsetFetch API

2023-07-28 Thread via GitHub


dajac opened a new pull request, #14120:
URL: https://github.com/apache/kafka/pull/14120

   This patch implements the OffsetFetch API in the new group coordinator.
   
   I found out that implementing the `RequireStable` flag is hard (not to say 
impossible) in the current model. For context, the flag is there to ensure 
that an OffsetFetch request does not return stale offsets if there are 
pending offsets to be committed. In the Scala code, we basically check the 
pending offsets data structure and if there are any pending offsets, we return 
the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry.
   
   In our new model, we don't have the pending offsets data structure. Instead, 
we use a timeline data structure to handle all the pending/uncommitted changes. 
Because of this, we don't know whether offsets are pending for a particular 
group. I therefore propose to not return the `UNSTABLE_OFFSET_COMMIT` error 
anymore. Instead, when `RequireStable` is set, we use a write operation to 
ensure that we read the latest offsets. If there are uncommitted offsets, the 
write operation ensures that the response is only returned once they are 
committed. This gives a similar behaviour in the end.
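   
   To make the idea concrete, here is a self-contained toy model (all names 
here are hypothetical, not the actual `CoordinatorRuntime` API):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   // Toy model: a "write" operation is ordered after pending offset commits,
   // so routing a RequireStable fetch through it yields only stable offsets.
   final class RequireStableSketch {
   
       interface Runtime {
           CompletableFuture<Long> scheduleReadOperation();   // may observe uncommitted state
           CompletableFuture<Long> scheduleWriteOperation();  // completes after pending commits
       }
   
       static CompletableFuture<Long> fetchOffsets(Runtime runtime, boolean requireStable) {
           // When RequireStable is set, piggyback on a write so the response
           // is only returned once pending offset commits are committed.
           return requireStable ? runtime.scheduleWriteOperation()
                                : runtime.scheduleReadOperation();
       }
   }
   ```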
   
   Note that this PR does not add the MemberId and MemberEpoch fields to the 
request. This will be done in the next PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] amangandhi94 opened a new pull request, #14119: KAFKA-15266: Static configs set for non primary synonyms are ignored for Log configs

2023-07-28 Thread via GitHub


amangandhi94 opened a new pull request, #14119:
URL: https://github.com/apache/kafka/pull/14119

   This change ensures that values stored in "non-primary" synonyms are 
respected when altering configs. Currently, if a "non-primary" synonym has a 
static config set, it gets deleted from the default log config during 
reconfiguration. We want to ensure that we retain the correct precedence order 
of the broker level synonyms (log.retention.ms > log.retention.minutes > 
log.retention.hours).
   
   I have added unit test cases for this scenario.
   1. During setup we configure log.roll.hours=24
   2. We then run a few alter configs and ensure that the LogConfig value for 
segment.ms is still set to 24 hours
   3. We then run alter configs to set log.roll.ms=5 and verify that 
segment.ms also changes to 5
   4. We then delete the dynamic config for log.roll.ms and verify that we 
again pick up the statically configured 24-hour value
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-07-28 Thread Tomonari Yamashita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748555#comment-17748555
 ] 

Tomonari Yamashita commented on KAFKA-15259:


Hi [~mjsax],

I think the stacktrace can be ignored. The root cause of this problem is that 
sending a too-large message to "output-topic" makes RecordTooLargeException 
occur in producer.send(), and the transaction is then rolled back/aborted (3) 
despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once. The 
stacktrace is just collateral damage: an error caused by a subsequent commit on 
a transaction that has already been rolled back/aborted.

As far as we have tested, even using the same source code (attached file: 
Reproducer.java), changing the version of Kafka Streams leads to different 
behavior as to whether "continue" works or the transaction aborts, as follows:
- Kafka 2.6.3 : OK. "continue" worked as I assume.
- Kafka 2.8.2 : OK. "continue" worked as I assume.
- Kafka 3.0.0 : OK. "continue" worked as I assume.
- Kafka 3.1.2 : OK. "continue" worked as I assume.
- Kafka 3.2.0 : NG. "continue" didn't work because the transaction was 
rollbacked/aborted.
- Kafka 3.2.3 : NG. "continue" didn't work because the transaction was 
rollbacked/aborted.
- Kafka 3.5.1 : NG. "continue" didn't work because the transaction was 
rollbacked/aborted.

From what I have seen in the debugger, the transaction is probably being 
aborted by the code at (3). This is probably due to change (4)(5) in Kafka 
3.2.0: if any individual producer.send() fails, the transaction is forcibly 
aborted.

Since this change (4)(5) is in the Kafka producer, I realize that it would be 
difficult to restore the behavior of 3.1.2 and earlier, which allowed 
"continue" to be used. However, if "continue" is not available, there is a 
concern that Kafka Streams applications will keep getting stuck because there 
is no way to ignore messages that raise RecordTooLargeException by sending 
them to a dead letter queue.
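
For reference, a minimal handler of this kind (simplified sketch; the class 
name is illustrative), registered via default.production.exception.handler:
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Skips oversized records instead of failing the Streams application.
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // CONTINUE only for the too-large-record case; fail fast otherwise.
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {}
}
{code}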

(3) 
https://github.com/apache/kafka/blob/3.5.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1081-L1083
{code:java}
if (transactionManager != null) {
transactionManager.maybeTransitionToErrorState(e);
}
{code}

(4) Silent data loss in Kafka producer
https://issues.apache.org/jira/browse/KAFKA-9279

(5) KAFKA-9279: Fail producer transactions for asynchronously-reported, 
synchronously-encountered ApiExceptions #11508
https://github.com/apache/kafka/pull/11508

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shuts down as the default 
> behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE 
> is used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once

[jira] [Updated] (KAFKA-15266) Log configs ignore static configs set for non primary synonyms

2023-07-28 Thread Aman Harish Gandhi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aman Harish Gandhi updated KAFKA-15266:
---
Summary: Log configs ignore static configs set for non primary synonyms  
(was: Log configs ignore static configs set non primary synonyms)

> Log configs ignore static configs set for non primary synonyms
> --
>
> Key: KAFKA-15266
> URL: https://issues.apache.org/jira/browse/KAFKA-15266
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Aman Harish Gandhi
>Assignee: Aman Harish Gandhi
>Priority: Major
>
> In our server.properties we had the following config
> {code:java}
> log.retention.hours=48
> {code}
> We noticed that after running alter configs to update a broker level config 
> (for a config unrelated to retention) we were only deleting data after 7 days 
> instead of the configured 2.
> The alter configs command we had run was similar to this
> {code:java}
> sh kafka-config.sh --bootstrap-server localhost:9092 --alter --add-config 
> "log.segment.bytes=50"
> {code}
> Digging deeper, the issue could be pinpointed to the reconfigure block of 
> DynamicLogConfig inside DynamicBrokerConfig. Here we only look at the 
> "primary" KafkaConfig synonym of the LogConfig, and if it is not set we 
> remove the value set in the default log config as well. This eventually leads 
> to retention.ms not being set in the default log config, which means the 
> default value of 7 days is used. The value set in "log.retention.hours" is 
> completely ignored in this case.
> Pasting the relevant code block here
> {code:java}
> newConfig.valuesFromThisConfig.forEach { (k, v) =>
>   if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
> DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
>   if (v == null)
>  newBrokerDefaults.remove(configName)
>   else
> newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
> }
>   }
> } {code}
> In the above block `DynamicLogConfig.ReconfigurableConfigs` contains only 
> log.retention.ms. It does not contain the other synonyms like 
> `log.retention.minutes` or `log.retention.hours`.
> This issue seems prevalent in all cases where there is more than one 
> KafkaConfig synonym for the LogConfig.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15266) Log configs ignore static configs set non primary synonyms

2023-07-28 Thread Aman Harish Gandhi (Jira)
Aman Harish Gandhi created KAFKA-15266:
--

 Summary: Log configs ignore static configs set non primary synonyms
 Key: KAFKA-15266
 URL: https://issues.apache.org/jira/browse/KAFKA-15266
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.6.0
Reporter: Aman Harish Gandhi
Assignee: Aman Harish Gandhi


In our server.properties we had the following config
{code:java}
log.retention.hours=48
{code}
We noticed that after running alter configs to update a broker level config 
(for a config unrelated to retention) we were only deleting data after 7 days 
instead of the configured 2.

The alter configs command we had run was similar to this
{code:java}
sh kafka-config.sh --bootstrap-server localhost:9092 --alter --add-config 
"log.segment.bytes=50"
{code}
Digging deeper, the issue could be pinpointed to the reconfigure block of 
DynamicLogConfig inside DynamicBrokerConfig. Here we only look at the "primary" 
KafkaConfig synonym of the LogConfig, and if it is not set we remove the value 
set in the default log config as well. This eventually leads to retention.ms 
not being set in the default log config, which means the default value of 
7 days is used. The value set in "log.retention.hours" is completely ignored 
in this case.

Pasting the relevant code block here
{code:java}
newConfig.valuesFromThisConfig.forEach { (k, v) =>
  if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
  if (v == null)
 newBrokerDefaults.remove(configName)
  else
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
}
  }
} {code}
In the above block `DynamicLogConfig.ReconfigurableConfigs` contains only 
log.retention.ms. It does not contain the other synonyms like 
`log.retention.minutes` or `log.retention.hours`.

This issue seems prevalent in all cases where there is more than one 
KafkaConfig synonym for the LogConfig.
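
A sketch of the kind of synonym-aware resolution a fix implies (plain Java for 
illustration; names are hypothetical, not the actual DynamicBrokerConfig code):
{code:java}
import java.util.List;
import java.util.Map;

// Illustrative: resolve retention from broker synonyms in precedence order
// (log.retention.ms > log.retention.minutes > log.retention.hours) instead
// of only ever consulting the primary synonym.
final class SynonymResolutionSketch {

    static Long resolveRetentionMs(Map<String, String> brokerConfig) {
        List<String> synonyms = List.of(
            "log.retention.ms", "log.retention.minutes", "log.retention.hours");
        List<Long> scaleToMs = List.of(1L, 60_000L, 3_600_000L);
        for (int i = 0; i < synonyms.size(); i++) {
            String value = brokerConfig.get(synonyms.get(i));
            if (value != null) {
                // First synonym present wins, honoring the precedence order.
                return Long.parseLong(value) * scaleToMs.get(i);
            }
        }
        return null; // caller falls back to the 7-day default
    }
}
{code}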



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult

2023-07-28 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-14967:
--

Fix Version/s: 3.6.0
 Assignee: James Shaw
   Resolution: Fixed

> MockAdminClient throws NullPointerException in CreateTopicsResult
> -
>
> Key: KAFKA-14967
> URL: https://issues.apache.org/jira/browse/KAFKA-14967
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: James Shaw
>Assignee: James Shaw
>Priority: Major
> Fix For: 3.6.0
>
>
> Calling {{CreateTopicsResult.topicId().get()}} throws 
> {{NullPointerException}}, while {{KafkaAdminClient}} correctly returns 
> the topicId.
> The NPE appears to be caused by [{{MockAdminClient.createTopics()}} calling 
> {{future.complete(null)}}|https://github.com/apache/kafka/blame/trunk/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java#L394]
> Stacktrace:
> {code:java}
> java.util.concurrent.ExecutionException: java.lang.NullPointerException
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at 
> MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
>[snip]
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
>   at 
> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58)
>   at 
> org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82)
>   ... 85 more
> {code}
> Test case to reproduce:
> {code:java}
>  
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.CreateTopicsResult;
> import org.apache.kafka.clients.admin.MockAdminClient;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.common.Node;
> import org.apache.kafka.common.Uuid;
> import org.junit.jupiter.api.Test;
> import java.util.Optional;
> import java.util.concurrent.ExecutionException;
> import static java.util.Collections.singletonList;
> public class MockAdminClientBug {
> @Test
> void shouldNotThrowNullPointerException() throws ExecutionException, 
> InterruptedException {
> Node controller = new Node(0, "mock", 0);
> try (Admin admin = new MockAdminClient(singletonList(controller), 
> controller)) {
> CreateTopicsResult result = admin.createTopics(singletonList(new 
> NewTopic("TestTopic", Optional.empty(), Optional.empty())));
> Uuid topicId = result.topicId("TestTopic").get();
> System.out.println(topicId);
> }
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13671: KAFKA-14967: fix NPE in MockAdminClient CreateTopicsResult

2023-07-28 Thread via GitHub


mimaison merged PR #13671:
URL: https://github.com/apache/kafka/pull/13671


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #14101: Source task stop call was added to force stopping execution.

2023-07-28 Thread via GitHub


vamossagar12 commented on PR #14101:
URL: https://github.com/apache/kafka/pull/14101#issuecomment-1655210284

   > This is exactly our case: we have long source polling (in our case it's 
jdbc source task), each restarting doesn't break previous task (we get Graceful 
Shutdown failed), but creates new one => a lot of connections on db side.
   
   Yeah, that's what I had imagined would be happening in your case. As I said, 
you will need to play around with the graceful shutdown timeout and probably 
with the poll interval on the JDBC connector side (that would be out of scope 
here). IMO it might be hard to factor in the change in this PR, considering 
that it meddles with the current logic and we would need to make more changes. 
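   
   For example, the worker-level knob I am referring to (value illustrative):
   
   ```
   # Connect worker config: how long to wait for tasks to stop gracefully
   task.shutdown.graceful.timeout.ms=30000
   ```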


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #14117: MINOR: Code cleanups in group-coordinator module

2023-07-28 Thread via GitHub


dajac commented on PR #14117:
URL: https://github.com/apache/kafka/pull/14117#issuecomment-1655195050

   > can you point me to where the log context includes the topic partition 
info?
   
   Sure. It is right 
[here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java#L412).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-28 Thread via GitHub


flashmouse commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1655170419

   thx @ableegoldman 
   I reduced the number of partitions and increased the number of consumers; I 
think all tests influenced by this PR should pass now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] muralibasani commented on pull request #14106: KAFKA-14585: 1st part : Java versions for metadata/broker and updated LogConfig

2023-07-28 Thread via GitHub


muralibasani commented on PR #14106:
URL: https://github.com/apache/kafka/pull/14106#issuecomment-1655168025

   @fvaleri would you like to take a look at this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org