[GitHub] [kafka] dajac merged pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface

2023-03-07 Thread via GitHub


dajac merged PR #13357:
URL: https://github.com/apache/kafka/pull/13357


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface

2023-03-07 Thread via GitHub


dajac commented on PR #13357:
URL: https://github.com/apache/kafka/pull/13357#issuecomment-1459679650

   > Sorry if I missed it somewhere/forgot, but is this saying we will only 
allow the new group coordinator for kraft? Or do we have an alternative for ZK?
   
   That's right. At the moment, we are focusing on KRaft support. Given our 
timeline for the implementation, implementing the new protocol in ZK does not 
seem worth it because ZK support will be removed shortly after. We can still do 
it if we need to.





[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-07 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1129066723


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1352,8 +1370,22 @@ public void handle(OffsetCommitResponse commitResponse, 
RequestFuture futu
 Set unauthorizedTopics = new HashSet<>();
 
 for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+String topicName = topic.name();
+
+if (commitResponse.version() >= 9) {

Review Comment:
   @Hangleton I had a deeper look into this and it seems that we could get the 
version with `this.response.requestHeader().apiVersion()`. Could you check if 
this would work?
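
The version gate under discussion can be sketched in isolation. This is only an illustration under stated assumptions: `ResponseTopic`, `resolveName`, and the ID-to-name map are hypothetical stand-ins, not Kafka's `OffsetCommitResponseTopic` or any real client API. The only detail taken from the diff is the threshold of version 9.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class TopicNameResolver {
    /** Hypothetical stand-in for one topic entry of a commit response. */
    static final class ResponseTopic {
        final String name;   // assumed to be set for versions below 9
        final UUID topicId;  // assumed to be set from version 9 on

        ResponseTopic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    static final short FIRST_VERSION_WITH_TOPIC_IDS = 9;

    /**
     * Picks the topic name based on the version of the request that was sent.
     * Returns null when the version uses topic IDs and the ID is unknown locally.
     */
    static String resolveName(ResponseTopic topic, short requestVersion, Map<UUID, String> namesById) {
        if (requestVersion >= FIRST_VERSION_WITH_TOPIC_IDS) {
            return namesById.get(topic.topicId);
        }
        return topic.name;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Map<UUID, String> namesById = new HashMap<>();
        namesById.put(id, "orders");

        System.out.println(resolveName(new ResponseTopic(null, id), (short) 9, namesById));
        System.out.println(resolveName(new ResponseTopic("orders", null), (short) 8, namesById));
    }
}
```

Passing the request version in explicitly, rather than reading it from the response object, mirrors the suggestion above to take it from the request header.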






[GitHub] [kafka] rohits64 opened a new pull request, #13361: KAFKA-14401: Resume WorkThread if Connector/Tasks reading offsets get stuck when underneath WorkThread dies

2023-03-07 Thread via GitHub


rohits64 opened a new pull request, #13361:
URL: https://github.com/apache/kafka/pull/13361

   Currently the WorkThread dies if it hits any unexpected exception. This 
change resumes the WorkThread by creating another thread so that offset reading 
continues. A test is added for this, although it can be folded into an existing 
test if a separate test feels like overkill.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
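
The restart idea described in this PR can be sketched roughly as follows. This is not the Connect code: `RestartingWorker` and `runWithRestarts` are hypothetical names, and the sketch assumes the worker signals failure by letting an exception escape `run()`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class RestartingWorker {
    /**
     * Runs the work on a dedicated thread and starts a fresh thread each time the
     * previous one dies from an uncaught exception, at most maxRestarts times.
     * Returns the number of restarts that were performed.
     */
    static int runWithRestarts(Runnable work, int maxRestarts) {
        int restarts = 0;
        while (true) {
            AtomicReference<Throwable> failure = new AtomicReference<>();
            Thread worker = new Thread(work, "work-thread-" + restarts);
            // Record the failure instead of letting the thread die silently.
            worker.setUncaughtExceptionHandler((thread, error) -> failure.set(error));
            worker.start();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return restarts;
            }
            if (failure.get() == null || restarts == maxRestarts) {
                return restarts;
            }
            restarts++;
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        int restarts = runWithRestarts(() -> {
            // Fail on the first two attempts, succeed on the third.
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated failure");
            }
        }, 5);
        System.out.println("restarts=" + restarts + ", attempts=" + attempts.get());
    }
}
```

A bounded restart count is a design choice of the sketch; it keeps a permanently failing task from spinning forever.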
   





[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support

2023-03-07 Thread Ganesh Sahu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697723#comment-17697723
 ] 

Ganesh Sahu commented on KAFKA-14218:
-

Thanks [~showuon] for the prompt reply.

> replace temp file handler with JUnit 5 Temporary Directory Support
> --
>
> Key: KAFKA-14218
> URL: https://issues.apache.org/jira/browse/KAFKA-14218
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Luke Chen
>Assignee: Ganesh Sahu
>Priority: Major
>  Labels: Newbie, newbie
>
> We create many temp files in tests, and sometimes we forget to delete them 
> after use. Instead of polluting @AfterEach for each test, we should 
> consider using the JUnit 5 TempDirectory extension.
>  
> REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431]
> 2. [https://www.baeldung.com/junit-5-temporary-directory]
>  
>  
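
For contrast, the manual pattern the issue wants to get rid of looks roughly like the plain-Java sketch below. `ManualTempDir` and its methods are hypothetical names, not code from the Kafka tests. With JUnit 5, a `@TempDir Path` field or parameter (from `org.junit.jupiter.api.io`) has the framework create and delete the directory, so the explicit `finally` cleanup goes away.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.function.Consumer;
import java.util.stream.Stream;

class ManualTempDir {
    /** Creates a temp directory, runs the body, and always deletes the directory afterwards. */
    static Path withTempDir(Consumer<Path> body) {
        try {
            Path dir = Files.createTempDirectory("kafka-test-");
            try {
                body.accept(dir);
            } finally {
                // This is the cleanup step that is easy to forget in hand-written tests.
                deleteRecursively(dir);
            }
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes children before parents by visiting paths in reverse sorted order. */
    static void deleteRecursively(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```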



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2023-03-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4969:
--

Assignee: Bill Bejeck

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.6.0
>
>
> Currently, {{StreamsPartitionAssignor}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or 
> multiple stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamsPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
>  - in the case of standby tasks, which tasks have progressed the most with 
> respect to restoration
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
>  
> There have been some additional discussions around task assignment on a 
> related PR https://github.com/apache/kafka/pull/5390
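
The 2 stateless / 2 stateful scenario from the description can be made concrete with a small greedy sketch. Everything here is an assumption for illustration: the `Task` type, the cost weights (1 per task plus 10 per store), and the greedy least-loaded strategy are not taken from `StreamsPartitionAssignor` or from any design document.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CostAwarePlacement {
    /** Hypothetical task description; only the store count feeds the cost here. */
    static final class Task {
        final String id;
        final int stores;

        Task(String id, int stores) {
            this.id = id;
            this.stores = stores;
        }

        // Assumed cost model: base cost of 1 per task plus 10 per state store.
        int cost() {
            return 1 + 10 * stores;
        }
    }

    /** Greedy placement: heaviest tasks first, each onto the currently least-loaded instance. */
    static List<List<Task>> assign(List<Task> tasks, int instances) {
        List<List<Task>> placement = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            placement.add(new ArrayList<>());
        }
        int[] load = new int[instances];

        List<Task> byCostDescending = new ArrayList<>(tasks);
        byCostDescending.sort(Comparator.comparingInt(Task::cost).reversed());

        for (Task task : byCostDescending) {
            int target = 0;
            for (int i = 1; i < instances; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            placement.get(target).add(task);
            load[target] += task.cost();
        }
        return placement;
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("stateless-0", 0));
        tasks.add(new Task("stateless-1", 0));
        tasks.add(new Task("stateful-0", 1));
        tasks.add(new Task("stateful-1", 1));

        List<List<Task>> placement = assign(tasks, 2);
        for (int i = 0; i < placement.size(); i++) {
            StringBuilder line = new StringBuilder("instance " + i + ":");
            for (Task task : placement.get(i)) {
                line.append(' ').append(task.id);
            }
            System.out.println(line);
        }
    }
}
```

With these weights each instance ends up with one stateful and one stateless task. A fuller model would also weigh sources/sinks, processors, and standby status, as listed above.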





[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support

2023-03-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697705#comment-17697705
 ] 

Luke Chen commented on KAFKA-14218:
---

No need to raise a KIP. You can start with some tests to see if it works 
better, and open a PR for early review. Thanks.






[jira] [Comment Edited] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support

2023-03-07 Thread Ganesh Sahu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697695#comment-17697695
 ] 

Ganesh Sahu edited comment on KAFKA-14218 at 3/8/23 3:00 AM:
-

Greetings team, I am new to the community. As no one had been assigned yet and 
the last activity was in Sep 2022, I thought I would start. I hope that's fine? 
Do I need to raise a KIP first before making the changes? 
[~divijvaidya] [~showuon] [~mjsax] 


was (Author: JIRAUSER299047):
Greetings team, as no one had been assigned yet and the last activity was in 
Sep 2022, I thought I would start. I hope that's fine?






[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support

2023-03-07 Thread Ganesh Sahu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697695#comment-17697695
 ] 

Ganesh Sahu commented on KAFKA-14218:
-

Greetings team, as no one had been assigned yet and the last activity was in 
Sep 2022, I thought I would start. I hope that's fine?






[jira] [Assigned] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support

2023-03-07 Thread Ganesh Sahu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ganesh Sahu reassigned KAFKA-14218:
---

Assignee: Ganesh Sahu






[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-03-07 Thread via GitHub


kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1128855661


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1279,9 +1283,10 @@ public ClientResponse completed(AbstractResponse 
response, long timeMs) {
 false, null, null, response);
 }
 
-public ClientResponse disconnected(long timeMs, 
AuthenticationException authenticationException) {
+public ClientResponse disconnected(long timeMs, 
AuthenticationException authenticationException, boolean timedOut) {

Review Comment:
   Done.






[jira] [Reopened] (KAFKA-14752) improve kafka examples under examples package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reopened KAFKA-14752:


> improve kafka examples under examples package
> -
>
> Key: KAFKA-14752
> URL: https://issues.apache.org/jira/browse/KAFKA-14752
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: newbie, newbie++
>
> Kafka provides some examples under the "examples" package. Currently we provide
>  * java-producer-consumer-demo, which is to produce 1 records and then 
> consume all of them
>  * exactly-once-demo, which is to produce records -> consume -> process  -> 
> consume.
> The base components of both are the producer and the consumer. However, I 
> found that the producer and consumer examples are not in good shape. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completion
>  # The examples don't show how to handle different kinds of exceptions. 
> They only cover the happy path
>  # There are no clear comments telling users why each operation is needed 
> and what it does.
>  
> Furthermore, while running both examples, I saw a flood of log output 
> because we print one line for each message sent and one line for each message 
> received. In java-producer-consumer-demo, there will be 1 records 
> sent/received, so > 2 lines of logs output. Same for exactly-once-demo. 
> Maybe we should consider reducing the record number.
>  
> One more thing: exactly-once-demo.java has a clear class Javadoc in 
> the demo file, but there's nothing in java-producer-consumer-demo.java. We 
> should also improve that.
>  
>  
>  





[jira] [Resolved] (KAFKA-14753) improve producer under example package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14753.

Resolution: Fixed

> improve producer under example package
> --
>
> Key: KAFKA-14753
> URL: https://issues.apache.org/jira/browse/KAFKA-14753
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>
> I found that the producer and consumer examples are not in good shape. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completion
>  # The examples don't show how to handle different kinds of exceptions. 
> They only cover the happy path
>  # There are no clear comments telling users why each operation is needed 
> and what it does.
>  





[jira] [Resolved] (KAFKA-14754) improve consumer under example package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14754.

Resolution: Fixed

> improve consumer under example package
> --
>
> Key: KAFKA-14754
> URL: https://issues.apache.org/jira/browse/KAFKA-14754
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Philip Nee
>Priority: Major
>
> I found that the producer and consumer examples are not in good shape. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completion
>  # The examples don't show how to handle different kinds of exceptions. 
> They only cover the happy path
>  # There are no clear comments telling users why each operation is needed 
> and what it does.
>  





[jira] [Resolved] (KAFKA-14752) improve kafka examples under examples package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14752.

Resolution: Fixed






[GitHub] [kafka] guozhangwang merged pull request #13354: KAFKA-14753: Improve kafka producer example

2023-03-07 Thread via GitHub


guozhangwang merged PR #13354:
URL: https://github.com/apache/kafka/pull/13354





[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-07 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1128778509


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap(maxSize / 2, 
0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+
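
The eviction mechanism in the quoted constructor (an access-ordered `LinkedHashMap` whose `removeEldestEntry` hands evicted values to a queue for later cleanup) can be shown in a stripped-down, generic form. `LruWithEvictionQueue` is a hypothetical class for illustration; it leaves out the file handling, `markForCleanup`, and the cleaner thread of the real `RemoteIndexCache`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class LruWithEvictionQueue<K, V> {
    // Evicted values wait here until some background task cleans them up.
    private final LinkedBlockingQueue<V> expired = new LinkedBlockingQueue<>();
    private final Map<K, V> entries;

    LruWithEvictionQueue(final int maxSize) {
        // accessOrder = true keeps the least recently accessed entry first,
        // which is the entry that removeEldestEntry is asked about.
        entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxSize) {
                    expired.add(eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    synchronized void put(K key, V value) {
        entries.put(key, value);
    }

    synchronized V get(K key) {
        return entries.get(key);
    }

    synchronized boolean contains(K key) {
        return entries.containsKey(key);
    }

    /** Returns the next evicted value, or null if nothing is waiting. */
    V pollExpired() {
        return expired.poll();
    }
}
```

Note that `get` on an access-ordered `LinkedHashMap` mutates the ordering, so reads need the same lock as writes; the sketch uses `synchronized` methods for that reason.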

[GitHub] [kafka] guozhangwang merged pull request #13353: KAFKA-14752: Improving the existing consumer examples

2023-03-07 Thread via GitHub


guozhangwang merged PR #13353:
URL: https://github.com/apache/kafka/pull/13353





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

2023-03-07 Thread via GitHub


guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1128785406


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -986,6 +987,11 @@ private void prepareChangelogs(final 
Set newPartitionsToResto
 } catch (final Exception e) {
 throw new StreamsException("State restore listener failed 
on batch restored", e);
 }
+
+final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
+final StreamTask task = (StreamTask) tasks.get(taskId);
+final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);

Review Comment:
   No very compelling reason. I just want to make sure we do not start with a 
negative number, although I cannot think of a case where it could be negative.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

2023-03-07 Thread via GitHub


guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1128784726


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -457,15 +457,9 @@ public long restore(final Map tasks) {
 //   small batches; this can be optimized in the future, 
e.g. wait longer for larger batches.
 final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
 try {
+final Task task = tasks.get(taskId);
 final ChangelogMetadata changelogMetadata = 
changelogs.get(partition);
-final int restored = restoreChangelog(changelogMetadata);
-if (restored > 0 || 
changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
-final Task task = tasks.get(taskId);
-if (task != null) {

Review Comment:
   I pondered on the code and I think it should not be `null` ever? Please 
correct me if I'm wrong.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2

2023-03-07 Thread via GitHub


guozhangwang commented on code in PR #13300:
URL: https://github.com/apache/kafka/pull/13300#discussion_r1128784269


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -986,6 +987,11 @@ private void prepareChangelogs(final 
Set newPartitionsToResto
 } catch (final Exception e) {
 throw new StreamsException("State restore listener failed 
on batch restored", e);
 }
+
+final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
+final StreamTask task = (StreamTask) tasks.get(taskId);
+final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
+task.recordRestoreRemaining(time, recordsToRestore);

Review Comment:
   Sounds good.






[GitHub] [kafka] guozhangwang merged pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-07 Thread via GitHub


guozhangwang merged PR #13303:
URL: https://github.com/apache/kafka/pull/13303





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-07 Thread via GitHub


guozhangwang commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1128780569


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -83,12 +92,23 @@
 private final Metrics metrics;
 private final long defaultApiTimeoutMs;
 
+public PrototypeAsyncConsumer(Properties properties,
+ Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
+}
+
+public PrototypeAsyncConsumer(final Map configs,
+  final Deserializer keyDeser,
+  final Deserializer valDeser) {
+this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, 
valDeser)), keyDeser,
+valDeser);
+}
 @SuppressWarnings("unchecked")
-public PrototypeAsyncConsumer(final Time time,
-  final ConsumerConfig config,
+public PrototypeAsyncConsumer(final ConsumerConfig config,
   final Deserializer keyDeserializer,
   final Deserializer valueDeserializer) {
-this.time = time;
+this.time = Time.SYSTEM;

Review Comment:
   I think as we go along with this, we will need to add this param back to 
the constructor :) Anyway, that's for future PRs.
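
The reason a time parameter tends to come back into constructors is testability: with an injected clock, a test can pin the current time. A minimal sketch, assuming `java.time.Clock` as a stand-in for Kafka's own `Time` interface (`DeadlineTracker` is a hypothetical class, not part of the consumer):

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

class DeadlineTracker {
    private final Clock clock;
    private final long timeoutMs;

    /** Convenience constructor for production use; falls back to the system clock. */
    DeadlineTracker(long timeoutMs) {
        this(Clock.systemUTC(), timeoutMs);
    }

    /** Constructor that takes the clock explicitly so tests can control time. */
    DeadlineTracker(Clock clock, long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
    }

    long deadlineMs() {
        return clock.millis() + timeoutMs;
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.ofEpochMilli(1_000L), ZoneOffset.UTC);
        System.out.println(new DeadlineTracker(fixed, 500L).deadlineMs());
    }
}
```

Keeping both constructors lets callers that do not care about time stay unchanged while tests use the explicit one.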






[GitHub] [kafka] guozhangwang commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-07 Thread via GitHub


guozhangwang commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1459067328

   > @guozhangwang - Regarding testing the commit results, can we defer this to 
the subsequent PR, i.e. verify it once committed() is implemented.
   
   SGTM.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown

2023-03-07 Thread via GitHub


guozhangwang commented on code in PR #13318:
URL: https://github.com/apache/kafka/pull/13318#discussion_r1128773556


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java:
##
@@ -169,26 +166,6 @@ private void createMockTaskManager() {
 topologyMetadata.buildAndRewriteTopology();
 }
 
-// If you don't care about setting the end offsets for each specific topic 
partition, the helper method

Review Comment:
   This is piggy-backed as part of incorporating 2): I found that these funcs, 
which appear in multiple test classes, duplicate 
`AssignmentTestUtils.createMockAdminClientForAssignor`, so I removed them.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java:
##
@@ -149,35 +145,12 @@ private void configurePartitionAssignorWith(final 
Map props) {
 overwriteInternalTopicManagerWithMock();
 }
 
-// Useful for tests that don't care about the task offset sums

Review Comment:
   I found that after we refactored with Mockito, the passed-in task set is 
no longer needed, so I removed it to eliminate the warnings.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java:
##
@@ -149,35 +145,12 @@ private void configurePartitionAssignorWith(final 
Map props) {
 overwriteInternalTopicManagerWithMock();
 }
 
-// Useful for tests that don't care about the task offset sums
-private void createMockTaskManager(final Set activeTasks) {
-createMockTaskManager(getTaskOffsetSums(activeTasks));
-}
-
-private void createMockTaskManager(final Map taskOffsetSums) 
{
+private void createMockTaskManager() {
 when(taskManager.topologyMetadata()).thenReturn(topologyMetadata);
 when(taskManager.processId()).thenReturn(UUID_1);
 topologyMetadata.buildAndRewriteTopology();
 }
 
-// If you don't care about setting the end offsets for each specific topic 
partition, the helper method

Review Comment:
   Ditto here.






[jira] [Resolved] (KAFKA-14792) Race condition in LazyIndex.get()

2023-03-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-14792.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> Race condition in LazyIndex.get()
> -
>
> Key: KAFKA-14792
> URL: https://issues.apache.org/jira/browse/KAFKA-14792
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 3.5.0
>
>
> `LazyIndex.get()` has a race condition that can result in a 
> ClassCastException being thrown in some cases.
> This was introduced when it was rewritten from Scala to Java.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma merged pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()

2023-03-07 Thread via GitHub


ijuma merged PR #13359:
URL: https://github.com/apache/kafka/pull/13359





[GitHub] [kafka] ijuma commented on pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()

2023-03-07 Thread via GitHub


ijuma commented on PR #13359:
URL: https://github.com/apache/kafka/pull/13359#issuecomment-1459049434

   2 builds passed, 1 failed with 2 unrelated tests:
   
   > Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.clients.consumer.internals.CooperativeConsumerCoordinatorTest.testOutdatedCoordinatorAssignment()
   > Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration





[GitHub] [kafka] CalvinConfluent commented on pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-07 Thread via GitHub


CalvinConfluent commented on PR #13323:
URL: https://github.com/apache/kafka/pull/13323#issuecomment-1459027103

   I changed the default value of the old ReplicaId field to -1. This way, we 
can extract the replicaId from the FetchRequest without knowing the API 
version. Simple and minimal work.
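
   A minimal sketch of why a -1 default helps, under stated assumptions: 
`ReplicaIdSketch` and `FetchData` are hypothetical stand-ins, not Kafka's 
generated `FetchRequestData`, and the sketch assumes the newer nested replica 
id also defaults to -1. With both defaults equal to -1, a reader can pick 
whichever field was set without looking at the request version.

```java
// Hypothetical, simplified model of the request data; not Kafka's generated classes.
final class ReplicaIdSketch {
    static final int NO_REPLICA = -1;

    static final class FetchData {
        // Legacy top-level field, populated only by older request versions.
        int replicaId = NO_REPLICA;
        // Newer nested field (assumed to default to -1 as well).
        int replicaStateId = NO_REPLICA;
    }

    // Version-agnostic read: at most one of the two fields differs from the default.
    static int replicaId(FetchData data) {
        return data.replicaStateId != NO_REPLICA ? data.replicaStateId : data.replicaId;
    }
}
```

   If neither field is set, the method returns -1, which this sketch treats 
as "no replica id present".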





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-07 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1128713441


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -337,8 +362,27 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 .setResponses(topicResponseList));
 }
 
+public String clusterId() {
+return data.clusterId();
+}
+
+public List topics() {
+return data.topics();
+}
+
+public int maxWaitMs() {
+return data.maxWaitMs();
+}

Review Comment:
   Reverted.






[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-07 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1128713021


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +315,18 @@ public String toString() {
 }
 }
 
+public static void updateFetchRequestDataReplicaState(FetchRequestData 
fetchRequestData, int replicaId, long replicaEpoch, short version) {
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+fetchRequestData.setReplicaState(new ReplicaState());
+} else {
+fetchRequestData.setReplicaId(new FetchRequestData().replicaId());

Review Comment:
   Changed to -1.






[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.

2023-03-07 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1128712574


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -936,13 +937,13 @@ private CompletableFuture 
handleFetchRequest(
 RaftRequest.Inbound requestMetadata,
 long currentTimeMs
 ) {
-FetchRequestData request = (FetchRequestData) requestMetadata.data;
+FetchRequest request = new FetchRequest((FetchRequestData) 
requestMetadata.data, requestMetadata.apiVersion);

Review Comment:
   reverted






[GitHub] [kafka] jolshan commented on a diff in pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface

2023-03-07 Thread via GitHub


jolshan commented on code in PR #13357:
URL: https://github.com/apache/kafka/pull/13357#discussion_r1128706720


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -271,19 +271,28 @@ class BrokerMetadataPublisher(
 })
 } catch {
   case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
-s"authorizer changes in ${deltaName}", t)
+s"authorizer changes in $deltaName", t)
 }
   }
   case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.
-})
+}
+  }
+
+  try {

Review Comment:
   This looks like the only non-nit change in the file.






[GitHub] [kafka] jolshan commented on pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface

2023-03-07 Thread via GitHub


jolshan commented on PR #13357:
URL: https://github.com/apache/kafka/pull/13357#issuecomment-1458991438

   Sorry if I missed it somewhere/forgot, but is this saying we will only allow 
the new group coordinator for kraft? Or do we have an alternative for ZK?





[GitHub] [kafka] cmccabe opened a new pull request, #13360: MINOR: fix authorizer reconfiguration in KRaft mode

2023-03-07 Thread via GitHub


cmccabe opened a new pull request, #13360:
URL: https://github.com/apache/kafka/pull/13360

   Fix a bug with authorizer reconfiguration in KRaft mode. The bug happened 
because we were invoking DynamicBrokerConfig.addReconfigurables before 
initializing BrokerServer.authorizer. Add a test of reconfiguring the 
Authorizer. Also, in testReconfigureControllerClientQuotas, test both combined 
and isolated mode.





[GitHub] [kafka] mumrah commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-07 Thread via GitHub


mumrah commented on code in PR #13337:
URL: https://github.com/apache/kafka/pull/13337#discussion_r1128359363


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -213,7 +213,7 @@ class BrokerMetadataPublisher(
   }
 
   // Apply configuration deltas.
-  dynamicConfigPublisher.publish(delta, newImage)
+  dynamicConfigPublisher.publish(delta, newImage, null)

Review Comment:
   Any way to avoid this null? 



##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -35,7 +35,13 @@ class DynamicConfigPublisher(
 ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
   logIdent = s"[${name()}] "
 
-  def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+  override def name(): String = s"DynamicConfigPublisher ${nodeType} 
id=${conf.nodeId}"
+
+  def publish(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest

Review Comment:
   If we're going to pass the `null` here, we should probably add a comment



##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -31,10 +32,16 @@ class DynamicConfigPublisher(
   faultHandler: FaultHandler,
   dynamicConfigHandlers: Map[String, ConfigHandler],
   nodeType: String
-) extends Logging {
+) extends org.apache.kafka.image.publisher.MetadataPublisher with Logging {
   logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] 
"
 
-  def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+  def name(): String = "DynamicConfigPublisher"
+
+  def publish(

Review Comment:
   Should this have the override annotation?



##
metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java:
##
@@ -40,33 +40,30 @@ public interface MetadataPublisher extends AutoCloseable {
 String name();
 
 /**
- * Publish a new cluster metadata snapshot that we loaded.
+ * Handle a change in the current controller.
  *
- * @param deltaThe delta between the previous state and the new one.
- * @param newImage The complete new state.
- * @param manifest The contents of what was published.
+ * @param newLeaderAndEpoch The new quorum leader and epoch. The new 
leader will be
+ *  OptionalInt.empty if there is currently no 
active controller.
  */
-void publishSnapshot(
-MetadataDelta delta,
-MetadataImage newImage,
-SnapshotManifest manifest
-);
+default void handleControllerChange(LeaderAndEpoch newLeaderAndEpoch) { }

Review Comment:
   nit: "publishControllerChange" to keep the naming consistent? 



##
metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+
+/**
+ * Contains information about what was loaded.
+ */
+public interface LoaderManifest {

Review Comment:
   Why can't we include the manifest type in the MetadataProvenance? 






[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-07 Thread via GitHub


philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1458974950

   @guozhangwang - Regarding testing the commit results, can we defer this to 
a subsequent PR, i.e. verify it once committed() is implemented?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-07 Thread via GitHub


vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1128682440


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import 
org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+private static final String STORE_NAME = "versioned-store";
+private static final long HISTORY_RETENTION = 3600_000L;
+
+private String inputStream;
+private String outputStream;
+private long baseTimestamp;
+
+private KafkaStreams kafkaStreams;
+
+private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+@BeforeClass
+public static void before() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void after() {
+CLUSTER.stop();
+}
+
+@Before
+public void beforeTest() throws InterruptedException {
+final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+inputStream = "input-stream-" + uniqueTestName;
+outputStream = "output-stream-" + uniqueTestName;
+CLUSTER.createTopic(inputStream);

[GitHub] [kafka] cmccabe closed pull request #13332: KAFKA-14057: Support dynamic reconfiguration in KRaft remote controllers

2023-03-07 Thread via GitHub


cmccabe closed pull request #13332: KAFKA-14057: Support dynamic 
reconfiguration in KRaft remote controllers
URL: https://github.com/apache/kafka/pull/13332





[GitHub] [kafka] cmccabe commented on pull request #13332: KAFKA-14057: Support dynamic reconfiguration in KRaft remote controllers

2023-03-07 Thread via GitHub


cmccabe commented on PR #13332:
URL: https://github.com/apache/kafka/pull/13332#issuecomment-1458957347

   Superseded by #13116





[GitHub] [kafka] cmccabe commented on pull request #13337: MINOR: Refactor the MetadataPublisher interface

2023-03-07 Thread via GitHub


cmccabe commented on PR #13337:
URL: https://github.com/apache/kafka/pull/13337#issuecomment-1458906783

   rebase on trunk





[GitHub] [kafka] ijuma commented on a diff in pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()

2023-03-07 Thread via GitHub


ijuma commented on code in PR #13359:
URL: https://github.com/apache/kafka/pull/13359#discussion_r1128578938


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java:
##
@@ -166,20 +166,25 @@ public File file() {
 
 @SuppressWarnings("unchecked")
 public T get() throws IOException {
-if (indexWrapper instanceof IndexValue)
-return ((IndexValue) indexWrapper).index;
-else if (indexWrapper instanceof IndexFile) {
+IndexWrapper wrapper = indexWrapper;
+if (wrapper instanceof IndexValue)
+return ((IndexValue) wrapper).index;

Review Comment:
   This change is not strictly necessary with the way the code behaves today 
(we never change from `IndexValue` to something else), but it's cleaner not to 
access the field again given that we are doing it without a lock here.






[GitHub] [kafka] ijuma commented on a diff in pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()

2023-03-07 Thread via GitHub


ijuma commented on code in PR #13359:
URL: https://github.com/apache/kafka/pull/13359#discussion_r1128576277


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java:
##
@@ -166,20 +166,25 @@ public File file() {
 
 @SuppressWarnings("unchecked")
 public T get() throws IOException {
-if (indexWrapper instanceof IndexValue)
-return ((IndexValue) indexWrapper).index;
-else if (indexWrapper instanceof IndexFile) {
+IndexWrapper wrapper = indexWrapper;
+if (wrapper instanceof IndexValue)
+return ((IndexValue) wrapper).index;
+else {
 lock.lock();
 try {
-IndexFile indexFile = (IndexFile) indexWrapper;
-IndexValue indexValue = new 
IndexValue<>(loadIndex(indexFile.file));
-indexWrapper = indexValue;
-return indexValue.index;
+if (indexWrapper instanceof IndexValue)
+return ((IndexValue) indexWrapper).index;

Review Comment:
   The bug is that we did not check inside the lock if `indexWrapper` was of 
type `IndexValue`.
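
   To illustrate the point above, here is a minimal, self-contained sketch of 
a lazily loaded value that re-checks its state after taking the lock. 
`LazyHolder`, `Pending`, and `Loaded` are hypothetical stand-ins for 
`LazyIndex`, `IndexFile`, and `IndexValue`; this is not the actual Kafka code, 
only the general double-checked pattern. Without the second check, a thread 
that waited on the lock would cast the field to `Pending` after another thread 
had already replaced it with `Loaded`, which is where a ClassCastException 
could come from.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazily loaded index; names are illustrative only.
final class LazyHolder<T> {
    private interface Wrapper { }

    // State before loading: holds only the recipe for producing the value.
    private static final class Pending<T> implements Wrapper {
        final Supplier<T> loader;
        Pending(Supplier<T> loader) { this.loader = loader; }
    }

    // State after loading: holds the value itself.
    private static final class Loaded<T> implements Wrapper {
        final T value;
        Loaded(T value) { this.value = value; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private volatile Wrapper wrapper;

    LazyHolder(Supplier<T> loader) {
        this.wrapper = new Pending<>(loader);
    }

    @SuppressWarnings("unchecked")
    T get() {
        // Read the volatile field once so the check and the cast see the same object.
        Wrapper current = wrapper;
        if (current instanceof Loaded)
            return ((Loaded<T>) current).value;
        lock.lock();
        try {
            // Re-check under the lock: another thread may have loaded the value
            // while this thread was waiting.
            if (wrapper instanceof Loaded)
                return ((Loaded<T>) wrapper).value;
            Pending<T> pending = (Pending<T>) wrapper;
            Loaded<T> loaded = new Loaded<>(pending.loader.get());
            wrapper = loaded;
            return loaded.value;
        } finally {
            lock.unlock();
        }
    }
}
```

   In this sketch the loader runs at most once, and later calls return the 
cached value from the lock-free fast path.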






[GitHub] [kafka] ijuma opened a new pull request, #13359: KAFKA-14792: Race condition in LazyIndex.get()

2023-03-07 Thread via GitHub


ijuma opened a new pull request, #13359:
URL: https://github.com/apache/kafka/pull/13359

   `LazyIndex.get()` has a race condition that can result in a 
ClassCastException being thrown in some cases.
   
   This was introduced when it was rewritten from Scala to Java.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14792) Race condition in LazyIndex.get()

2023-03-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14792:

Description: 
`LazyIndex.get()` has a race condition that can result in a ClassCastException 
being thrown in some cases.

This was introduced when it was rewritten from Scala to Java.

  was:
`LazyIndex.get()` has a race condition that can result in a ClassCastException 
being thrown at times.

This was introduced when it was rewritten from Scala to Java.


> Race condition in LazyIndex.get()
> -
>
> Key: KAFKA-14792
> URL: https://issues.apache.org/jira/browse/KAFKA-14792
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
>
> `LazyIndex.get()` has a race condition that can result in a 
> ClassCastException being thrown in some cases.
> This was introduced when it was rewritten from Scala to Java.





[jira] [Created] (KAFKA-14792) Race condition in LazyIndex.get()

2023-03-07 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-14792:
---

 Summary: Race condition in LazyIndex.get()
 Key: KAFKA-14792
 URL: https://issues.apache.org/jira/browse/KAFKA-14792
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma
Assignee: Ismael Juma


`LazyIndex.get()` has a race condition that can result in a ClassCastException 
being thrown at times.

This was introduced when it was rewritten from Scala to Java.





[jira] [Resolved] (KAFKA-14351) Implement controller mutation quotas in KRaft

2023-03-07 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino resolved KAFKA-14351.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Implement controller mutation quotas in KRaft
> -
>
> Key: KAFKA-14351
> URL: https://issues.apache.org/jira/browse/KAFKA-14351
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Ron Dagostino
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>






[jira] [Created] (KAFKA-14791) Create a builder class for PartitionRegistration

2023-03-07 Thread Andrew Grant (Jira)
Andrew Grant created KAFKA-14791:


 Summary: Create a builder class for PartitionRegistration
 Key: KAFKA-14791
 URL: https://issues.apache.org/jira/browse/KAFKA-14791
 Project: Kafka
  Issue Type: Improvement
Reporter: Andrew Grant








[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-07 Thread via GitHub


vcrfxia commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1128550162


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.StateQueryRequest;
+import org.apache.kafka.streams.query.StateQueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class VersionedKeyValueStoreIntegrationTest {
+
+private static final String STORE_NAME = "versioned-store";
+private static final long HISTORY_RETENTION = 3600_000L;
+
+private String inputStream;
+private String outputStream;
+private long baseTimestamp;
+
+private KafkaStreams kafkaStreams;
+
+private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+@BeforeClass
+public static void before() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void after() {
+CLUSTER.stop();
+}
+
+@Before
+public void beforeTest() throws InterruptedException {
+final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+inputStream = "input-stream-" + uniqueTestName;
+outputStream = "output-stream-" + uniqueTestName;
+CLUSTER.createTopic(inputStream);

[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-07 Thread via GitHub


philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1458731526

   Hey @rajinisivaram - Thanks! Regarding your comment
   `I thought we have those methods on the existing Consumer interface. Anyway, 
that doesn't impact this PR.`: the current consumer uses a Kafka-specific 
future, but in the rewrite we are migrating to Java's 
`CompletableFuture`.
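
A minimal sketch of the callback bridging discussed in this thread, assuming a hypothetical `CommitCallback` interface and `String`-keyed offsets as stand-ins for `OffsetCommitCallback` and the real offset map. It is not the PR's code; it only illustrates forwarding a `CompletableFuture` outcome to a callback, passing `null` on success and a wrapped exception on failure.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class CommitCallbackSketch {
    /** Hypothetical stand-in for OffsetCommitCallback. */
    interface CommitCallback {
        void onComplete(Map<String, Long> offsets, Exception exception);
    }

    /**
     * Forwards the outcome of the future to the callback.
     * On success the exception argument is null; on failure the cause is wrapped.
     */
    static void commitAsync(Map<String, Long> offsets,
                            CompletableFuture<Void> future,
                            CommitCallback callback) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                callback.onComplete(offsets, new RuntimeException(error));
            } else {
                callback.onComplete(offsets, null);
            }
        });
    }
}
```

Without the `error != null` check, the callback would receive a `RuntimeException` wrapping `null` even when the commit succeeded.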
   





[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

2023-03-07 Thread via GitHub


rajinisivaram commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1128444212


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
 @Override
 public void commitAsync(Map offsets, 
OffsetCommitCallback callback) {
-final CommitApplicationEvent commitEvent = new 
CommitApplicationEvent(offsets);
-commitEvent.future().whenComplete((r, t) -> {
-callback.onComplete(offsets, new RuntimeException(t));
+CompletableFuture future = commit(offsets);
+future.whenComplete((r, t) -> {
+if (t != null) {
+callback.onComplete(offsets, new RuntimeException(t));
+} else {
+callback.onComplete(offsets, null);
+}
 });
+}
+
+private CompletableFuture commit(Map offsets) {

Review Comment:
   I thought we have those methods on the existing Consumer interface. Anyway, 
that doesn't impact this PR.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-private Map properties;
-private SubscriptionState subscriptionState;
-private MockTime time;
-private LogContext logContext;
-private Metrics metrics;
-private ClusterResourceListeners clusterResourceListeners;
-private Optional groupId;
-private String clientId;
-private EventHandler eventHandler;
+
+private Consumer consumer;
+private Map consumerProps = new HashMap<>();
+
+private final Time time = new MockTime();
 
 @BeforeEach
 public void setup() {
-this.subscriptionState = Mockito.mock(SubscriptionState.class);
-this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-this.logContext = new LogContext();
-this.time = new MockTime();
-this.metrics = new Metrics(time);
-this.groupId = Optional.empty();
-this.clientId = "client-1";
-this.clusterResourceListeners = new ClusterResourceListeners();
-this.properties = new HashMap<>();
-this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-"localhost" +
-":");
-this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-this.properties.put(CLIENT_ID_CONFIG, "test-client");
+injectConsumerConfigs();
+}
+
+@AfterEach
+public void cleanup() {
+if (consumer != null) {
+consumer.close(Duration.ZERO);
+}
 }
+
 @Test
-public void testSubscription() {
-this.subscriptionState =
-new SubscriptionState(new LogContext(), 
OffsetResetStrategy.EARLIEST);
-PrototypeAsyncConsumer consumer =
-setupConsumerWithDefault();
-subscriptionState.subscribe(singleton("t1"),
-new NoOpConsumerRebalanceListener());
-assertEquals(1, consumer.subscription().size());
+public void testBackgroundThreadRunning() {
+consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
 }
 
 @Test
 public void 

[GitHub] [kafka] cmccabe merged pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-07 Thread via GitHub


cmccabe merged PR #13116:
URL: https://github.com/apache/kafka/pull/13116





[GitHub] [kafka] cmccabe commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-07 Thread via GitHub


cmccabe commented on PR #13116:
URL: https://github.com/apache/kafka/pull/13116#issuecomment-1458687762

   Yes. It is frustrating that we have so many flaky tests. @mumrah is fixing 
the `KafkaServerKRaftRegistrationTest`. We may need to fix `SocketServerTest` 
too in the near future since it seems to flake a lot. I agree that the test 
failures are unrelated. Will commit.





[GitHub] [kafka] rondagostino commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-07 Thread via GitHub


rondagostino commented on PR #13116:
URL: https://github.com/apache/kafka/pull/13116#issuecomment-1458662433

   There were several test failures (SelectorTest, Tls12SelectorTest, 
KafkaServerKRaftRegistrationTest, ListOffsetsRequestWithRemoteStoreTest, 
SocketServerTest), but I have seen them fail sporadically in the recent past, and 
they all passed locally. Some builds are still running.





[jira] [Commented] (KAFKA-14786) Implement connector offset write/reset internal logic

2023-03-07 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697567#comment-17697567
 ] 

Yash Mayya commented on KAFKA-14786:


Hi [~ChrisEgerton], I'm assigning this ticket to myself if that's alright with 
you. I'm planning to work on this together with 
https://issues.apache.org/jira/browse/KAFKA-14368 since they go hand in hand.

> Implement connector offset write/reset internal logic
> -
>
> Key: KAFKA-14786
> URL: https://issues.apache.org/jira/browse/KAFKA-14786
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the internal logic necessary for altering/resetting the offsets of 
> connectors, [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Endpointsbehavior].
> This should not include any changes to public interface except the 
> introduction of the new {{SourceConnector::alterOffsets}} and 
> {{SinkConnector::alterOffsets}} methods (i.e., it should not expose or test 
> any new REST endpoints).
> Ideally, we'll separate this from KAFKA-14368, KAFKA-14784, and KAFKA-14785 
> by making all changes here target the internal Connect {{Herder}} interface, 
> and have the changes for the other three rely on those new {{Herder}} methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14786) Implement connector offset write/reset internal logic

2023-03-07 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14786:
--

Assignee: Yash Mayya






[jira] [Commented] (KAFKA-14780) Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay deterministic

2023-03-07 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697562#comment-17697562
 ] 

Kirk True commented on KAFKA-14780:
---

[~adupriez] Yes, that seems like something we should fix. I wonder if it's 
possible to change the existing constructor (or add a second constructor) so 
that it allows passing in a `ScheduledExecutorService`?
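
A rough sketch of what that second constructor could look like, assuming a hypothetical `RefreshingComponentSketch` class rather than the real `RefreshingHttpsJwks`. The `CapturingScheduler` helper is also an assumption made for illustration: it records the scheduled task instead of running it, so a test can trigger the refresh without waiting on the system clock.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical component; not the real RefreshingHttpsJwks. */
final class RefreshingComponentSketch {
    private final ScheduledExecutorService scheduler;
    private final AtomicInteger refreshes = new AtomicInteger();

    /** Production-style constructor: creates its own scheduler. */
    RefreshingComponentSketch() {
        this(Executors.newSingleThreadScheduledExecutor());
    }

    /** Test-friendly constructor: the caller supplies the scheduler. */
    RefreshingComponentSketch(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Schedules a single refresh after the given delay. */
    void scheduleRefresh(long delayMs) {
        Runnable task = () -> refreshes.incrementAndGet();
        scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    int refreshCount() {
        return refreshes.get();
    }
}

/** Hypothetical test double: captures the task and delay, never starts a thread. */
final class CapturingScheduler extends ScheduledThreadPoolExecutor {
    Runnable lastTask;
    long lastDelayMs = -1;

    CapturingScheduler() {
        super(0);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        lastTask = command;
        lastDelayMs = unit.toMillis(delay);
        return null; // this sketch never inspects the returned future
    }
}
```

A test can then call `scheduleRefresh`, assert on the captured delay, and invoke `lastTask.run()` directly, which makes the outcome independent of real time.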

> Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay 
> deterministic
> 
>
> Key: KAFKA-14780
> URL: https://issues.apache.org/jira/browse/KAFKA-14780
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
>
> The test {{RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay}} 
> relies on the actual system clock, which makes it fail frequently on my poor 
> IntelliJ setup.
>  
> The {{RefreshingHttpsJwks}} component creates and uses a scheduled 
> executor service. We could expose the scheduling mechanism to be able to mock 
> its behaviour. One way to do this could be to use the {{KafkaScheduler}}, which has 
> a {{MockScheduler}} implementation that relies on {{MockTime}} instead of 
> the real-time clock.





[GitHub] [kafka] lucasbru commented on pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown

2023-03-07 Thread via GitHub


lucasbru commented on PR #13318:
URL: https://github.com/apache/kafka/pull/13318#issuecomment-1458578634

   I triggered one now: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5554/console
   





[GitHub] [kafka] guozhangwang commented on pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown

2023-03-07 Thread via GitHub


guozhangwang commented on PR #13318:
URL: https://github.com/apache/kafka/pull/13318#issuecomment-1458570328

   @lucasbru did you happen to trigger a system test for this branch as 
well?





[jira] [Resolved] (KAFKA-14640) Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-07 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-14640.

Resolution: Fixed

> Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
> --
>
> Key: KAFKA-14640
> URL: https://issues.apache.org/jira/browse/KAFKA-14640
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> As part of KIP-890 we are making some changes to this protocol.
> 1. We can send a request to verify a partition is added to a transaction
> 2. We can batch multiple transactional IDs
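
As a rough illustration of the batching in item 2, errors can be grouped by transactional ID first and by partition second. The class below is a hypothetical sketch with `String` stand-ins for partitions and error codes; `SINGLE_TXN_KEY` is an assumed placeholder key for older responses that carry only one transaction, in the spirit of `V3_AND_BELOW_TXN_ID` seen in the PR's test diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class BatchedTxnErrorsSketch {
    /** Hypothetical placeholder key for responses that carry a single transaction. */
    static final String SINGLE_TXN_KEY = "";

    // transactional ID -> (partition -> error name)
    private final Map<String, Map<String, String>> errorsByTxnId = new HashMap<>();

    /** Records the error reported for one partition of one transaction. */
    void record(String txnId, String partition, String error) {
        errorsByTxnId.computeIfAbsent(txnId, k -> new HashMap<>()).put(partition, error);
    }

    /** Returns the per-partition errors for a transaction, or an empty map if unknown. */
    Map<String, String> errorsFor(String txnId) {
        return errorsByTxnId.getOrDefault(txnId, Collections.emptyMap());
    }
}
```

A caller handling an older, unbatched response would look up `errorsFor(SINGLE_TXN_KEY)`, while a batched response is queried per transactional ID.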





[GitHub] [kafka] jolshan commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-07 Thread via GitHub


jolshan commented on PR #13231:
URL: https://github.com/apache/kafka/pull/13231#issuecomment-1458557507

   for follow ups: https://issues.apache.org/jira/browse/KAFKA-14790





[jira] [Created] (KAFKA-14790) Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests

2023-03-07 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14790:
--

 Summary: Add more AddPartitionsToTxn tests in KafkaApis and 
Authorizer tests
 Key: KAFKA-14790
 URL: https://issues.apache.org/jira/browse/KAFKA-14790
 Project: Kafka
  Issue Type: Test
Reporter: Justine Olshan
Assignee: Justine Olshan


Followup from [https://github.com/apache/kafka/pull/13231]

We should add authorizer tests for the new version.

We should add some more tests to KafkaApis to cover auth and validation 
failures.





[GitHub] [kafka] jolshan merged pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-07 Thread via GitHub


jolshan merged PR #13231:
URL: https://github.com/apache/kafka/pull/13231





[jira] [Created] (KAFKA-14789) Plugin ServiceLoader visibility from isolated plugins is inconsistent

2023-03-07 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14789:
---

 Summary: Plugin ServiceLoader visibility from isolated plugins is 
inconsistent
 Key: KAFKA-14789
 URL: https://issues.apache.org/jira/browse/KAFKA-14789
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


1. Since KIP-285 (defining the basic auth extension) the classloading isolation 
mechanism has used ServiceLoader to find rest extensions.
2. Since KIP-297 (adding secrets externalization via ConfigProvider) 
serviceloading was used to discover ConfigProviders.
3. It was noticed that the isolation mechanism improperly discovered service 
loaded plugins and attributed classpath plugins to all isolated plugins. This 
was fixed by KAFKA-6991, in which the ConnectRestExtension and ConfigProvider 
manifest files are hidden from the isolated plugins, in order to hide them from 
the scanning ServiceLoader calls.
4. Since KIP-458 (adding ConnectorConfigOverridePolicy) serviceloading was used 
to discover ConnectorConfigOverridePolicy instances, but these manifests were 
not hidden from plugin classloaders. ConnectorConfigOverridePolicy objects are 
currently mis-attributed, but this has had no ill-effects at this time.
5. With KIP-898, all plugins will be loaded with the ServiceLoader, and so all 
other plugins could potentially encounter this mis-attribution bug that was 
only resolved for 2 of the plugins in the past.

The current implementation relies on a string equality test to deny reading the 
serviceloader manifests that cause mis-attribution, which is a brittle 
solution. It was very easy for the person implementing KIP-458 to forget to add 
the new class to the manifest denylist, and cause a re-appearance of the 
mis-attribution bug.

The denylist approach to preventing mis-attribution also changes the visibility 
of classpath plugins from isolated classloaders. We should choose whether the 
visibility of classpath plugins is desirable, and either eliminate the denylist 
or make it more difficult to get out-of-sync with the current list of plugins.
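
One way to make the denylist harder to get out of sync, sketched under assumptions: derive the hidden manifest names from a single list of plugin types instead of hand-written strings. The class name and constructor below are hypothetical and not part of Connect; only the `META-INF/services/<fully qualified class name>` layout is the standard `ServiceLoader` convention.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class ServiceManifestDenylistSketch {
    private static final String SERVICES_PREFIX = "META-INF/services/";

    private final Set<String> hiddenManifests;

    /** Builds the denylist from the plugin types themselves, not from string literals. */
    ServiceManifestDenylistSketch(Class<?>... pluginTypes) {
        Set<String> names = new HashSet<>();
        for (Class<?> type : pluginTypes) {
            names.add(SERVICES_PREFIX + type.getName());
        }
        this.hiddenManifests = Collections.unmodifiableSet(names);
    }

    /** Returns true if the resource is a manifest of one of the registered plugin types. */
    boolean isHidden(String resourceName) {
        return hiddenManifests.contains(resourceName);
    }
}
```

With this shape, adding a new plugin type to the one registry also hides its manifest, so there is no separate string list to forget.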





[GitHub] [kafka] urbandan commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2023-03-07 Thread via GitHub


urbandan commented on PR #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-1458499533

   @C0urante Thanks for the prototype. I went through the change and left a few 
comments. Overall, I think your solution would solve this issue as well. On the 
other hand, I still think that the current API (even with your solution 
included) is ambiguous, and the mix of mutable and immutable data will 
cause some tricky bugs in dependent code bases.
   Still, your solution would be a nice improvement compared to the current 
situation, and I don't expect any API changes going through anytime soon :)





[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-03-07 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1128110858


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -199,7 +199,8 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
 .setGenerationId(generation.generationId)
 .setMemberId(generation.memberId)
 .setGroupInstanceId(groupInstanceId)
-.setTopics(new 
ArrayList<>(requestTopicDataMap.values())));
+.setTopics(new 
ArrayList<>(requestTopicDataMap.values())),
+false /* Support of topic ids will be added with 
KAFKA-14777 */);

Review Comment:
   nit: We usually don't leave such comments in our code base.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1352,8 +1370,22 @@ public void handle(OffsetCommitResponse commitResponse, 
RequestFuture futu
 Set unauthorizedTopics = new HashSet<>();
 
 for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
+String topicName = topic.name();
+
+if (commitResponse.version() >= 9) {
+topicName = 
topicResolver.getTopicName(topic.topicId()).orElse(null);
+
+if (topicName == null) {
+// OffsetCommit responses version 9 must use topic 
IDs. The topic's ID must have been
+// known by the client which sent the 
OffsetCommitRequest but was removed from the metadata
+// before the response was received.

Review Comment:
   Is this really true? As we keep the `TopicResolver` used to construct the 
request, all topics should be there. This case could happen if the server 
returns an unexpected topic id that was not in the request and that is not in 
the `TopicResolver`. Do I get this right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1323,27 +1335,33 @@ RequestFuture sendOffsetCommitRequest(final 
Map(requestTopicDataMap.values()))
+.setTopics(new 
ArrayList<>(requestTopicDataMap.values())),
+canUseTopicIds
 );
 
 log.trace("Sending OffsetCommit request with {} to coordinator {}", 
offsets, coordinator);
 
 return client.send(coordinator, builder)
-.compose(new OffsetCommitResponseHandler(offsets, generation));
+.compose(new OffsetCommitResponseHandler(offsets, generation, 
topicResolver));
 }
 
 private class OffsetCommitResponseHandler extends 
CoordinatorResponseHandler {
 private final Map offsets;
+private final TopicResolver topicResolver;
 
-private OffsetCommitResponseHandler(Map offsets, Generation generation) {
+private OffsetCommitResponseHandler(
+Map offsets, Generation 
generation, TopicResolver topicResolver) {

Review Comment:
   nit: We usually don't break long lines like this. I personally prefer the 
following:
   
   ```
   private OffsetCommitResponseHandler(
       Map offsets,
       Generation generation,
       TopicResolver topicResolver
   ) {
   ```
   
   You can find other ways in the code base.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1272,6 +1275,9 @@ RequestFuture sendOffsetCommitRequest(final 
Maphttp://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its 
name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between 
topic IDs and topic names.
+ * 
+ * Note that this class intends to be used for the (reverse) lookup of topic 
IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {

Review Comment:
   I am not really happy with this name but I could not find a better one yet. 
My concern is that this class is really about resolving topic ids/names and not 
really topics per se. Have you considered any alternatives?



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-07 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1128156639


##
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##
@@ -231,7 +231,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, 
tp.topic)).error),
 ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
 ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => 
resp.errorsByProducerId.get(producerId).get(tp)),
-ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(tp)),
+ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)),

Review Comment:
   I guess it uses `authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)`






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-07 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1128154589


##
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##
@@ -231,7 +231,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, 
tp.topic)).error),
 ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
 ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => 
resp.errorsByProducerId.get(producerId).get(tp)),
-ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(tp)),
+ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)),

Review Comment:
   The new version doesn't really use authorizer, so I wasn't sure if it was 
needed.






[GitHub] [kafka] Hangleton commented on a diff in pull request #13352: Add support of topic ids for the OffsetFetch API from version 9.

2023-03-07 Thread via GitHub


Hangleton commented on code in PR #13352:
URL: https://github.com/apache/kafka/pull/13352#discussion_r1128143723


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java:
##
@@ -253,38 +284,15 @@ public 
List groups() {
 }
 }
 
-public Map> groupIdsToPartitions() {

Review Comment:
   Moved to tests.






[jira] [Commented] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPar

2023-03-07 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697509#comment-17697509
 ] 

Sagar Rao commented on KAFKA-14520:
---

Hi [~waleedfateem], thanks for filing this. A TimeoutException could be treated 
as fatal or not: its semantics (it extends RetriableException) suggest it is 
retriable, but in this case, since the call to `position` already waited for 
`default.api.timeout.ms`, it can be treated as fatal. That's why the task is 
failed here: it's assumed that the topic/broker-related configs would 
need updating and the task would need to be restarted. 

Are you able to connect to the topic via the console consumer?

> TimeoutException Raised by KafkaConsumer Leads to: User provided listener 
> org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on 
> invocation of onPartitionsAssigned
> --
>
> Key: KAFKA-14520
> URL: https://issues.apache.org/jira/browse/KAFKA-14520
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.1
>Reporter: Waleed Fateem
>Priority: Minor
>
> I'm on the fence on whether or not this should actually be considered a bug, 
> but decided to open it as such from the perspective of a sink developer. Even 
> though there's a sign of a potential issue on the Kafka broker's side, we're 
> dependent on Kafka Connect to provide a level of robustness so we don't have 
> to manually intervene to restart the connector.
> We don't have access to the Kafka broker cluster, so we don't know what the 
> underlying issue might be that caused the following error during a rebalance:
> {code:java}
> Nov 21, 2022 @ 
> 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of 
> 6ms expired before the position for partition topic-partition-2 could be 
> determined {code}
> That leads to the following problem:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer 
> clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User 
> provided listener 
> org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on 
> invocation of onPartitionsAssigned for partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 
> [task-thread-the-sink-1] 
> {code}
> The KafkaConsumer's position() method invoked in the WorkerSinkTask's 
> HandleRebalance 
> [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697]
>  causing that TimeoutException is:
> {code:java}
> private class HandleRebalance implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>         for (TopicPartition tp : partitions) {
>             long pos = consumer.position(tp);
>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>             currentOffsets.put(tp, new OffsetAndMetadata(pos));
>             log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>         }{code}
> Which is then considered an unrecoverable error 
> [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]:
> {code:java}
> Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR 
> WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable 
> exception. Task is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1] {code}
> Do we expect that TimeoutException to cause the task to be killed, or should 
> this ideally have been handled somehow in the WorkerSinkTask's 
> HandleRebalance code?
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14368) Implement connector offset write REST API

2023-03-07 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697508#comment-17697508
 ] 

Yash Mayya commented on KAFKA-14368:


Hi [~ChrisEgerton], thanks for updating the ticket. I'm still interested in 
working on this one!

> Implement connector offset write REST API
> -
>
> Key: KAFKA-14368
> URL: https://issues.apache.org/jira/browse/KAFKA-14368
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the {{PATCH /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets].





[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-07 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1128091331


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2030,13 +2032,87 @@ class KafkaApisTest {
   val response = capturedResponse.getValue
 
   if (version < 2) {
-assertEquals(Collections.singletonMap(topicPartition, 
Errors.INVALID_PRODUCER_EPOCH), response.errors())
+assertEquals(Collections.singletonMap(topicPartition, 
Errors.INVALID_PRODUCER_EPOCH), 
response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
   } else {
-assertEquals(Collections.singletonMap(topicPartition, 
Errors.PRODUCER_FENCED), response.errors())
+assertEquals(Collections.singletonMap(topicPartition, 
Errors.PRODUCER_FENCED), 
response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
   }
 }
   }
 
+  @Test
+  def testBatchedAddPartitionsToTxnRequest(): Unit = {

Review Comment:
   As a follow-up: It seems that the test coverage is pretty low for this API 
here. It would be great if we could extend it. e.g. authorization failures, 
validation failures, etc.



##
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##
@@ -231,7 +231,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, 
tp.topic)).error),
 ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
 ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => 
resp.errorsByProducerId.get(producerId).get(tp)),
-ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(tp)),
+ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => 
resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)),

Review Comment:
   As a follow-up: We should cover the new version here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-07 Thread via GitHub


rondagostino commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1128069250


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -479,6 +489,42 @@ class DynamicBrokerConfigTest {
 assertEquals("User:admin", authorizer.superUsers)
   }
 
+  @Test
+  def testCombinedControllerAuthorizerConfig(): Unit = {
+val props = TestUtils.createCombinedControllerConfig(0, port = 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+oldConfig.dynamicConfig.initialize(None)
+
+val controllerServer: ControllerServer = 
mock(classOf[kafka.server.ControllerServer])
+
+val authorizer = new TestAuthorizer
+when(controllerServer.config).thenReturn(oldConfig)
+when(controllerServer.authorizer).thenReturn(Some(authorizer))
+// We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
+assertThrows(classOf[Throwable], () => 
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer))

Review Comment:
   I fixed it.



##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -479,6 +489,42 @@ class DynamicBrokerConfigTest {
 assertEquals("User:admin", authorizer.superUsers)
   }
 
+  @Test
+  def testCombinedControllerAuthorizerConfig(): Unit = {
+val props = TestUtils.createCombinedControllerConfig(0, port = 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+oldConfig.dynamicConfig.initialize(None)
+
+val controllerServer: ControllerServer = 
mock(classOf[kafka.server.ControllerServer])
+
+val authorizer = new TestAuthorizer
+when(controllerServer.config).thenReturn(oldConfig)
+when(controllerServer.authorizer).thenReturn(Some(authorizer))
+// We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
+assertThrows(classOf[Throwable], () => 
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer))
+props.put("super.users", "User:admin")
+controllerServer.config.dynamicConfig.updateBrokerConfig(0, props)
+assertEquals("User:admin", authorizer.superUsers)
+  }
+
+  @Test
+  def testIsolatedControllerAuthorizerConfig(): Unit = {
+val props = TestUtils.createIsolatedControllerConfig(0, port = 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+oldConfig.dynamicConfig.initialize(None)
+
+val controllerServer: ControllerServer = 
mock(classOf[kafka.server.ControllerServer])
+
+val authorizer = new TestAuthorizer
+when(controllerServer.config).thenReturn(oldConfig)
+when(controllerServer.authorizer).thenReturn(Some(authorizer))
+// We are only testing authorizer reconfiguration, ignore any exceptions 
due to incomplete mock
+assertThrows(classOf[Throwable], () => 
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer))

Review Comment:
   Same here -- I fixed it.






[jira] [Comment Edited] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2023-03-07 Thread Andy Coates (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697387#comment-17697387
 ] 

Andy Coates edited comment on KAFKA-6035 at 3/7/23 3:07 PM:


I've had a couple of instances now where I've had to suffer these "dual 
changelog topics". A few of these times the topic in question was a busy topic 
and having two copies was expensive in terms of cluster load / storage.

Consider a KS-based microservice architecture, where each service defines sets 
of input and output topics, using sensible naming conventions where the name of 
the output topic should be any one of the following:
 # static, i.e. not dependent on something that can be changed in config, e.g. 
application.id
 # data-centric, i.e. based on the data set it contains, not the service that 
happens to be generating it
 # hierarchical, i.e. the topic prefix should conform to some org-wide data 
model
 # etc

Any of the above means a change-log topic name of 
"--changelog" is going to be problematic.

Either avoiding the internal change-log (as covered by this issue), or allowing 
full control of the internal topics name (as covered by 
https://issues.apache.org/jira/browse/KAFKA-5386), would work as a solution.
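
To make the naming problem concrete, here is a small sketch that assumes the usual Kafka Streams pattern for internal changelog topics, `<application.id>-<storeName>-changelog`. The class name, method name and example values are hypothetical.

```java
final class ChangelogNames {
    private ChangelogNames() {}

    /**
     * Builds a changelog topic name following the assumed Kafka Streams
     * pattern <application.id>-<storeName>-changelog.
     */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
```

Under this pattern, renaming the application from `orders-v1` to `orders-v2` changes the topic name for the same store, which is why such a name is neither static nor data-centric.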


was (Author: bigandy):
I've had a couple of instances now where I've had to suffer these "dual 
changelog topics". A few of these times the topic in question was a busy topic 
and having two copies was expensive in terms of cluster load / storage.

Consider a KS-based microservice architecture, where each service defines sets 
of static input and output topics, using sensible naming conventions where the 
name of the output topic should be any one of the following:
 # static, i.e. not dependent on something that can be changed in config, e.g. 
application.id
 # data-centric, i.e. based on the data set it contains, not the service that 
happens to be generating it
 # hierarchical, i.e. the topic prefix should conform to some org-wide data 
model
 # etc

Any of the above means a change-log topic name of 
"--changelog" is going to be problematic.

Either avoiding the internal change-log (as covered by this issue), or allowing 
full control of the internal topics name (as covered by 
https://issues.apache.org/jira/browse/KAFKA-5386), would work as a solution.

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>Priority: Major
>
> Today Streams backs every state store with a changelog topic by 
> default unless the user overrides this with {{disableLogging}} when creating 
> the state store / materializing the KTable. However, there are a few cases 
> where a separate changelog topic is not required because an existing topic 
> can be re-used for that. This ticket summarizes a specific case that can be optimized:
> Consider the case when a KTable is materialized and then sent directly into a 
> sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.





[GitHub] [kafka] rondagostino commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-03-07 Thread via GitHub


rondagostino commented on PR #13116:
URL: https://github.com/apache/kafka/pull/13116#issuecomment-1458306158

   There were 6 test failures across 3 separate builds.  All tests pass locally.





[GitHub] [kafka] C0urante opened a new pull request, #13358: MINOR: Fix anchor link in Connect docs

2023-03-07 Thread via GitHub


C0urante opened a new pull request, #13358:
URL: https://github.com/apache/kafka/pull/13358

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-14788) Add TopicsToDeleteCount and ReplicasToDeleteCount to QuorumController

2023-03-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-14788:
--

 Summary: Add TopicsToDeleteCount and ReplicasToDeleteCount to 
QuorumController
 Key: KAFKA-14788
 URL: https://issues.apache.org/jira/browse/KAFKA-14788
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


TopicsToDeleteCount and ReplicasToDeleteCount are useful for tracking data 
removal when using Prometheus. We should therefore bring them over from the 
ZK-based controller to KRaft.





[GitHub] [kafka] tinaselenge commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-03-07 Thread via GitHub


tinaselenge commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1458147611

   @jsancio thank you for catching this and reverting the change!





[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed

2023-03-07 Thread via GitHub


hudeqi commented on PR #13348:
URL: https://github.com/apache/kafka/pull/13348#issuecomment-1458087082

   Can someone take a look?





[jira] [Commented] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2023-03-07 Thread Andy Coates (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697387#comment-17697387
 ] 

Andy Coates commented on KAFKA-6035:


I've had a couple of instances now where I've had to suffer these "dual 
changelog topics". A few of these times the topic in question was a busy topic 
and having two copies was expensive in terms of cluster load / storage.

Consider a KS-based microservice architecture, where each service defines sets 
of static input and output topics, using sensible naming conventions where the 
name of the output topic should be any one of the following:
 # static, i.e. not dependent on something that can be changed in config, e.g. 
application.id
 # data-centric, i.e. based on the data set it contains, not the service that 
happens to be generating it
 # hierarchical, i.e. the topic prefix should conform to some org-wide data 
model
 # etc

Any of the above means a change-log topic name of 
"--changelog" is going to be problematic.

Either avoiding the internal change-log (as covered by this issue), or allowing 
full control of the internal topics name (as covered by 
https://issues.apache.org/jira/browse/KAFKA-5386), would work as a solution.

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>Priority: Major
>
> Today Streams backs every state store with a changelog topic by 
> default unless the user overrides this with {{disableLogging}} when creating 
> the state store / materializing the KTable. However, there are a few cases 
> where a separate changelog topic is not required because an existing topic 
> can be re-used for that. This ticket summarizes a specific case that can be optimized:
> Consider the case when a KTable is materialized and then sent directly into a 
> sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.





[GitHub] [kafka] mimaison commented on a diff in pull request #10244: KAFKA-12399: Deprecate Log4J Appender

2023-03-07 Thread via GitHub


mimaison commented on code in PR #10244:
URL: https://github.com/apache/kafka/pull/10244#discussion_r1127686145


##
bin/kafka-run-class.sh:
##
@@ -222,6 +234,7 @@ fi
 
 # Log4j settings
 if [ -z "$KAFKA_LOG4J_OPTS" ]; then
+  echo "DEPRECATED: using log4j 1.x configuration. To use log4j 2.x 
configuration, run with: 'export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configurationFile=file:$base_dir/config/tools-log4j2.properties\"'"

Review Comment:
   This will trigger if you run `kafka-run-class.sh kafka.Kafka`. As we expect 
most users to use `kafka-server-start.sh`, it's probably not a big deal.



##
bin/kafka-run-class.sh:
##
@@ -63,7 +63,10 @@ shopt -s nullglob
 if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
   for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
   do
-CLASSPATH="$CLASSPATH:$dir/*"
+for file in "$dir"/*;

Review Comment:
   Why do we need these changes?






[GitHub] [kafka] jlprat commented on a diff in pull request #13223: MINOR: fix some typo in SharedServer.scala/KafkaRaftServer.scala

2023-03-07 Thread via GitHub


jlprat commented on code in PR #13223:
URL: https://github.com/apache/kafka/pull/13223#discussion_r1127673459


##
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##
@@ -89,7 +89,7 @@ class KafkaRaftServer(
 Some(new ControllerServer(
   sharedServer,
   KafkaRaftServer.configSchema,
-  bootstrapMetadata,
+  bootstrapMetadata

Review Comment:
   Same as above.



##
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##
@@ -73,7 +73,7 @@ class KafkaRaftServer(
 metrics,
 threadNamePrefix,
 controllerQuorumVotersFuture,
-new StandardFaultHandlerFactory(),
+new StandardFaultHandlerFactory()

Review Comment:
   This is not necessarily a typo, but rather the "trailing comma" feature, 
hence I wouldn't add this in this PR.



##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -102,7 +102,7 @@ class SharedServer(
   @volatile var brokerMetrics: BrokerServerMetrics = _
   @volatile var controllerMetrics: QuorumControllerMetrics = _
   @volatile var loader: MetadataLoader = _
-  val snapshotsDiabledReason = new AtomicReference[String](null)
+  val snapshotsDisabledReason = new AtomicReference[String](null)

Review Comment:
   Good catch.






[GitHub] [kafka] mimaison commented on a diff in pull request #13351: KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker

2023-03-07 Thread via GitHub


mimaison commented on code in PR #13351:
URL: https://github.com/apache/kafka/pull/13351#discussion_r1127673083


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -149,6 +158,51 @@ public void testAclTransformation() {
 assertEquals(processedDenyAllAclBinding.entry().permissionType(), 
AclPermissionType.DENY, "should not change DENY");
 }
 
+@Test
+public void testNoBrokerAclAuthorizer() throws Exception {
+Admin sourceAdmin = mock(Admin.class);
+Admin targetAdmin = mock(Admin.class);
+MirrorSourceConnector connector = new 
MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+ExecutionException describeAclsFailure = new ExecutionException(
+"Failed to describe ACLs",
+new SecurityDisabledException("No ACL authorizer configured on 
this broker")
+);
+@SuppressWarnings("unchecked")
+KafkaFuture<Collection<AclBinding>> describeAclsFuture = 
mock(KafkaFuture.class);
+when(describeAclsFuture.get()).thenThrow(describeAclsFailure);
+DescribeAclsResult describeAclsResult = mock(DescribeAclsResult.class);
+when(describeAclsResult.values()).thenReturn(describeAclsFuture);
+when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult);
+
+try (LogCaptureAppender connectorLogs = 
LogCaptureAppender.createAndRegister(MirrorSourceConnector.class);) {

Review Comment:
   Nit: unnecessary semicolon






[jira] [Resolved] (KAFKA-14770) Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match

2023-03-07 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14770.

  Reviewer: Manikumar
Resolution: Fixed

> Allow dynamic keystore update for brokers if string representation of DN 
> matches even if canonical DNs don't match
> --
>
> Key: KAFKA-14770
> URL: https://issues.apache.org/jira/browse/KAFKA-14770
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.5.0
>
>
> To avoid mistakes during dynamic broker config updates that could potentially 
> affect clients, we restrict changes that can be performed dynamically without 
> broker restart. For broker keystore updates, we require the DN to be the same 
> for the old and new certificates since this could potentially contain host 
> names used for host name verification by clients. DNs are compared using 
> standard Java implementation of X500Principal.equals() which compares 
> canonical names. If tags of fields change from one with a printable string 
> representation and one without or vice-versa, canonical name check fails even 
> if the actual name is the same since canonical representation converts to hex 
> for some tags only. We can relax the verification to allow dynamic updates in 
> this case by enabling dynamic update if either the canonical name or the 
> RFC2253 string representation of the DN matches.
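
The relaxed check described above can be sketched roughly as follows. This is an illustration built only on the standard `X500Principal` API, not the code merged for this ticket. `DnCheck` and `sameDn` are hypothetical names.

```java
import javax.security.auth.x500.X500Principal;

final class DnCheck {
    private DnCheck() {}

    /**
     * Accepts the new DN if either the canonical names match (this is what
     * X500Principal.equals() compares) or the RFC 2253 string forms match.
     */
    static boolean sameDn(X500Principal oldDn, X500Principal newDn) {
        if (oldDn.equals(newDn)) {
            return true;
        }
        String oldName = oldDn.getName(X500Principal.RFC2253);
        String newName = newDn.getName(X500Principal.RFC2253);
        return oldName.equals(newName);
    }
}
```

The second comparison only matters when the two certificates encode the same attribute values with different string tags, which is the case the description says the canonical comparison rejects.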





[GitHub] [kafka] rajinisivaram merged pull request #13346: KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match

2023-03-07 Thread via GitHub


rajinisivaram merged PR #13346:
URL: https://github.com/apache/kafka/pull/13346





[GitHub] [kafka] rajinisivaram commented on pull request #13346: KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match

2023-03-07 Thread via GitHub


rajinisivaram commented on PR #13346:
URL: https://github.com/apache/kafka/pull/13346#issuecomment-1457851382

   @omkreddy @kpatelatwork Thanks for the reviews, test failures not related, 
merging to trunk.





[GitHub] [kafka] Hangleton commented on pull request #13352: Add support of topic ids for the OffsetFetch API from version 9.

2023-03-07 Thread via GitHub


Hangleton commented on PR #13352:
URL: https://github.com/apache/kafka/pull/13352#issuecomment-1457837585

   Hi, Justine (@jolshan), thanks for your help. There is another PR with the 
same purpose as this one for the `OffsetCommit` API: 
[PR-13240](https://github.com/apache/kafka/pull/13240). Please feel free to 
review. Happy to discuss further. Thanks!





[GitHub] [kafka] dajac opened a new pull request, #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface

2023-03-07 Thread via GitHub


dajac opened a new pull request, #13357:
URL: https://github.com/apache/kafka/pull/13357

   The new group coordinator needs to access cluster metadata (e.g. topics, 
partitions, etc.) and it needs a mechanism to be notified when the metadata 
changes (e.g. to trigger a rebalance). In KRaft clusters, the easiest is to 
subscribe to metadata changes via the MetadataPublisher.
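
   As a rough illustration of the notification mechanism described above, the sketch below uses simplified stand-in types rather than Kafka's real `MetadataImage` or `MetadataPublisher`. Every `Sketch*` name, and the rule of counting a rebalance whenever the topic set changes, is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified snapshot of cluster metadata. */
final class SketchImage {
    final long offset;
    final Set<String> topics;

    SketchImage(long offset, Set<String> topics) {
        this.offset = offset;
        this.topics = topics;
    }
}

/** Hypothetical callback a coordinator implements to receive new images. */
interface SketchMetadataListener {
    void onNewMetadataImage(SketchImage newImage);
}

/** Hypothetical publisher that pushes each new image to all subscribers. */
final class SketchPublisher {
    private final List<SketchMetadataListener> listeners = new ArrayList<>();

    void subscribe(SketchMetadataListener listener) {
        listeners.add(listener);
    }

    void publish(SketchImage image) {
        for (SketchMetadataListener listener : listeners) {
            listener.onNewMetadataImage(image);
        }
    }
}

/** Counts a rebalance whenever the set of topics changes between images. */
final class SketchCoordinator implements SketchMetadataListener {
    private SketchImage current = new SketchImage(-1L, Set.of());
    private int rebalancesTriggered = 0;

    @Override
    public void onNewMetadataImage(SketchImage newImage) {
        if (!newImage.topics.equals(current.topics)) {
            rebalancesTriggered++;
        }
        current = newImage;
    }

    int rebalancesTriggered() {
        return rebalancesTriggered;
    }
}
```

   The coordinator keeps only the latest image and reacts to the difference, so it never has to poll the metadata log itself.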
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dajac merged pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-07 Thread via GitHub


dajac merged PR #13329:
URL: https://github.com/apache/kafka/pull/13329

