[GitHub] [kafka] atu-sharm commented on pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-05-11 Thread via GitHub


atu-sharm commented on PR #13633:
URL: https://github.com/apache/kafka/pull/13633#issuecomment-1545201789

   Thanks @vvcephei, @mjsax, and @machi1990 for your valuable suggestions. I 
think it's better to apply this property to the whole project, and I have made 
the changes accordingly.
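
   For readers following along: a minimal sketch of how such a project-wide 
setting can look in a Gradle build. `JavadocMemberLevel.PUBLIC` is Gradle's 
standard knob for omitting protected members; whether this matches the exact 
property used in the PR is an assumption.
   ```
   // build.gradle - hedged sketch, not necessarily the PR's exact change
   allprojects {
     tasks.withType(Javadoc) {
       // only document public members; protected variables/methods are excluded
       options.memberLevel = JavadocMemberLevel.PUBLIC
     }
   }
   ```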


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming merged pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-11 Thread via GitHub


dengziming merged PR #13679:
URL: https://github.com/apache/kafka/pull/13679





[GitHub] [kafka] dengziming commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-11 Thread via GitHub


dengziming commented on PR #13679:
URL: https://github.com/apache/kafka/pull/13679#issuecomment-1545197689

   The failed tests are flaky and have also failed recently in other PRs.





[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-11 Thread via GitHub


dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191927989


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig 
rebalanceConfig,
 this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
 this.sensors = new ConsumerCoordinatorMetrics(metrics, 
metricGrpPrefix);
 this.interceptors = interceptors;
+this.inFlightAsyncCommits = new AtomicInteger();
 this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Okay. I understand this a bit better now. `pendingAsyncCommits` is really 
only meant to track inflight coordinator lookups. Then, in `close`, we wait on 
those to complete before calling `close` of the superclass, which waits on all 
the inflight requests to complete.






[jira] [Assigned] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2023-05-11 Thread Mahesh Madushan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mahesh Madushan reassigned KAFKA-13349:
---

Assignee: Mahesh Madushan

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Mahesh Madushan
>Priority: Major
>  Labels: needs-kip, newbie++
>
> Today, Streams' state store range iterators do not support `remove`. We 
> could consider adding such support for all the built-in state stores:
> * RocksDB's native iterator does not support removal, but we can always do a 
> delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: a straightforward implementation.
> The benefit is that, for range-and-delete truncation operations, we would not 
> have to be cautious about concurrent modification exceptions. 
> This could also help GC with in-memory stores.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2023-05-11 Thread Mahesh Madushan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722007#comment-17722007
 ] 

Mahesh Madushan commented on KAFKA-13349:
-

[~mjsax] Thank you for the response. Will work on this.

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie++
>
> Today, Streams' state store range iterators do not support `remove`. We 
> could consider adding such support for all the built-in state stores:
> * RocksDB's native iterator does not support removal, but we can always do a 
> delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: a straightforward implementation.
> The benefit is that, for range-and-delete truncation operations, we would not 
> have to be cautious about concurrent modification exceptions. 
> This could also help GC with in-memory stores.





[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2023-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722005#comment-17722005
 ] 

Matthias J. Sax commented on KAFKA-13349:
-

Yes, we want to add `remove()` to the interface 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java]
 and thus all implementations will need to support it.
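
For illustration only, a minimal sketch of one way an implementation could support it, assuming a wrapper that delegates removal to a concurrent delete(key) on the store (the class and approach here are hypothetical, not the actual proposal):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical wrapper: remove() deletes the last-returned key via the store,
// while the underlying iterator keeps reading from its (RocksDB) snapshot.
class RemovableKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
    private final KeyValueIterator<K, V> inner;
    private final KeyValueStore<K, V> store;
    private K lastKey;

    RemovableKeyValueIterator(KeyValueIterator<K, V> inner, KeyValueStore<K, V> store) {
        this.inner = inner;
        this.store = store;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public KeyValue<K, V> next() {
        KeyValue<K, V> kv = inner.next();
        lastKey = kv.key;
        return kv;
    }

    @Override
    public void remove() {
        if (lastKey == null)
            throw new IllegalStateException("next() has not been called");
        store.delete(lastKey); // concurrent delete; an open snapshot is unaffected
    }

    @Override
    public K peekNextKey() {
        return inner.peekNextKey();
    }

    @Override
    public void close() {
        inner.close();
    }
}
{code}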

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie++
>
> Today, Streams' state store range iterators do not support `remove`. We 
> could consider adding such support for all the built-in state stores:
> * RocksDB's native iterator does not support removal, but we can always do a 
> delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: a straightforward implementation.
> The benefit is that, for range-and-delete truncation operations, we would not 
> have to be cautious about concurrent modification exceptions. 
> This could also help GC with in-memory stores.





[GitHub] [kafka] aaron-ai opened a new pull request, #13710: MINOR: Fix the outdated comments of ConfigDef

2023-05-11 Thread via GitHub


aaron-ai opened a new pull request, #13710:
URL: https://github.com/apache/kafka/pull/13710

   Fix the outdated comments of `ConfigDef` since the signature of the 
corresponding method has been updated.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904

2023-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722004#comment-17722004
 ] 

Matthias J. Sax commented on KAFKA-14911:
-

No worries. And thanks for helping with the reviews! Equally important.

> Add system tests for rolling upgrade path of KIP-904
> 
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
>  Issue Type: Test
>Reporter: Farooq Qaiser
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0
>
>
> As per [~mjsax] comment 
> [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752],
>  we should add a system test to test the rolling upgrade path for 
> [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new 
> serialization format for groupBy internal repartition topics and was 
> implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446 
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf 
> `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test 
> variation. 
> The tricky thing about the test would be, to ensure that the repartition 
> topic is not empty when we do the bounce, so the test should be setup 
> accordingly.





[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if an instance is restarted

2023-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722003#comment-17722003
 ] 

Matthias J. Sax commented on KAFKA-14981:
-

I was not aware that there were (or maybe still are) issues. Are there any 
tickets for them?

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if an instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set 
> this id in Kafka Streams using `threadId` so that no rebalance is triggered 
> within `session.timeout.ms`.
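
For reference, a minimal sketch of what static membership looks like on a plain consumer today (the ticket proposes that Streams set this internally per thread; the id and other values below are illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMembershipExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app");
        // Static membership: restarting this instance within session.timeout.ms
        // keeps its assignment and triggers no rebalance.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-app-thread-1");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
{code}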





[GitHub] [kafka] showuon merged pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

2023-05-11 Thread via GitHub


showuon merged PR #13517:
URL: https://github.com/apache/kafka/pull/13517





[GitHub] [kafka] showuon commented on pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

2023-05-11 Thread via GitHub


showuon commented on PR #13517:
URL: https://github.com/apache/kafka/pull/13517#issuecomment-1545024000

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testTimeouts()
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   ```





[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2023-05-11 Thread tony mancill (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721959#comment-17721959
 ] 

tony mancill commented on KAFKA-8154:
-

Thank you for that observation [~keith.paulson].   We have been seeing this 
issue with BouncyCastle in our environment. We use client versions 2.8.2 and 
3.4.0, but so far I've only tested 2.8.2.  The following patch addresses the 
issue for us with 2.8.2, clearing up both exceptions and the accompanying 
connection resets.
{code:java}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad6da..effd5fb80d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -344,9 +344,9 @@ public class SslTransportLayer implements TransportLayer {
                     netWriteBuffer.compact();
                     netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
                     netWriteBuffer.flip();
-                    if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
+                    if (netWriteBuffer.limit() > currentNetWriteBufferSize) {
                         throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
-                                                        ") >= network buffer size (" + currentNetWriteBufferSize + ")");
+                                                        ") > network buffer size (" + currentNetWriteBufferSize + ")");
                     }
                 } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
                     throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
@@ -564,6 +564,7 @@ public class SslTransportLayer implements TransportLayer {
                 netReadBuffer.flip();
                 SSLEngineResult unwrapResult;
                 try {
+                    appReadBuffer = Utils.ensureCapacity(appReadBuffer, netReadBufferSize());
                     unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
                     if (state == State.POST_HANDSHAKE && appReadBuffer.position() != 0) {
                         // For TLSv1.3, we have finished processing post-handshake messages since we are now processing data
@@ -598,7 +599,7 @@ public class SslTransportLayer implements TransportLayer {
                                                         ") >= application buffer size (" + currentApplicationBufferSize + ")");
                     }
-                    // appReadBuffer will extended upto currentApplicationBufferSize
+                    // appReadBuffer will extend up to currentApplicationBufferSize
                     // we need to read the existing content into dst before we can do unwrap again. If there are no space in dst
                     // we can break here.
                     if (dst.hasRemaining()) {
{code}
 

A couple of notes, which is why I haven't submitted a PR yet - that is, I don't 
think the patch is complete yet:
 * First, I'm not sure whether the change in the hunk starting at line 564 is 
needed.  It was in my working tree while I tried to address this previously, so 
I need to test again without it.
 * There are other places in SslTransportLayer.java where 
Utils.ensureCapacity() is called and the following comparison uses >= instead 
of strictly >.  It would be nice to address them all at once.
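
To illustrate the >= vs > distinction: after a flip(), a buffer that is exactly full has limit() == capacity, so a >= comparison flags a legitimate state as an overflow. A standalone sketch:

{code:java}
import java.nio.ByteBuffer;

public class BufferLimitDemo {
    public static void main(String[] args) {
        int capacity = 4;
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        buf.put(new byte[capacity]); // buffer exactly full
        buf.flip();
        // limit == capacity here: ">=" reports a spurious overflow, while ">"
        // only fires when the data genuinely exceeds the buffer size.
        System.out.println(buf.limit() >= capacity); // true
        System.out.println(buf.limit() > capacity);  // false
    }
}
{code}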

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785





[GitHub] [kafka] mehbey opened a new pull request, #13709: Added log timestamp validation to prevent records with timestamps tha…

2023-05-11 Thread via GitHub


mehbey opened a new pull request, #13709:
URL: https://github.com/apache/kafka/pull/13709

   ### What changed
   Added a validation to check whether the record timestamp is in the future 
compared to the broker's timestamp, and to throw an exception rejecting the 
record if it is.
   
   The current validation that checks the record's timestamp against the 
configured ```timestampDiffMaxMs``` remains unchanged. This new validation 
handles scenarios where producers send a future timestamp for a record.
   Specific changes are:
   - Updated validation logic in LogValidator
   - Added unit test coverage for the change
   - Updated unit tests that failed because of the new validation logic
   
   ### Why?
   https://issues.apache.org/jira/browse/KAFKA-14991
   Improves the accuracy of the log validation logic and avoids unexpected 
gotchas for customers
   
   
   ### Testing
   - Added relevant unit tests
   - Reproduced the issue by setting nanoseconds instead of milliseconds in the 
producer logic and verified that the validation works as expected. Example API 
response:
   
   ```
   "responses":[{"name":"myTopic1","partitionResponses":[{"index":0,"errorCode":32,"baseOffset":-1,"logAppendTimeMs":-1,"logStartOffset":0,"recordErrors":[
   {"batchIndex":0,"batchIndexErrorMessage":"Timestamp 1755933141855875 of message with offset 0 is ahead of the server's time. 1683838582815"},
   {"batchIndex":1,"batchIndexErrorMessage":"Timestamp 1755933754530625 of message with offset 1 is ahead of the server's time. 1683838582815"},
   
   ],"errorMessage":"One or more records have been rejected due to invalid timestamp"}]}]
   ```
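
   A minimal sketch of the kind of check described above (the class, method, and 
skew tolerance are illustrative, not the actual LogValidator change):
   ```
   import org.apache.kafka.common.InvalidRecordException;

   // Hypothetical sketch of a future-timestamp validation.
   class FutureTimestampCheck {
       private static final long ALLOWED_CLOCK_SKEW_MS = 60_000L;

       static void validateTimestamp(long recordTimestampMs, long brokerNowMs) {
           if (recordTimestampMs > brokerNowMs + ALLOWED_CLOCK_SKEW_MS) {
               throw new InvalidRecordException("Timestamp " + recordTimestampMs
                   + " of message is ahead of the server's time " + brokerNowMs);
           }
       }
   }
   ```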
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mehbey commented on pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-11 Thread via GitHub


mehbey commented on PR #13681:
URL: https://github.com/apache/kafka/pull/13681#issuecomment-1544909428

   Thank you for the review @clolov - I will address the comments shortly





[jira] [Created] (KAFKA-14992) Add partition limit or improve error msg for adminzkclient

2023-05-11 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-14992:


 Summary: Add partition limit or improve error msg for adminzkclient
 Key: KAFKA-14992
 URL: https://issues.apache.org/jira/browse/KAFKA-14992
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang


Create topic requests with a large number of partitions will yield an exception 
that's technically unrelated and confusing: 
`partitions should be a consecutive 0-based integer sequence`
What is really happening is that we exceed max int at this line
[https://github.com/apache/kafka/blame/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L154]
which causes the following check to fail. We should account for this case 
better.
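
For illustration, the general failure mode (a hypothetical sketch of the arithmetic, not necessarily the exact expression at the linked line):

{code:java}
public class PartitionOverflowDemo {
    public static void main(String[] args) {
        int partitions = 300_000;
        int replicationFactor = 10_000;
        // int arithmetic wraps past Integer.MAX_VALUE and goes negative, so a
        // later sanity check fails with an unrelated error message.
        int wrapped = partitions * replicationFactor;
        long intended = (long) partitions * replicationFactor;
        System.out.println(wrapped);  // -1294967296
        System.out.println(intended); // 3000000000
    }
}
{code}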





[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-05-11 Thread Bikash Adhikari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721935#comment-17721935
 ] 

Bikash Adhikari commented on KAFKA-14941:
-

I will work on this documentation. 

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> nodes with the controller process role. It would be great if this 
> information were part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), and also part of the config classes, so that it could be used in 
> situations where the configuration is generated dynamically, for example to 
> filter the options applicable to different nodes. This would allow 
> having configuration files with only the actually used configuration options 
> and, for example, help reduce unnecessary restarts when rolling out new 
> configurations.
> For some options it seems clear, and the Kafka node would refuse to start if 
> they are set - for example, the configurations of the non-controller listeners 
> on controller-only nodes. For others, it seems a bit less clear (Does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offsets topic? etc.).





[GitHub] [kafka] splett2 commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart

2023-05-11 Thread via GitHub


splett2 commented on code in PR #13707:
URL: https://github.com/apache/kafka/pull/13707#discussion_r1191702662


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1956,10 +1956,10 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val transactionTopicSegmentBytes = 
getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
   val transactionAbortTimedOutTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-  
+
   val transactionPartitionVerificationEnable = 
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
 
-  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
   The dynamic broker config code is quite hairy. I took a look, and my 
high-level understanding of what it does is the following:
   
   `DynamicBrokerConfig.updateCurrentConfig`: Try to generate a new 
`KafkaConfig` from the set of properties persisted to Zookeeper. If the current 
config is equal to the new config, no-op. Otherwise, determine the set of 
reconfigurables that need to be updated based on the currently registered set 
of reconfigurables, and apply those updates. Then update the current config.
   
   I added some logging, and it looks like what is happening is the following:
   1. During `KafkaServer.startup()` we call 
`config.dynamicConfig.initialize(Some(zkClient))`. At this point, the set of 
reconfigurables is empty.
   2. Many lines of code later, we call:
   ```
   /* Add all reconfigurables for config change notification before 
starting config handlers */
   config.dynamicConfig.addReconfigurables(this)
   
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
   ```
   3. Eventually we start up the config manager, which tries to reload props 
from Zookeeper.
   
   Step #1 loads broker overrides from Zookeeper but doesn't apply any changes, 
since we have not added the reconfigurables yet. This means that the props just 
get applied to the current KafkaConfig, and the reconfiguration hooks defined 
in `DynamicBrokerConfig` don't fire. However, we do update the current 
KafkaConfig to include the updated props.
   Step #2 adds the reconfigurables so that post-startup configuration changes 
alter components.
   Step #3 tries to load the base props from Zookeeper, but because step #1 has 
already updated the current KafkaConfig to match the existing ZK state, we 
no-op again.
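
   For context on the `val` -> `def` change in the quoted diff: a Scala `val` 
captures the property once when `KafkaConfig` is constructed, while a `def` 
re-reads it on every access. A Java analogy of the same distinction 
(illustrative only):
   ```
   import java.util.Map;

   // A field set in the constructor behaves like Scala's `val`: later updates
   // to the backing props are invisible. A method behaves like `def`: each
   // call re-reads the current value.
   class ConfigSketch {
       private final Map<String, Integer> props;
       private final int capturedOnce; // like `val producerIdExpirationMs`

       ConfigSketch(Map<String, Integer> props) {
           this.props = props;
           this.capturedOnce = props.get("producer.id.expiration.ms");
       }

       int readEveryTime() { // like `def producerIdExpirationMs`
           return props.get("producer.id.expiration.ms");
       }
   }
   ```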






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191678769


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional<String> protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional<String> protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional<String> leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map<String, GenericGroupMember> members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map<String, String> staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group

Review Comment:
   Oh yeah, yep, my bad.






[GitHub] [kafka] jeffkbkim commented on pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


jeffkbkim commented on PR #13704:
URL: https://github.com/apache/kafka/pull/13704#issuecomment-1544658481

   I'm seeing:
   ```
   Class Data Abstraction Coupling is 31 (max allowed is 25) classes 
[ApiMessageAndVersion, ClientAssignor, ConsumerGroupCurrentMemberAssignmentKey, 
ConsumerGroupCurrentMemberAssignmentValue, 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions, 
ConsumerGroupMember.Builder, ConsumerGroupMemberMetadataKey, 
ConsumerGroupMemberMetadataValue, ConsumerGroupMemberMetadataValue.Assignor, 
ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, 
ConsumerGroupPartitionMetadataKey, ConsumerGroupPartitionMetadataValue, 
ConsumerGroupPartitionMetadataValue.TopicMetadata, 
ConsumerGroupTargetAssignmentMemberKey, 
ConsumerGroupTargetAssignmentMemberValue, 
ConsumerGroupTargetAssignmentMemberValue.TopicPartition, 
ConsumerGroupTargetAssignmentMetadataKey, 
ConsumerGroupTargetAssignmentMetadataValue, GenericGroup, GenericGroupMember, 
GroupMetadataKey, GroupMetadataValue, GroupMetadataValue.MemberMetadata, 
LinkedHashMap, LogContext, MockTime, Protocol, Record, TopicMetadata, 
VersionedMetadata]. [ClassDataAbstractionCoupling]
   ```
   
   The Checkstyle documentation 
(https://checkstyle.sourceforge.io/apidocs/com/puppycrawl/tools/checkstyle/checks/metrics/ClassDataAbstractionCouplingCheck.html#:~:text=Generally%20speaking%2C%20any%20data%20type,the%20structure%20of%20the%20class)
 describes the check as follows:
   > Generally speaking, any data type with other data types as members or a 
local variable that is an instantiation (object) of another class has data 
abstraction coupling (DAC). The higher the DAC, the more complex the structure 
of the class.
   
   Can we suppress this error?
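
   For what it's worth, Checkstyle supports per-file suppressions; a sketch of 
what such an entry could look like in a suppressions file (the file path and 
exact entry are assumptions):
   ```
   <!-- hypothetical entry in checkstyle/suppressions.xml -->
   <suppress checks="ClassDataAbstractionCoupling"
             files="RecordHelpersTest.java"/>
   ```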





[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-11 Thread via GitHub


philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191665112


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 }
 
+private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   The sendOffsetCommitRequest method basically does the following: 1. if the 
offsets are empty, complete the future and return immediately; 2. if there's no 
coordinator (`checkAndGetCoordinator`), complete the future with an exception 
(like completeExceptionally) and return; otherwise 3. create the request data 
and send it via `client.send(coordinator, builder).compose(new 
OffsetCommitResponseHandler(offsets, generation));`
   
   What I'm suggesting is not to change that method, but to change 
`commitOffsetsSync`. My suggestions are:
   
   Currently, it returns immediately if the offsets are empty. But we don't want 
that, because we also want to check whether there are any inflightAsyncCommits.
   
   Now, if we can't return immediately, we will need to send these commits, and 
the requirement is to pass the `coordinatorUnknownAndUnreadySync` check. This 
check is already in place, so I think you just need to try to send the async 
commit after this check.
   
   So there's not very much code change there :)
   
   The main concern David and I have is managing two atomic ints, because at 
some point one might forget to update one of them and cause some weird bug.
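
   A toy model of the suggested control flow, in case it helps (the method names 
mirror this discussion; everything else is an illustrative stand-in, not the 
real ConsumerCoordinator):
   ```
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicInteger;

   class CommitFlowSketch {
       final AtomicInteger inFlightAsyncCommits = new AtomicInteger();

       boolean commitOffsetsSync(Map<String, Long> offsets, long deadlineMs) {
           while (System.currentTimeMillis() < deadlineMs) {
               if (coordinatorUnknownAndUnready())
                   return false;
               if (offsets.isEmpty())
                   // do not return immediately: drain in-flight async commits
                   return waitForInFlightAsyncCommits(deadlineMs);
               if (sendOffsetCommitRequest(offsets))
                   return true;
           }
           return false;
       }

       boolean waitForInFlightAsyncCommits(long deadlineMs) {
           while (inFlightAsyncCommits.get() > 0 && System.currentTimeMillis() < deadlineMs)
               Thread.onSpinWait(); // the real code would poll the network client
           return inFlightAsyncCommits.get() == 0;
       }

       // Stubs standing in for the real coordinator logic.
       boolean coordinatorUnknownAndUnready() { return false; }
       boolean sendOffsetCommitRequest(Map<String, Long> offsets) { return true; }
   }
   ```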






[GitHub] [kafka] jeffkbkim opened a new pull request, #13708: KAFKA-14500; [4/N] Add purgatory interface

2023-05-11 Thread via GitHub


jeffkbkim opened a new pull request, #13708:
URL: https://github.com/apache/kafka/pull/13708

   Adds the purgatory interface that will be used by the new group coordinator.
* PurgatoryScheduler: used by GroupMetadataManager (introduced in 
https://github.com/apache/kafka/pull/13639) to add and remove operations
* RebalancePurgatory: used by the new runtime framework to complete and 
expire operations in the purgatory.
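
   For readers skimming the thread, a hedged sketch of the two roles described 
above; the type names follow the PR text, but all signatures are illustrative 
guesses rather than the actual interfaces:
   ```
   // Sketch only: how the two purgatory-facing roles could be split.
   interface PurgatoryScheduler<K> {
       void schedule(K key, Runnable operation, long delayMs); // GroupMetadataManager adds operations
       void cancel(K key);                                     // ...and removes them
   }

   interface RebalancePurgatory<K> extends PurgatoryScheduler<K> {
       boolean tryComplete(K key);     // the runtime attempts to complete an operation
       void expireOverdue(long nowMs); // the runtime expires operations past their deadline
   }
   ```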
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-14991) Improving Producer's record timestamp validation

2023-05-11 Thread Mehari Beyene (Jira)
Mehari Beyene created KAFKA-14991:
-

 Summary: Improving Producer's record timestamp validation
 Key: KAFKA-14991
 URL: https://issues.apache.org/jira/browse/KAFKA-14991
 Project: Kafka
  Issue Type: Improvement
  Components: core, log
Reporter: Mehari Beyene


When time-based retention is configured, the timestamp provided by the producer 
is used by default to determine the retention period of the log. Customers have 
the option of changing the configuration to use the broker's timestamp by 
overriding the configuration for "log.message.timestamp.type", but by default, 
the producer's timestamp is used. The producer's record timestamp can be in the 
past or future. Kafka determines the retention time of the log by comparing the 
broker's time with the record's time.

Arguably, there can be use cases for a producer to send records with timestamps 
that are in the past (for example, for replaying old data), but it is 
inaccurate for records to have a timestamp that is far in the future compared 
to the broker's current time.

There is a configurable property called "message.timestamp.difference.max.ms" 
that customers can use to control the allowed time difference between the 
broker's current time and the record timestamp. However, the validation from 
the Kafka code side can be improved by rejecting records with future timestamps 
from being written in the first place.

Customers have run into this issue in the past where a producer is configured 
erroneously to set the record timestamp in nanoseconds instead of milliseconds, 
resulting in a record timestamp that is in the future, and the time-based 
retention policy did not kick in as expected.

The improvement I am proposing is to add basic validation in 
org.apache.kafka.storage.internals.log.LogValidator to reject record timestamps 
that are in the future compared to the broker's current timestamp, after 
accounting for a sensible tolerance for potential clock skew.





[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-11 Thread via GitHub


jolshan commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1544627294

   I forgot this also covers the race where the end txn marker is written before 
the produce completes.
   (We reset the verification status to empty on the marker, so a produce request 
will fail.)
   
   Will add a test for this and include it in the description.





[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191653253


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
 ));
 }
 
-@Test
-public void testNewGroupMetadataRecord() {
+@ParameterizedTest
+@MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:
   Ah. I see. I missed the version part. We can keep it as it is but could we 
move the method to just before this test? It is easier to read them together.






[GitHub] [kafka] splett2 commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart

2023-05-11 Thread via GitHub


splett2 commented on code in PR #13707:
URL: https://github.com/apache/kafka/pull/13707#discussion_r1191611324


##
core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala:
##
@@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends 
KafkaServerTestHarness {
 )
 
 // Update the expiration time to a low value again.
-admin.incrementalAlterConfigs(producerIdExpirationConfig("100"))
+
admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()

Review Comment:
   Yes






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191604696


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
 ));
 }
 
-@Test
-public void testNewGroupMetadataRecord() {
+@ParameterizedTest
+@MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:
   The reason I used a method source is to include the corresponding expected 
version.
   
   If we used EnumSource, we could add if/else statements, but I thought the 
existing approach would be cleaner. What do you think?
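
   To illustrate the trade-off with generic names (not the actual 
RecordHelpersTest code): `@MethodSource` lets each input carry its expected 
value, which `@EnumSource` alone cannot.
   ```
   import java.util.stream.Stream;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.MethodSource;
   import static org.junit.jupiter.api.Assertions.assertEquals;

   class MethodSourceExample {
       // Each argument pair bundles an input with its expected version.
       static Stream<Arguments> metadataToExpectedVersion() {
           return Stream.of(
               Arguments.of("OLD_FORMAT", (short) 0),
               Arguments.of("NEW_FORMAT", (short) 3)
           );
       }

       @ParameterizedTest
       @MethodSource("metadataToExpectedVersion")
       void testVersionMapping(String metadataVersion, short expectedVersion) {
           assertEquals(expectedVersion, lookupVersion(metadataVersion));
       }

       static short lookupVersion(String v) {
           return v.equals("OLD_FORMAT") ? (short) 0 : (short) 3;
       }
   }
   ```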






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191602652


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,1012 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional<String> protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional<String> protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional<String> leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map<String, GenericGroupMember> members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map<String, String> staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set<String> pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map<String, Integer> supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191602126


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;

Review Comment:
   This is a bit awkward as the existing GroupMetadata updates this field when 
we read the group metadata record 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1225).
 
   
   So we should expect the new group metadata manager introduced in 
https://github.com/apache/kafka/pull/13639 to perform this. 






[GitHub] [kafka] Hangleton commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-11 Thread via GitHub


Hangleton commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1191598672


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the 
logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+"0, -2147483648",
+"0, 2147483648",
+"1, 0",
+"100, 10",
+"2147483648, 0",
+"-2147483648, 0",
+"2147483648,4294967296"
+  ))
+  def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, 
largestOffset: Long): Unit = {

Review Comment:
   I see, thanks for checking. Could we maybe confirm whether the overflow 
exception comes from this line in `TimeIndex#maybeAppend`:
   
   ```
   mmap.putInt(relativeOffset(offset))
   ```
   
   And if that is the case, why is the relative offset < 0 or > max_int?
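
   For reference, a sketch of the arithmetic under discussion (not the actual 
index code): the index stores offsets as ints relative to the segment base, so 
anything outside [0, Integer.MAX_VALUE] cannot be represented and must be 
rejected.
   ```
   public class RelativeOffsetDemo {
       static int relativeOffset(long baseOffset, long offset) {
           long relative = offset - baseOffset;
           if (relative < 0 || relative > Integer.MAX_VALUE)
               // analogous to LogSegmentOffsetOverflowException
               throw new IllegalStateException("offset " + offset
                   + " is out of range for base offset " + baseOffset);
           return (int) relative;
       }

       public static void main(String[] args) {
           System.out.println(relativeOffset(0L, 100L));        // fine: 100
           System.out.println(relativeOffset(0L, 2147483648L)); // throws
       }
   }
   ```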






[GitHub] [kafka] Hangleton commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-11 Thread via GitHub


Hangleton commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1191598230


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the 
logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+"0, -2147483648",
+"0, 2147483648",
+"1, 0",
+"100, 10",
+"2147483648, 0",
+"-2147483648, 0",
+"2147483648,4294967296"
+  ))
+  def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, 
largestOffset: Long): Unit = {

Review Comment:
   I see, thanks for checking. Could we maybe confirm whether the overflow 
exception comes from this line in `TimeIndex#maybeAppend`:
   
   ```
   mmap.putInt(relativeOffset(offset))
   ```
   
   And if that is the case, why is the relative offset < 0 or > max_int?






[GitHub] [kafka] splett2 commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart

2023-05-11 Thread via GitHub


splett2 commented on code in PR #13707:
URL: https://github.com/apache/kafka/pull/13707#discussion_r1191586683


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1956,10 +1956,10 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val transactionTopicSegmentBytes = 
getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
   val transactionAbortTimedOutTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-  
+
   val transactionPartitionVerificationEnable = 
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
 
-  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
   This is a good question. It is probably worth making sure we have similar 
coverage for other dynamic broker configs. Looking at some other dynamic 
configs, e.g. `LogCleanerThreadsProp`, I would expect them to run into similar 
issues.






[GitHub] [kafka] jolshan commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart

2023-05-11 Thread via GitHub


jolshan commented on code in PR #13707:
URL: https://github.com/apache/kafka/pull/13707#discussion_r1191577885


##
core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala:
##
@@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends 
KafkaServerTestHarness {
 )
 
 // Update the expiration time to a low value again.
-admin.incrementalAlterConfigs(producerIdExpirationConfig("100"))
+
admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()

Review Comment:
   Does the `all()` + `get()` ensure the call actually completes?






[GitHub] [kafka] jolshan commented on a diff in pull request #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart

2023-05-11 Thread via GitHub


jolshan commented on code in PR #13707:
URL: https://github.com/apache/kafka/pull/13707#discussion_r1191576738


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1956,10 +1956,10 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val transactionTopicSegmentBytes = 
getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
   val transactionAbortTimedOutTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-  
+
   val transactionPartitionVerificationEnable = 
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
 
-  val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+  def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)

Review Comment:
   Are the other dynamic configs defined as `def`s here? Just trying to figure out how this was missed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 opened a new pull request, #13707: KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart

2023-05-11 Thread via GitHub


splett2 opened a new pull request, #13707:
URL: https://github.com/apache/kafka/pull/13707

   Dynamic overrides for the producer ID expiration config are not picked up on broker restart in ZooKeeper mode. Based on the integration test, this does not apply to KRaft mode.
   
   Adds a broker restart to the integration test; the restart scenario fails without the corresponding KafkaConfig change.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14990) Dynamic producer ID expiration is not applied on broker restart

2023-05-11 Thread David Mao (Jira)
David Mao created KAFKA-14990:
-

 Summary: Dynamic producer ID expiration is not applied on broker 
restart
 Key: KAFKA-14990
 URL: https://issues.apache.org/jira/browse/KAFKA-14990
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hoangphuocbk commented on pull request #12371: KAFKA-14035: Fix NPE caused by missing null check in SnapshottableHashTable::mergeFrom()

2023-05-11 Thread via GitHub


hoangphuocbk commented on PR #12371:
URL: https://github.com/apache/kafka/pull/12371#issuecomment-1544342962

   @iblislin, I had the same problem. Did your solution work?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191453623


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -365,6 +365,20 @@ public short listOffsetRequestVersion() {
 }
 }
 
+public short groupMetadataValueVersion() {

Review Comment:
   nit: Should we add a unit test in MetadataVersionTest as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191452753


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -399,8 +403,12 @@ public void testNewCurrentAssignmentTombstoneRecord() {
 ));
 }
 
-@Test
-public void testNewGroupMetadataRecord() {
+@ParameterizedTest
+@MethodSource("metadataToExpectedGroupMetadataValue")

Review Comment:
   You could use: 
   ```
   @ParameterizedTest
   @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", 
"IBP_3_3_IV2"})
   ```
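   
   A minimal sketch of how that wires into the test (the assertion body is a placeholder):
   
   ```
   import org.apache.kafka.server.common.MetadataVersion;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.EnumSource;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   
   class EnumSourceSketch {
       @ParameterizedTest
       @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
       void testGroupMetadataValueVersion(MetadataVersion metadataVersion) {
           // JUnit runs this once per listed constant, injecting it as the argument.
           assertTrue(metadataVersion.isAtLeast(MetadataVersion.IBP_3_0_IV1));
       }
   }
   ```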
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191438825


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -383,4 +398,84 @@ public void testNewCurrentAssignmentTombstoneRecord() {
 "member-id"
 ));
 }
+
+@Test
+public void testNewGroupMetadataRecord() {
+Time time = new MockTime();
+
+List expectedMembers = new 
ArrayList<>();
+expectedMembers.add(
+new GroupMetadataValue.MemberMetadata()
+.setMemberId("member-1")
+.setClientId("client-1")
+.setClientHost("host-1")
+.setRebalanceTimeout(1000)
+.setSessionTimeout(1500)
+.setGroupInstanceId("group-instance-1")
+.setSubscription(new byte[]{0, 1})
+.setAssignment(new byte[]{1, 2})
+);
+
+expectedMembers.add(
+new GroupMetadataValue.MemberMetadata()
+.setMemberId("member-2")
+.setClientId("client-2")
+.setClientHost("host-2")
+.setRebalanceTimeout(1000)
+.setSessionTimeout(1500)
+.setGroupInstanceId("group-instance-2")
+.setSubscription(new byte[]{1, 2})
+.setAssignment(new byte[]{2, 3})
+);
+
+Record expectedRecord = new Record(
+new ApiMessageAndVersion(
+new GroupMetadataKey()
+.setGroup("group-id"),
+(short) 2),
+new ApiMessageAndVersion(
+new GroupMetadataValue()
+.setProtocol("range")
+.setProtocolType("consumer")
+.setLeader("member-1")
+.setGeneration(1)
+.setCurrentStateTimestamp(time.milliseconds())
+.setMembers(expectedMembers),
+(short) 3));
+
+GenericGroup group = new GenericGroup(
+new LogContext(),
+"group-id",
+GenericGroupState.PREPARING_REBALANCE,
+time
+);
+
+Map memberAssignments = new HashMap<>();
+expectedMembers.forEach(member -> {
+memberAssignments.put(member.memberId(), member.assignment());
+group.add(new GenericGroupMember(
+member.memberId(),
+Optional.of(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+member.rebalanceTimeout(),
+member.sessionTimeout(),
+"consumer",
+Collections.singletonList(new Protocol(
+"range",
+member.subscription()
+)),
+member.assignment()
+));
+});
+
+group.initNextGeneration();
+Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
+group,
+memberAssignments,
+MetadataVersion.IBP_3_5_IV2
+);
+
+assertEquals(expectedRecord, groupMetadataRecord);
+}
 }

Review Comment:
   I updated `testNewGroupMetadataRecord` for 3). Can you take a look?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

2023-05-11 Thread via GitHub


fvaleri commented on PR #13517:
URL: https://github.com/apache/kafka/pull/13517#issuecomment-1544315826

   @showuon thanks for taking the time to review all these PRs. We should be good now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

2023-05-11 Thread via GitHub


fvaleri commented on code in PR #13517:
URL: https://github.com/apache/kafka/pull/13517#discussion_r1191435518


##
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java:
##
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from 
topic1.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in 
`Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify 
options - Save console output to file` to
+ * record all the log output together.
+ */
 public class KafkaConsumerProducerDemo {
-public static void main(String[] args) throws InterruptedException {
-boolean isAsync = args.length == 0 || 
!args[0].trim().equalsIgnoreCase("sync");
-CountDownLatch latch = new CountDownLatch(2);
-Producer producerThread = new Producer(
-"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 
1, -1, latch);
-producerThread.start();
-
-Consumer consumerThread = new Consumer(
-"consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", 
Optional.empty(), false, 1, latch);
-consumerThread.start();
-
-if (!latch.await(5, TimeUnit.MINUTES)) {
-throw new TimeoutException("Timeout after 5 minutes waiting for 
demo producer and consumer to finish");
-}
+public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+public static final String TOPIC_NAME = "my-topic";
+public static final String GROUP_NAME = "my-group";
+
+public static void main(String[] args) {
+try {
+if (args.length == 0) {
+Utils.printHelp("This example takes 2 parameters (i.e. 1 
sync):%n" +
+"- records: total number of records to send (required)%n" +
+"- mode: pass 'sync' to send records synchronously 
(optional)");
+return;
+}
 
-consumerThread.shutdown();
-System.out.println("All finished!");
+int numRecords = Integer.parseInt(args[0]);
+boolean isAsync = args.length == 1 || 
!args[1].trim().equalsIgnoreCase("sync");
+
+// stage 1: clean any topics left from previous runs
+Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+CountDownLatch latch = new CountDownLatch(2);
+
+// stage 2: produce records to topic1
+Producer producerThread = new Producer(
+"producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, 
false, numRecords, -1, latch);
+producerThread.start();
+
+// stage 3: consume records from topic1
+Consumer consumerThread = new Consumer(
+"consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, 
Optional.empty(), false, numRecords, latch);
+consumerThread.start();
+
+if (!latch.await(5, TimeUnit.MINUTES)) {
+Utils.printErr("Timeout after 5 minutes waiting for 
termination");
+//producerThread.shutdown();
+consumerThread.shutdown();

Review Comment:
   Rebase issue, fixed.



##
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 5`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the 
configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs 
-> Save console
- * output to file` to record 

[GitHub] [kafka] clolov commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


clolov commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191430711


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;

Review Comment:
   Does this need to be an Optional? As far as I can see, we immediately define it in the constructor and never set it to an empty value. Am I missing something obvious?
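   
   To make the trade-off concrete, a toy contrast of the two shapes (not the actual class):
   
   ```
   import java.util.Optional;
   
   // Option A: an Optional field forces every reader to handle absence,
   // even when the constructor always supplies a value.
   class WithOptional {
       private final Optional<Long> currentStateTimestamp;
       WithOptional(long now) { this.currentStateTimestamp = Optional.of(now); }
       long timestamp() { return currentStateTimestamp.orElseThrow(IllegalStateException::new); }
   }
   
   // Option B: a plain field is simpler if the value is genuinely always present.
   class WithPlain {
       private final long currentStateTimestamp;
       WithPlain(long now) { this.currentStateTimestamp = now; }
       long timestamp() { return currentStateTimestamp; }
   }
   ```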



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes

2023-05-11 Thread via GitHub


satishd merged PR #13060:
URL: https://github.com/apache/kafka/pull/13060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes

2023-05-11 Thread via GitHub


satishd commented on PR #13060:
URL: https://github.com/apache/kafka/pull/13060#issuecomment-1544287256

   There were a couple of test failures that are unrelated to this PR; will merge to trunk. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-11 Thread via GitHub


philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191400253


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig 
rebalanceConfig,
 this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
 this.sensors = new ConsumerCoordinatorMetrics(metrics, 
metricGrpPrefix);
 this.interceptors = interceptors;
+this.inFlightAsyncCommits = new AtomicInteger();
 this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Thanks @dajac - Do you mean the code isn't changing the counter in the commit callback? I don't know the exact reason, but my guess is that the commit isn't sent until a coordinator is ready, which is why it is called "pending". If the coordinator is already connected, there's no point in incrementing and decrementing the counter, because there will be a response anyway.
   
   I'm supportive of the consolidating suggestions. For me, it is mentally difficult to manage multiple different states correctly.
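   
   For readers skimming the thread, a stripped-down sketch of the counting pattern under discussion (the callback wiring is simplified; only the counter name mirrors the PR):
   
   ```
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Increment when an async commit is created, decrement when its response
   // callback fires, so a sync path can wait for the counter to drain to zero.
   class InFlightCommitCounter {
       private final AtomicInteger inFlightAsyncCommits = new AtomicInteger();
   
       void commitAsync(Runnable sendRequest) {
           inFlightAsyncCommits.incrementAndGet();
           sendRequest.run(); // the response handler must call onResponse() exactly once
       }
   
       void onResponse() {
           inFlightAsyncCommits.decrementAndGet();
       }
   
       boolean allCompleted() {
           return inFlightAsyncCommits.get() == 0;
       }
   }
   ```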



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191395592


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
 );
 }
 
+/**
+ * Creates a GroupMetadata record.
+ *
+ * @param group  The generic group.
+ * @param memberAssignments  The assignment by member id.
+ * @param metadataVersionThe metadata version.
+ * @return The record.
+ */
+public static Record newGroupMetadataRecord(
+GenericGroup group,
+Map memberAssignments,

Review Comment:
   hmm... don't we store it in MemberMetadata in the current implementation? We 
set it in `setAndPropagateAssignment`. I think that we need it because we need 
the ability to provide it to the member at any time. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat merged pull request #13706: MINOR: clean up unused methods in core utils

2023-05-11 Thread via GitHub


jlprat merged PR #13706:
URL: https://github.com/apache/kafka/pull/13706


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191390607


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
 );
 }
 
+/**
+ * Creates a GroupMetadata record.
+ *
+ * @param group  The generic group.
+ * @param memberAssignments  The assignment by member id.
+ * @param metadataVersionThe metadata version.
+ * @return The record.
+ */
+public static Record newGroupMetadataRecord(
+GenericGroup group,
+Map memberAssignments,

Review Comment:
   The assignment comes directly from the sync group request; I don't think we need to store the assignment inside the group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-11 Thread via GitHub


philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191387973


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 }
 
+private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   Could you elaborate on `logic will spread around and understandability will suffer a lot`? I was suggesting to first check whether there are any in-flight commits, then wait for the coordinator to be connected. After the connection is established, you can try to send the async commits.
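   
   In sketch form, that ordering might look like this (all names and the timing loop are assumptions, not the actual `ConsumerCoordinator` code):
   
   ```
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.BooleanSupplier;
   
   // 1) bail out if nothing is in flight, 2) wait for a coordinator
   // connection, 3) poll the network until every commit has a response.
   static boolean awaitAsyncCommits(AtomicInteger inFlight,
                                    BooleanSupplier ensureCoordinatorReady,
                                    Runnable pollNetwork,
                                    long deadlineMs) {
       if (inFlight.get() == 0) return true;
       while (System.currentTimeMillis() < deadlineMs) {
           if (!ensureCoordinatorReady.getAsBoolean()) continue;
           pollNetwork.run();
           if (inFlight.get() == 0) return true;
       }
       return false; // timed out
   }
   ```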



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on pull request #13706: MINOR: clean up unused methods in core utils

2023-05-11 Thread via GitHub


jlprat commented on PR #13706:
URL: https://github.com/apache/kafka/pull/13706#issuecomment-1544246745

   All test failures seem to be unrelated to the change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


dajac commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191381128


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,1012 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191374954


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericGroupTest {
+private final String protocolType = "consumer";
+private final String groupInstanceId = "groupInstanceId";
+private final String memberId = "memberId";
+private final String clientId = "clientId";
+private final String clientHost = "clientHost";
+private final int rebalanceTimeoutMs = 6;
+private final int sessionTimeoutMs = 1;
+
+
+private GenericGroup group = null;
+
+@BeforeEach
+public void initialize() {
+group = new GenericGroup(new LogContext(), "groupId", EMPTY, 
Time.SYSTEM);
+}
+
+@Test
+public void testCanRebalanceWhenStable() {
+assertTrue(group.canRebalance());
+}
+
+@Test
+public void testCanRebalanceWhenCompletingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+assertTrue(group.canRebalance()); 
+}
+
+@Test
+public void testCannotRebalanceWhenPreparingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testCannotRebalanceWhenDead() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+group.transitionTo(DEAD);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testStableToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testStableToDeadTransition() {
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testAwaitingRebalanceToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testPreparingRebalanceToDeadTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testPreparingRebalanceToEmptyTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+assertState(group, EMPTY);
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191359030


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,935 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericGroupTest {
+private final String protocolType = "consumer";
+private final String groupInstanceId = "groupInstanceId";
+private final String memberId = "memberId";
+private final String clientId = "clientId";
+private final String clientHost = "clientHost";
+private final int rebalanceTimeoutMs = 6;
+private final int sessionTimeoutMs = 1;
+
+
+private GenericGroup group = null;
+
+@BeforeEach
+public void initialize() {
+group = new GenericGroup(new LogContext(), "groupId", EMPTY, 
Time.SYSTEM);
+}
+
+@Test
+public void testCanRebalanceWhenStable() {
+assertTrue(group.canRebalance());
+}
+
+@Test
+public void testCanRebalanceWhenCompletingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+assertTrue(group.canRebalance()); 
+}
+
+@Test
+public void testCannotRebalanceWhenPreparingRebalance() {
+group.transitionTo(PREPARING_REBALANCE);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testCannotRebalanceWhenDead() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+group.transitionTo(DEAD);
+assertFalse(group.canRebalance());
+}
+
+@Test
+public void testStableToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testStableToDeadTransition() {
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testAwaitingRebalanceToPreparingRebalanceTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(COMPLETING_REBALANCE);
+group.transitionTo(PREPARING_REBALANCE);
+assertState(group, PREPARING_REBALANCE);
+}
+
+@Test
+public void testPreparingRebalanceToDeadTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(DEAD);
+assertState(group, DEAD);
+}
+
+@Test
+public void testPreparingRebalanceToEmptyTransition() {
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+assertState(group, EMPTY);
+

[jira] [Updated] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics

2023-05-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14978:
--
Fix Version/s: 3.4.1

> ExactlyOnceWorkerSourceTask does not remove parent metrics
> --
>
> Key: KAFKA-14978
> URL: https://issues.apache.org/jira/browse/KAFKA-14978
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
> Fix For: 3.4.1, 3.6.0
>
>
> ExactlyOnceWorkerSourceTask removeMetrics does not invoke 
> super.removeMetrics, meaning that only the transactional metrics are removed, 
> and common source task metrics are not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191340188


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map 

[GitHub] [kafka] kirktrue commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-05-11 Thread via GitHub


kirktrue commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1191281659


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1383,9 +1386,13 @@ private ClusterResourceListeners 
configureClusterResourceListeners(Serializer
  * can be used (the partition is then calculated by built-in
  * partitioning logic).
  */
-private int partition(ProducerRecord record, byte[] serializedKey, 
byte[] serializedValue, Cluster cluster) {

Review Comment:
   Unless my eyes deceive me, this is just formatting, correct?



##
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##
@@ -31,13 +34,27 @@ public interface Partitioner extends Configurable, 
Closeable {
  *
  * @param topic The topic name
  * @param key The key to partition on (or null if no key)
- * @param keyBytes The serialized key to partition on( or null if no key)
+ * @param keyBytes The serialized key to partition on(or null if no key)

Review Comment:
   nit: Can we change:
   
   `The serialized key to partition on(or null if no key)`
   
   to
   
   `The serialized key to partition on (or null if no key)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191303709


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.Group;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group where the
+ * member assignment is driven solely from the client side.
+ *
+ * The APIs members use to make changes to the group membership
+ * consist of JoinGroup, SyncGroup, and LeaveGroup.
+ */
+public class GenericGroup implements Group {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with a client id or a group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional<String> protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional<String> protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional<String> leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map<String, GenericGroupMember> members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map<String, String> staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set<String> pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members awaiting a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to the number of members that support them.
+ */
+private final Map 

[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-11 Thread via GitHub


lanshiqin commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1191256878


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the 
logs if:
+   * 1. largestOffset < 0

Review Comment:
   Thanks for your advice, I have revised the description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-11 Thread via GitHub


lanshiqin commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1191252780


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the 
logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+"0, -2147483648",
+"0, 2147483648",
+"1, 0",
+"100, 10",
+"2147483648, 0",
+"-2147483648, 0",
+"2147483648,4294967296"

Review Comment:
   Yes, it was intentionally written as 2147483648. The test method and 
LogSegment receive offset arguments of type Long in order to test scenarios 
exceeding Int.MAX_VALUE
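
   For anyone following along, a minimal arithmetic sketch (plain Java, not 
the actual test) of the overflow condition the last CsvSource row exercises:
   ```java
public class OffsetOverflowSketch {
    public static void main(String[] args) {
        long baseOffset = 2_147_483_648L;         // one past Integer.MAX_VALUE
        long largestOffset = 4_294_967_296L;      // 2^32
        long delta = largestOffset - baseOffset;  // 2_147_483_648, too large for an int
        // LogSegment's index stores offsets relative to baseOffset as ints, so
        // this combination must raise LogSegmentOffsetOverflowException on append.
        boolean shouldThrow = largestOffset < 0 || delta < 0 || delta > Integer.MAX_VALUE;
        System.out.println(shouldThrow);          // prints: true
    }
}
   ```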



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-11 Thread via GitHub


lanshiqin commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1191248219


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the 
logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+"0, -2147483648",
+"0, 2147483648",
+"1, 0",
+"100, 10",
+"2147483648, 0",
+"-2147483648, 0",
+"2147483648,4294967296"
+  ))
+  def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, 
largestOffset: Long): Unit = {

Review Comment:
   When baseOffset == largestOffset, the teardown method in the test class is 
called at the end of the test, which in turn calls close() on the log segment 
and then timeIndex.maybeAppend. Eventually an exception is thrown (Integer 
overflow for offset: 0), but that exception is not the point this test method 
needs to cover.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-11 Thread via GitHub


dajac commented on code in PR #13704:
URL: https://github.com/apache/kafka/pull/13704#discussion_r1191225920


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
 );
 }
 
+/**
+ * Creates a GroupMetadata record.
+ *
+ * @param group  The generic group.
+ * @param memberAssignments  The assignment by member id.
+ * @param metadataVersionThe metadata version.
+ * @return The record.
+ */
+public static Record newGroupMetadataRecord(
+GenericGroup group,
+Map<String, byte[]> memberAssignments,

Review Comment:
   Any reason why we can't get the assignments from the group?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -357,6 +366,94 @@ public static Record newCurrentAssignmentTombstoneRecord(
 );
 }
 
+/**
+ * Creates a GroupMetadata record.
+ *
+ * @param group  The generic group.
+ * @param memberAssignments  The assignment by member id.
+ * @param metadataVersionThe metadata version.
+ * @return The record.
+ */
+public static Record newGroupMetadataRecord(
+GenericGroup group,
+Map<String, byte[]> memberAssignments,
+MetadataVersion metadataVersion
+) {
+short version;
+if (metadataVersion.isLessThan(IBP_0_10_1_IV0)) {
+version = (short) 0;
+} else if (metadataVersion.isLessThan(IBP_2_1_IV0)) {
+version = (short) 1;
+} else if (metadataVersion.isLessThan(IBP_2_3_IV0)) {
+version = (short) 2;
+} else {
+version = (short) 3;

Review Comment:
   nit: Should we keep that comment?
   
   ```
 // Serialize with the highest supported non-flexible version
 // until a tagged field is introduced or the version is bumped.
   ```



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -383,4 +398,84 @@ public void testNewCurrentAssignmentTombstoneRecord() {
 "member-id"
 ));
 }
+
+@Test
+public void testNewGroupMetadataRecord() {
+Time time = new MockTime();
+
+List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
+expectedMembers.add(
+new GroupMetadataValue.MemberMetadata()
+.setMemberId("member-1")
+.setClientId("client-1")
+.setClientHost("host-1")
+.setRebalanceTimeout(1000)
+.setSessionTimeout(1500)
+.setGroupInstanceId("group-instance-1")
+.setSubscription(new byte[]{0, 1})
+.setAssignment(new byte[]{1, 2})
+);
+
+expectedMembers.add(
+new GroupMetadataValue.MemberMetadata()
+.setMemberId("member-2")
+.setClientId("client-2")
+.setClientHost("host-2")
+.setRebalanceTimeout(1000)
+.setSessionTimeout(1500)
+.setGroupInstanceId("group-instance-2")
+.setSubscription(new byte[]{1, 2})
+.setAssignment(new byte[]{2, 3})
+);
+
+Record expectedRecord = new Record(
+new ApiMessageAndVersion(
+new GroupMetadataKey()
+.setGroup("group-id"),
+(short) 2),
+new ApiMessageAndVersion(
+new GroupMetadataValue()
+.setProtocol("range")
+.setProtocolType("consumer")
+.setLeader("member-1")
+.setGeneration(1)
+.setCurrentStateTimestamp(time.milliseconds())
+.setMembers(expectedMembers),
+(short) 3));
+
+GenericGroup group = new GenericGroup(
+new LogContext(),
+"group-id",
+GenericGroupState.PREPARING_REBALANCE,
+time
+);
+
+Map<String, byte[]> memberAssignments = new HashMap<>();
+expectedMembers.forEach(member -> {
+memberAssignments.put(member.memberId(), member.assignment());
+group.add(new GenericGroupMember(
+member.memberId(),
+Optional.of(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+member.rebalanceTimeout(),
+member.sessionTimeout(),
+"consumer",
+Collections.singletonList(new Protocol(
+"range",
+member.subscription()
+)),
+member.assignment()
+));
+});
+
+group.initNextGeneration();
+Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
+group,
+memberAssignments,
+   

[GitHub] [kafka] clolov commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-11 Thread via GitHub


clolov commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1191109013


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
 
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
 
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
 taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-.andStubReturn(task01Converted);
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-expectLastCall().once();
+when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), 
Mockito.eq(taskId01Partitions),

Review Comment:
   ```suggestion
   when(activeTaskCreator.createActiveTaskFromStandby(task01, 
taskId01Partitions,
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -879,13 +880,10 @@ public void shouldCloseTasksRemovedFromStateUpdater() {
 
when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true);
 
when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true);
 taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-expectLastCall().once();
-replay(activeTaskCreator);
 
 taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-verify(activeTaskCreator);
+Mockito.verify(activeTaskCreator, 
times(1)).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   ```suggestion
   
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -960,10 +958,8 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
 when(stateUpdater.drainRemovedTasks())
 .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, 
taskToUpdateInputPartitions));
 when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-
expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), 
eq(taskId01Partitions), eq(consumer)))
-.andStubReturn(convertedTask1);
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-expectLastCall().times(2);
+
when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), 
Mockito.eq(taskId01Partitions),

Review Comment:
   ```suggestion
   when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, 
taskId01Partitions,
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
 
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
 
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
 taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-.andStubReturn(task01Converted);
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-expectLastCall().once();
+when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), 
Mockito.eq(taskId01Partitions),
+Mockito.eq(consumer))).thenReturn(task01Converted);
 expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), 
eq(taskId00Partitions)))
 .andStubReturn(task00Converted);
-replay(activeTaskCreator, standbyTaskCreator);
+replay(standbyTaskCreator);
 
 taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-verify(activeTaskCreator, standbyTaskCreator);
+verify(standbyTaskCreator);
+Mockito.verify(activeTaskCreator, 
times(1)).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   ```suggestion
   
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
   ```
   The default check of verify is times(1) as far as I know.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2162,11 +2140,12 @@ public Map 
prepareCommit() {
 
 // handleAssignment
 
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
-

[GitHub] [kafka] dajac commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-11 Thread via GitHub


dajac commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1191160971


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional<String> protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional<String> protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional<String> leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map<String, GenericGroupMember> members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map<String, String> staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set<String> pendingMembers = new HashSet<>();
+
+/**
+ * 

[GitHub] [kafka] dengziming commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-11 Thread via GitHub


dengziming commented on PR #13679:
URL: https://github.com/apache/kafka/pull/13679#issuecomment-1543964126

   I reran the failed tests locally and they work correctly. I created a JIRA 
to track this problem.
   https://issues.apache.org/jira/browse/KAFKA-14989


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14989) Flaky test TransactionsTest.testFailureToFenceEpoch

2023-05-11 Thread Deng Ziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721762#comment-17721762
 ] 

Deng Ziming commented on KAFKA-14989:
-

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13679/6/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testFailureToFenceEpoch_String__quorum_kraft/

> Flaky test TransactionsTest.testFailureToFenceEpoch
> ---
>
> Key: KAFKA-14989
> URL: https://issues.apache.org/jira/browse/KAFKA-14989
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Deng Ziming
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14989) Flaky test TransactionsTest.testFailureToFenceEpoch

2023-05-11 Thread Deng Ziming (Jira)
Deng Ziming created KAFKA-14989:
---

 Summary: Flaky test TransactionsTest.testFailureToFenceEpoch
 Key: KAFKA-14989
 URL: https://issues.apache.org/jira/browse/KAFKA-14989
 Project: Kafka
  Issue Type: Improvement
Reporter: Deng Ziming






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jlprat commented on pull request #13706: MINOR: clean up unused methods in core utils

2023-05-11 Thread via GitHub


jlprat commented on PR #13706:
URL: https://github.com/apache/kafka/pull/13706#issuecomment-1543873607

   Thanks @machi1990, I fixed the typo as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13694: MINOR: add test tag for testDeadToDeadIllegalTransition

2023-05-11 Thread via GitHub


dajac merged PR #13694:
URL: https://github.com/apache/kafka/pull/13694


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2023-05-11 Thread via GitHub


machi1990 commented on PR #12045:
URL: https://github.com/apache/kafka/pull/12045#issuecomment-1543816986

   > @machi1990 since you opened a PR to fix this flaky test, you might be 
familiar with this part of code. May I request you to review this PR please.
   
   Hey @divijvaidya, I am new to Kafka and to this part of the code. It'll be 
good to get another round of reviews from committers, since some of them have 
already started looking at this PR. My attempt to fix the flaky test in 
https://github.com/apache/kafka/pull/13702 was to slightly modify the 
assertions, which was more of a quick win to stabilize the test, while this PR 
attempts to sort out the underlying issue with the quota computation. I think 
it'll be good to get more eyes on the PR, as suggested in 
https://github.com/apache/kafka/pull/12045#pullrequestreview-1004087364 and 
https://github.com/apache/kafka/pull/12045#pullrequestreview-959047597. What do 
you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13701: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …

2023-05-11 Thread via GitHub


viktorsomogyi commented on PR #13701:
URL: https://github.com/apache/kafka/pull/13701#issuecomment-1543791889

   @vamossagar12 yes, trunk and backports to 3.5 and 3.4 have been merged 
already.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi merged pull request #13699: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …

2023-05-11 Thread via GitHub


viktorsomogyi merged PR #13699:
URL: https://github.com/apache/kafka/pull/13699


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi merged pull request #13698: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …

2023-05-11 Thread via GitHub


viktorsomogyi merged PR #13698:
URL: https://github.com/apache/kafka/pull/13698


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-11 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1190991070


##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {

Review Comment:
   Should we add a test for this use case:
   ```
   minOneMessage = false
   hardMaxBytesLimit = false
   firstBatchSize > maxBytes?
   ```
   or a combination thereof?
   
   Maybe. Please let me know if I can help with a few unit tests. Thanks!
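
   If it helps, a hedged JUnit 5 skeleton of how those combinations could be 
enumerated; the class name is made up and the method body is left to the 
existing mock setup in this file:
   ```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class RemoteLogReaderParamSketch {
    @ParameterizedTest
    @CsvSource({
        // minOneMessage, hardMaxBytesLimit; first batch larger than maxBytes in all rows
        "false, false",
        "false, true",
        "true, false",
    })
    void testFirstBatchLargerThanMaxBytes(boolean minOneMessage, boolean hardMaxBytesLimit) {
        // Arrange the fetch mocks as in the tests above, with a first batch whose
        // size exceeds maxBytes, then assert on the RemoteLogReadResult handed to
        // the callback (e.g. empty records unless minOneMessage is true).
    }
}
   ```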



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete

2023-05-11 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich updated KAFKA-14975:
---
Parent: KAFKA-7739
Issue Type: Sub-task  (was: Task)

> Make TopicBasedRemoteLogMetadataManager methods wait for initialize to 
> complete
> ---
>
> Key: KAFKA-14975
> URL: https://issues.apache.org/jira/browse/KAFKA-14975
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Matthew de Detrich
>Assignee: Matthew de Detrich
>Priority: Major
>
> In the current implementation of TopicBasedRemoteLogMetadataManager, various 
> methods internally call ensureInitializedAndNotClosed to ensure that the 
> TopicBasedRemoteLogMetadataManager is initialized. If it is not initialized, 
> an exception is thrown.
> This is not ideal behaviour; rather than just throwing an exception, we 
> should wait until TopicBasedRemoteLogMetadataManager is initialized (with a 
> timeout). This is what users should expect, and it is also what other parts 
> of Kafka that use plugin-based systems (e.g. Kafka Connect) do.
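
For illustration, a minimal sketch of the proposed await-with-timeout 
behaviour (class and member names here are assumptions, not the actual 
TopicBasedRemoteLogMetadataManager API):
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: names are made up for the sketch.
class InitializationAwaitSketch {
    private final CountDownLatch initialized = new CountDownLatch(1);

    // Called once, when initialization finishes.
    void markInitialized() {
        initialized.countDown();
    }

    // Callers block until initialization completes instead of failing fast.
    void ensureInitialized(long timeoutMs) throws InterruptedException, TimeoutException {
        if (!initialized.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Not initialized within " + timeoutMs + " ms");
        }
    }
}
```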



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

2023-05-11 Thread via GitHub


showuon commented on code in PR #13517:
URL: https://github.com/apache/kafka/pull/13517#discussion_r1190888533


##
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java:
##
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from 
topic1.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in 
`Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify 
options - Save console output to file` to
+ * record all the log output together.
+ */
 public class KafkaConsumerProducerDemo {
-public static void main(String[] args) throws InterruptedException {
-boolean isAsync = args.length == 0 || 
!args[0].trim().equalsIgnoreCase("sync");
-CountDownLatch latch = new CountDownLatch(2);
-Producer producerThread = new Producer(
-"producer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 
1, -1, latch);
-producerThread.start();
-
-Consumer consumerThread = new Consumer(
-"consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", 
Optional.empty(), false, 1, latch);
-consumerThread.start();
-
-if (!latch.await(5, TimeUnit.MINUTES)) {
-throw new TimeoutException("Timeout after 5 minutes waiting for 
demo producer and consumer to finish");
-}
+public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+public static final String TOPIC_NAME = "my-topic";
+public static final String GROUP_NAME = "my-group";
+
+public static void main(String[] args) {
+try {
+if (args.length == 0) {
+Utils.printHelp("This example takes 2 parameters (i.e. 1 
sync):%n" +
+"- records: total number of records to send (required)%n" +
+"- mode: pass 'sync' to send records synchronously 
(optional)");
+return;
+}
 
-consumerThread.shutdown();
-System.out.println("All finished!");
+int numRecords = Integer.parseInt(args[0]);
+boolean isAsync = args.length == 1 || 
!args[1].trim().equalsIgnoreCase("sync");
+
+// stage 1: clean any topics left from previous runs
+Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+CountDownLatch latch = new CountDownLatch(2);
+
+// stage 2: produce records to topic1
+Producer producerThread = new Producer(
+"producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, 
false, numRecords, -1, latch);
+producerThread.start();
+
+// stage 3: consume records from topic1
+Consumer consumerThread = new Consumer(
+"consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, 
Optional.empty(), false, numRecords, latch);
+consumerThread.start();
+
+if (!latch.await(5, TimeUnit.MINUTES)) {
+Utils.printErr("Timeout after 5 minutes waiting for 
termination");
+//producerThread.shutdown();
+consumerThread.shutdown();

Review Comment:
   Why does it get commented out?



##
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 5`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the 
configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs 
-> Save console
- * output to file` 

[GitHub] [kafka] akatona84 commented on pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-05-11 Thread via GitHub


akatona84 commented on PR #11565:
URL: https://github.com/apache/kafka/pull/11565#issuecomment-1543645307

   thx. Rebased again and did some adjustments. (This time I built it and ran 
my modified unit tests as well :) )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

2023-05-11 Thread via GitHub


showuon merged PR #13516:
URL: https://github.com/apache/kafka/pull/13516


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

2023-05-11 Thread via GitHub


showuon commented on PR #13516:
URL: https://github.com/apache/kafka/pull/13516#issuecomment-1543640638

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testMultipleServerMechanisms()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testRestartReplication()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-Combined, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, 
Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

2023-05-11 Thread via GitHub


showuon commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1190880657


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord<Integer, String> record : records) {
-// Process the record and send to downstream.
-ProducerRecord<Integer, String> customizedRecord = transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+try {
+// it is recommended to have a relatively short txn timeout in 
order to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+KafkaProducer<Integer, String> producer =
+new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
+
+// consumer must be in read_committed mode, which means it won't 
be able to read uncommitted data
+boolean readCommitted = true;
+KafkaConsumer<Integer, String> consumer = new Consumer(
+"processor-consumer", bootstrapServers, inputTopic, 
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+.createKafkaConsumer();
+
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord<Integer, String> record : records) {
+// process the record and send downstream
+ProducerRecord<Integer, String> newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
-
-Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-// Checkpoint the progress by sending offsets to group 
coordinator broker.
-// Note that this API is only available for broker >= 2.5.
-producer.sendOffsetsToTransaction(offsets, 
consumer.groupMetadata());
-
-// Finish the transaction. All sent records should be 
visible for consumption now.
-producer.commitTransaction();
-messageProcessed += records.count();
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {

Review Comment:
   Good to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] mimaison merged pull request #13170: MINOR: Remove unused methods in CoreUtils

2023-05-11 Thread via GitHub


mimaison merged PR #13170:
URL: https://github.com/apache/kafka/pull/13170


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] AB027PS commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-11 Thread via GitHub


AB027PS commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1543611485

   I work with Martin Pelak and he suggested that I add my change to this PR, 
but my change is unrelated to the rest of this PR. So this doesn't address the 
errors mentioned in the first comment if that's what you're asking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2023-05-11 Thread Mahesh Madushan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721696#comment-17721696
 ] 

Mahesh Madushan commented on KAFKA-13349:
-

Hello [~guozhang]. While looking into this issue (I'm new to the codebase), I 
noticed that the range iterator mentioned here is a KeyValueIterator. If we 
want to support remove, should we implement the remove method in all 
implementations of KeyValueIterator? Is my thought process correct? Thank you.

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie++
>
> Today Stream's state store's range iterator does not support `remove`. We 
> could consider adding such support for all the built-in state stores:
> * RocksDB's native iterator does not support removal, but we can always do a 
> delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: straightforward implementation.
> The benefit is that range-and-delete truncation operations would no longer 
> have to be cautious about concurrent modification exceptions. This could 
> also help GC with in-memory stores.
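
For illustration, a minimal sketch of the snapshot-plus-concurrent-delete 
idea, assuming a store that exposes a delete(key) operation (interfaces 
simplified; Streams' actual KeyValueIterator is also Closeable and iterates 
KeyValue pairs):
```java
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

// Simplified sketch, not the real Streams API.
class RemovableIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<Map.Entry<K, V>> delegate;  // e.g. a RocksDB snapshot iterator
    private final Consumer<K> deleteFromStore;         // e.g. store::delete
    private Map.Entry<K, V> lastReturned;

    RemovableIterator(Iterator<Map.Entry<K, V>> delegate, Consumer<K> deleteFromStore) {
        this.delegate = delegate;
        this.deleteFromStore = deleteFromStore;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }

    @Override public Map.Entry<K, V> next() {
        lastReturned = delegate.next();
        return lastReturned;
    }

    @Override public void remove() {
        if (lastReturned == null) throw new IllegalStateException("call next() first");
        // The delegate iterates a snapshot, so deleting on the live store is
        // safe for RocksDB; for in-memory stores this is a plain map removal.
        deleteFromStore.accept(lastReturned.getKey());
        lastReturned = null;
    }
}
```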



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jlprat commented on pull request #13706: MINOR: clean up unused methods in core utils

2023-05-11 Thread via GitHub


jlprat commented on PR #13706:
URL: https://github.com/apache/kafka/pull/13706#issuecomment-1543608786

   @mimaison I took inspiration from your PR to clean up some more unused 
methods


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat commented on a diff in pull request #13170: MINOR: Remove unused methods in CoreUtils

2023-05-11 Thread via GitHub


jlprat commented on code in PR #13170:
URL: https://github.com/apache/kafka/pull/13170#discussion_r1190857226


##
core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala:
##
@@ -223,7 +204,7 @@ class CoreUtilsTest extends Logging {
 val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
 implicit val executionContext = 
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))

Review Comment:
   Looks good!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jlprat opened a new pull request, #13706: MINOR: clean up unused methods in core utils

2023-05-11 Thread via GitHub


jlprat opened a new pull request, #13706:
URL: https://github.com/apache/kafka/pull/13706

   Inspired by https://github.com/apache/kafka/pull/13170, I decided to check 
some other core-utils classes and remove some unused methods.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13170: MINOR: Remove unused methods in CoreUtils

2023-05-11 Thread via GitHub


mimaison commented on code in PR #13170:
URL: https://github.com/apache/kafka/pull/13170#discussion_r1190855504


##
core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala:
##
@@ -223,7 +204,7 @@ class CoreUtilsTest extends Logging {
 val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
 implicit val executionContext = 
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))

Review Comment:
   I've had to use `ExecutionContextExecutorService` instead of 
`ExecutionContext` because we call `shutdownNow()` below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944

2023-05-11 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14988:
-
Fix Version/s: 3.6.0

> Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
> -
>
> Key: KAFKA-14988
> URL: https://issues.apache.org/jira/browse/KAFKA-14988
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
> Fix For: 3.6.0
>
>
> Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from 
> a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]
>  
> The CVE does not impact Kafka as per 
> https://issues.apache.org/jira/browse/KAFKA-14267  and is fixed in 
> ScalaCollectionCompact v2.9 as per 
> [https://github.com/scala/scala-collection-compat/pull/569] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944

2023-05-11 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14988:
-
Description: 
Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from a 
critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]

 

The CVE does not impact Kafka as per 
https://issues.apache.org/jira/browse/KAFKA-14267  (hence, not marking this as 
critical) and is fixed in ScalaCollectionCompact v2.9 as per 
[https://github.com/scala/scala-collection-compat/pull/569] 

  was:
Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from a 
critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]

 

The CVE does not impact Kafka as per 
https://issues.apache.org/jira/browse/KAFKA-14267  and is fixed in 
ScalaCollectionCompact v2.9 as per 
[https://github.com/scala/scala-collection-compat/pull/569] 


> Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
> -
>
> Key: KAFKA-14988
> URL: https://issues.apache.org/jira/browse/KAFKA-14988
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
> Fix For: 3.6.0
>
>
> Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from 
> a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]
>  
> The CVE does not impact Kafka as per 
> https://issues.apache.org/jira/browse/KAFKA-14267  (hence, not marking this 
> as critical) and is fixed in ScalaCollectionCompact v2.9 as per 
> [https://github.com/scala/scala-collection-compat/pull/569] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944

2023-05-11 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721691#comment-17721691
 ] 

Divij Vaidya edited comment on KAFKA-14988 at 5/11/23 8:46 AM:
---

[https://github.com/apache/kafka/pull/13673] 


was (Author: divijvaidya):
Fixed by [https://github.com/apache/kafka/pull/13673] 

> Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
> -
>
> Key: KAFKA-14988
> URL: https://issues.apache.org/jira/browse/KAFKA-14988
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>
> Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from 
> a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]
>  
> The CVE does not impact Kafka as per 
> https://issues.apache.org/jira/browse/KAFKA-14267  and is fixed in 
> ScalaCollectionCompact v2.9 as per 
> [https://github.com/scala/scala-collection-compat/pull/569] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944

2023-05-11 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721691#comment-17721691
 ] 

Divij Vaidya commented on KAFKA-14988:
--

Fixed by [https://github.com/apache/kafka/pull/13673] 

> Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
> -
>
> Key: KAFKA-14988
> URL: https://issues.apache.org/jira/browse/KAFKA-14988
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>
> Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from 
> a critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]
>  
> The CVE does not impact Kafka as per 
> https://issues.apache.org/jira/browse/KAFKA-14267  and is fixed in 
> ScalaCollectionCompact v2.9 as per 
> [https://github.com/scala/scala-collection-compat/pull/569] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-11 Thread via GitHub


Hangleton commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1543590907

   Hi Julius (@AB027PS) - just to clarify, this isn't the same as the 
`AccessDeniedException`  addressed by this CR right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2023-05-11 Thread via GitHub


divijvaidya commented on PR #12045:
URL: https://github.com/apache/kafka/pull/12045#issuecomment-1543589903

   @machi1990 since you opened a PR to fix this flaky test, you might be 
familiar with this part of code. May I request you to review this PR please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14988) Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944

2023-05-11 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14988:


 Summary: Upgrade scalaCollectionCompact to v2.9 for CVE-2022-36944
 Key: KAFKA-14988
 URL: https://issues.apache.org/jira/browse/KAFKA-14988
 Project: Kafka
  Issue Type: Improvement
Reporter: Divij Vaidya


Current version of ScalaCollectionCompact library in trunk 2.6.0 suffers from a 
critical [CVE-2022-36944|https://github.com/advisories/GHSA-8qv5-68g4-248j]

 

The CVE does not impact Kafka as per 
https://issues.apache.org/jira/browse/KAFKA-14267  and is fixed in 
ScalaCollectionCompact v2.9 as per 
[https://github.com/scala/scala-collection-compat/pull/569] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] AB027PS commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-11 Thread via GitHub


AB027PS commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1543556383

   @Hangleton @cmccabe I encountered a bug while using KRaft where Kafka would 
mark .checkpoint files in the _cluster-metadata_0 folder for deletion and then 
fail to delete them because they are read-only. The root cause is that the 
deletion uses Files.deleteIfExists, which can't delete read-only files on 
Windows. I made changes to KafkaMetadataLog.scala and Snapshots.java, adding a 
new method for this purpose.
   path.toFile().setWritable(true) works even if there is no file at the path 
on Windows, but I'm not sure how it behaves on Linux, so I can create a new 
unit test for it or put it behind an isOsWindows check. Should I create a new 
PR for this?
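
   For illustration, a minimal sketch of the workaround described (the helper 
name is made up; this is not the exact change to KafkaMetadataLog.scala or 
Snapshots.java):
   ```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class WindowsSafeDelete {
    // On Windows, Files.deleteIfExists fails on read-only files, so clear the
    // read-only attribute first. setWritable returns false when the file does
    // not exist, which is harmless here.
    static boolean deleteIfExists(Path path) throws IOException {
        path.toFile().setWritable(true);
        return Files.deleteIfExists(path);
    }
}
   ```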


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-11 Thread via GitHub


erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190794344


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1145,6 +1150,7 @@ public void onFailure(RuntimeException e) {
 if (commitException instanceof FencedInstanceIdException) {
 asyncCommitFenced.set(true);
 }
+inFlightAsyncCommits.decrementAndGet();

Review Comment:
   I think it is not needed here. There is nothing that could fail with an 
exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-11 Thread via GitHub


erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190791004


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 }
 
+private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   Renamed the method in 1fa48f53f0e6f5a2a9821075fa053e01cba6b0b2.
   
   Also, see this comment: 
https://github.com/apache/kafka/pull/13678#discussion_r1190730213



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


