[GitHub] [kafka] dajac merged pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface
dajac merged PR #13357: URL: https://github.com/apache/kafka/pull/13357 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface
dajac commented on PR #13357: URL: https://github.com/apache/kafka/pull/13357#issuecomment-1459679650 > Sorry if I missed it somewhere/forgot, but is this saying we will only allow the new group coordinator for kraft? Or do we have an alternative for ZK? That's right. At the moment, we focus on kraft support. Given our timeline for the implementation, it seems that implementing the new protocol in ZK is not worth it because we will remove it shortly after. We can still do it if we need to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1129066723 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1352,8 +1370,22 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu Set unauthorizedTopics = new HashSet<>(); for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +String topicName = topic.name(); + +if (commitResponse.version() >= 9) { Review Comment: @Hangleton I had a deeper look into this and it seems that we could get the version with `this.response.requestHeader().apiVersion()`. Could you check if this would work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rohits64 opened a new pull request, #13361: KAFKA-14401: Resume WorkThread if Connector/Tasks reading offsets get stuck when underneath WorkThread dies
rohits64 opened a new pull request, #13361: URL: https://github.com/apache/kafka/pull/13361 Here WorkThread dies if any unexpected exception is encountered. Resumed the WorkThread by creating another thread and offset reading continues. Added test for this, although the test can be included with other test too if separate test feels like overkill. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support
[ https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697723#comment-17697723 ] Ganesh Sahu commented on KAFKA-14218: - Thanks [~showuon] for the prompt reply > replace temp file handler with JUnit 5 Temporary Directory Support > -- > > Key: KAFKA-14218 > URL: https://issues.apache.org/jira/browse/KAFKA-14218 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Luke Chen >Assignee: Ganesh Sahu >Priority: Major > Labels: Newbie, newbie > > We created many temp files in tests, and sometimes we forgot to delete them > after usage. Instead of polluting @AfterEach for each test, we should > consider to use JUnit 5 TempDirectory Extension. > > REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431] > 2. [https://www.baeldung.com/junit-5-temporary-directory] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-4969: -- Assignee: Bill Bejeck > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 2.6.0 > > > Currently, {{StreamPartitionsAssigner}} does not distinguish different > "types" of tasks. For example, task can be stateless of have one or multiple > stores. > This can lead to an suboptimal task placement: assume there are 2 stateless > and 2 stateful tasks and the app is running with 2 instances. To share the > "store load" it would be good to place one stateless and one stateful task > per instance. Right now, there is no guarantee about this, and it can happen, > that one instance processed both stateless tasks while the other processes > both stateful tasks. > We should improve {{StreamPartitionAssignor}} and introduce "task types" > including a cost model for task placement. We should consider the following > parameters: > - number of stores > - number of sources/sinks > - number of processors > - regular task vs standby task > - in the case of standby tasks, which tasks have progressed the most with > respect to restoration > This improvement should be backed by a design document in the project wiki > (no KIP required though) as it's a fairly complex change. > > There have been some additional discussions around task assignment on a > related PR https://github.com/apache/kafka/pull/5390 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support
[ https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697705#comment-17697705 ] Luke Chen commented on KAFKA-14218: --- No need to raise a KIP. You can start with some tests to see if it works better, and open a PR for early review. Thanks. > replace temp file handler with JUnit 5 Temporary Directory Support > -- > > Key: KAFKA-14218 > URL: https://issues.apache.org/jira/browse/KAFKA-14218 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Luke Chen >Assignee: Ganesh Sahu >Priority: Major > Labels: Newbie, newbie > > We created many temp files in tests, and sometimes we forgot to delete them > after usage. Instead of polluting @AfterEach for each test, we should > consider to use JUnit 5 TempDirectory Extension. > > REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431] > 2. [https://www.baeldung.com/junit-5-temporary-directory] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support
[ https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697695#comment-17697695 ] Ganesh Sahu edited comment on KAFKA-14218 at 3/8/23 3:00 AM: - Greetings team,I am new to the community. As no specific members have been assigned yet and last activity was on Sep 2022, i thought i will start.I hope it's fine ? Do i need to raise a KIP first before making the changes. [~divijvaidya] [~showuon] [~mjsax] was (Author: JIRAUSER299047): Greetings team, as no specific members have been assigned yet and last activity was on Sep 2022, i thought i will start.I hope it's fine ? > replace temp file handler with JUnit 5 Temporary Directory Support > -- > > Key: KAFKA-14218 > URL: https://issues.apache.org/jira/browse/KAFKA-14218 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Luke Chen >Assignee: Ganesh Sahu >Priority: Major > Labels: Newbie, newbie > > We created many temp files in tests, and sometimes we forgot to delete them > after usage. Instead of polluting @AfterEach for each test, we should > consider to use JUnit 5 TempDirectory Extension. > > REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431] > 2. [https://www.baeldung.com/junit-5-temporary-directory] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support
[ https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697695#comment-17697695 ] Ganesh Sahu commented on KAFKA-14218: - Greetings team, as no specific members have been assigned yet and last activity was on Sep 2022, i thought i will start.I hope it's fine ? > replace temp file handler with JUnit 5 Temporary Directory Support > -- > > Key: KAFKA-14218 > URL: https://issues.apache.org/jira/browse/KAFKA-14218 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Luke Chen >Assignee: Ganesh Sahu >Priority: Major > Labels: Newbie, newbie > > We created many temp files in tests, and sometimes we forgot to delete them > after usage. Instead of polluting @AfterEach for each test, we should > consider to use JUnit 5 TempDirectory Extension. > > REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431] > 2. [https://www.baeldung.com/junit-5-temporary-directory] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support
[ https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ganesh Sahu reassigned KAFKA-14218: --- Assignee: Ganesh Sahu > replace temp file handler with JUnit 5 Temporary Directory Support > -- > > Key: KAFKA-14218 > URL: https://issues.apache.org/jira/browse/KAFKA-14218 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Luke Chen >Assignee: Ganesh Sahu >Priority: Major > Labels: Newbie, newbie > > We created many temp files in tests, and sometimes we forgot to delete them > after usage. Instead of polluting @AfterEach for each test, we should > consider to use JUnit 5 TempDirectory Extension. > > REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431] > 2. [https://www.baeldung.com/junit-5-temporary-directory] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
kirktrue commented on code in PR #12813: URL: https://github.com/apache/kafka/pull/12813#discussion_r1128855661 ## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ## @@ -1279,9 +1283,10 @@ public ClientResponse completed(AbstractResponse response, long timeMs) { false, null, null, response); } -public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException) { +public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException, boolean timedOut) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-14752) improve kafka examples under examples package
[ https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reopened KAFKA-14752: > improve kafka examples under examples package > - > > Key: KAFKA-14752 > URL: https://issues.apache.org/jira/browse/KAFKA-14752 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: newbie, newbie++ > > Kafka provided some examples under "examples" package. Currently we provided > * java-producer-consumer-demo, which is to produce 1 records and then > consume all of them > * exactly-once-demo, which is to produce records -> consume -> process -> > consume. > Among them, the base component is producer and consumer. However, I found the > producer and consumer example is not in a good form. For example: > # Both consumer and producer doesn't gracefully close the resource after > completed > # The example doesn't provide a good example to handle different kind of > exceptions. It's just a happy path example > # No clear comment to instruct users why we should do this, and what it is > doing for each operation. > > Furthermore, while running both the examples, I saw flood of logs output > because we print one line for message sent, and one line for message > received. In java-producer-consumer-demo, there will be 1 records > sent/received, so > 2 lines of logs output. Same for exactly-once-demo. > Maybe we should consider to reduce the record number. > > One more thing, in exactly-once-demo.java, there are clear class java doc in > the demo file, but there's nothing in java-producer-consumer-demo.java. We > should also improve that. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14753) improve producer under example package
[ https://issues.apache.org/jira/browse/KAFKA-14753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14753. Resolution: Fixed > improve producer under example package > -- > > Key: KAFKA-14753 > URL: https://issues.apache.org/jira/browse/KAFKA-14753 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > > I found the producer and consumer example is not in a good form. For example: > # Both consumer and producer doesn't gracefully close the resource after > completed > # The example doesn't provide a good example to handle different kind of > exceptions. It's just a happy path example > # No clear comment to instruct users why we should do this, and what it is > doing for each operation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14754) improve consumer under example package
[ https://issues.apache.org/jira/browse/KAFKA-14754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14754. Resolution: Fixed > improve consumer under example package > -- > > Key: KAFKA-14754 > URL: https://issues.apache.org/jira/browse/KAFKA-14754 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Philip Nee >Priority: Major > > I found the producer and consumer example is not in a good form. For example: > # Both consumer and producer doesn't gracefully close the resource after > completed > # The example doesn't provide a good example to handle different kind of > exceptions. It's just a happy path example > # No clear comment to instruct users why we should do this, and what it is > doing for each operation. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14752) improve kafka examples under examples package
[ https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14752. Resolution: Fixed > improve kafka examples under examples package > - > > Key: KAFKA-14752 > URL: https://issues.apache.org/jira/browse/KAFKA-14752 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: newbie, newbie++ > > Kafka provided some examples under "examples" package. Currently we provided > * java-producer-consumer-demo, which is to produce 1 records and then > consume all of them > * exactly-once-demo, which is to produce records -> consume -> process -> > consume. > Among them, the base component is producer and consumer. However, I found the > producer and consumer example is not in a good form. For example: > # Both consumer and producer doesn't gracefully close the resource after > completed > # The example doesn't provide a good example to handle different kind of > exceptions. It's just a happy path example > # No clear comment to instruct users why we should do this, and what it is > doing for each operation. > > Furthermore, while running both the examples, I saw flood of logs output > because we print one line for message sent, and one line for message > received. In java-producer-consumer-demo, there will be 1 records > sent/received, so > 2 lines of logs output. Same for exactly-once-demo. > Maybe we should consider to reduce the record number. > > One more thing, in exactly-once-demo.java, there are clear class java doc in > the demo file, but there's nothing in java-producer-consumer-demo.java. We > should also improve that. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang merged pull request #13354: KAFKA-14753: Improve kafka producer example
guozhangwang merged PR #13354: URL: https://github.com/apache/kafka/pull/13354 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
junrao commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1128778509 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); + +public static final String DIR_NAME = "remote-log-index-cache"; + +private static final String TMP_FILE_SUFFIX = ".tmp"; + +private final File cacheDir; +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); +private final Object lock = new Object(); +private final RemoteStorageManager remoteStorageManager; +private final Map entries; +private final ShutdownableThread cleanerThread; + +private volatile boolean closed = false; + +public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this(1024, remoteStorageManager, logDir); +} + +/** + * Creates RemoteIndexCache with the given configs. + * + * @param maxSize maximum number of segment index entries to be cached. + * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. + * @param logDir log directory + */ +public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this.remoteStorageManager = remoteStorageManager; +cacheDir = new File(logDir, DIR_NAME); + +entries = new LinkedHashMap(maxSize / 2, 0.75f, true) { +@Override +protected boolean removeEldestEntry(Map.Entry eldest) { +if (this.size() > maxSize) { +RemoteIndexCache.Entry entry = eldest.getValue(); +// Mark the entries for cleanup, background thread will clean them later. +try { +entry.markForCleanup(); +} catch (IOException e) { +throw new KafkaException(e); +} +expiredIndexes.add(entry); +return true; +} else { +return false; +} +} +}; + +init(); + +// Start cleaner thread that will clean the expired entries. +cleanerThread = createCleanerThread(); +cleanerThread.start(); +} + +
[GitHub] [kafka] guozhangwang merged pull request #13353: KAFKA-14752: Improving the existing consumer examples
guozhangwang merged PR #13353: URL: https://github.com/apache/kafka/pull/13353 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang commented on code in PR #13300: URL: https://github.com/apache/kafka/pull/13300#discussion_r1128785406 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -986,6 +987,11 @@ private void prepareChangelogs(final Set newPartitionsToResto } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); } + +final TaskId taskId = changelogs.get(partition).stateManager.taskId(); +final StreamTask task = (StreamTask) tasks.get(taskId); +final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); Review Comment: Not very compelling reasons, I just want to make sure we do not start with a negative number, but I cannot think of a case that it could be negative. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang commented on code in PR #13300: URL: https://github.com/apache/kafka/pull/13300#discussion_r1128784726 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -457,15 +457,9 @@ public long restore(final Map tasks) { // small batches; this can be optimized in the future, e.g. wait longer for larger batches. final TaskId taskId = changelogs.get(partition).stateManager.taskId(); try { +final Task task = tasks.get(taskId); final ChangelogMetadata changelogMetadata = changelogs.get(partition); -final int restored = restoreChangelog(changelogMetadata); -if (restored > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) { -final Task task = tasks.get(taskId); -if (task != null) { Review Comment: I pondered on the code and I think it should not be `null` ever? Please correct me if I'm wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13300: KAFKA-10199: Add task updater metrics, part 2
guozhangwang commented on code in PR #13300: URL: https://github.com/apache/kafka/pull/13300#discussion_r1128784269 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -986,6 +987,11 @@ private void prepareChangelogs(final Set newPartitionsToResto } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); } + +final TaskId taskId = changelogs.get(partition).stateManager.taskId(); +final StreamTask task = (StreamTask) tasks.get(taskId); +final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); +task.recordRestoreRemaining(time, recordsToRestore); Review Comment: Sounds good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer
guozhangwang merged PR #13303: URL: https://github.com/apache/kafka/pull/13303 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer
guozhangwang commented on code in PR #13303: URL: https://github.com/apache/kafka/pull/13303#discussion_r1128780569 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -83,12 +92,23 @@ private final Metrics metrics; private final long defaultApiTimeoutMs; +public PrototypeAsyncConsumer(Properties properties, + Deserializer keyDeserializer, + Deserializer valueDeserializer) { +this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer); +} + +public PrototypeAsyncConsumer(final Map configs, + final Deserializer keyDeser, + final Deserializer valDeser) { +this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser, +valDeser); +} @SuppressWarnings("unchecked") -public PrototypeAsyncConsumer(final Time time, - final ConsumerConfig config, +public PrototypeAsyncConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer) { -this.time = time; +this.time = Time.SYSTEM; Review Comment: I think as we go along with this, we would need to add this param back to the constructor :) ANyways that's for future PRs then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer
guozhangwang commented on PR #13303: URL: https://github.com/apache/kafka/pull/13303#issuecomment-1459067328 > @guozhangwang - Regarding testing the commit results, can we defer this to the subsequent PR, i.e. verify it once committed() is implemented. SGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown
guozhangwang commented on code in PR #13318: URL: https://github.com/apache/kafka/pull/13318#discussion_r1128773556 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java: ## @@ -169,26 +166,6 @@ private void createMockTaskManager() { topologyMetadata.buildAndRewriteTopology(); } -// If you don't care about setting the end offsets for each specific topic partition, the helper method Review Comment: This is piggy-backed as part of incorporating 2): I found that these funcs are duplicated across multiple test classes from `AssignmentTestUtils.createMockAdminClientForAssignor`, so I removed them. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java: ## @@ -149,35 +145,12 @@ private void configurePartitionAssignorWith(final Map props) { overwriteInternalTopicManagerWithMock(); } -// Useful for tests that don't care about the task offset sums Review Comment: This is found that after we refactored with mockito, the passed in task set is not needed any more, so I removed them to eliminate the warnings. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java: ## @@ -149,35 +145,12 @@ private void configurePartitionAssignorWith(final Map props) { overwriteInternalTopicManagerWithMock(); } -// Useful for tests that don't care about the task offset sums -private void createMockTaskManager(final Set activeTasks) { -createMockTaskManager(getTaskOffsetSums(activeTasks)); -} - -private void createMockTaskManager(final Map taskOffsetSums) { +private void createMockTaskManager() { when(taskManager.topologyMetadata()).thenReturn(topologyMetadata); when(taskManager.processId()).thenReturn(UUID_1); topologyMetadata.buildAndRewriteTopology(); } -// If you don't care about setting the end offsets for each specific topic partition, the helper method Review Comment: Ditto here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14792) Race condition in LazyIndex.get()
[ https://issues.apache.org/jira/browse/KAFKA-14792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-14792. - Fix Version/s: 3.5.0 Resolution: Fixed > Race condition in LazyIndex.get() > - > > Key: KAFKA-14792 > URL: https://issues.apache.org/jira/browse/KAFKA-14792 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Blocker > Fix For: 3.5.0 > > > `LazyIndex.get()` has a race condition that can result in a > ClassCastException being thrown in some cases. > This was introduced when it was rewritten from Scala to Java. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma merged pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()
ijuma merged PR #13359: URL: https://github.com/apache/kafka/pull/13359 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()
ijuma commented on PR #13359: URL: https://github.com/apache/kafka/pull/13359#issuecomment-1459049434 2 builds passed, 1 failed with 2 unrelated tests: > Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.internals.CooperativeConsumerCoordinatorTest.testOutdatedCoordinatorAssignment() 84 ms 1 > Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
CalvinConfluent commented on PR #13323: URL: https://github.com/apache/kafka/pull/13323#issuecomment-1459027103 Changing the old ReplicaId field default value to -1. In this way, we can easily extract the replicaId from the FetchRequest without knowing the API version. Simple and minimal work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1128713441 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -337,8 +362,27 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { .setResponses(topicResponseList)); } +public String clusterId() { +return data.clusterId(); +} + +public List topics() { +return data.topics(); +} + +public int maxWaitMs() { +return data.maxWaitMs(); +} Review Comment: Reverted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1128713021 ## clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java: ## @@ -302,6 +315,18 @@ public String toString() { } } +public static void updateFetchRequestDataReplicaState(FetchRequestData fetchRequestData, int replicaId, long replicaEpoch, short version) { +if (version < 15) { +fetchRequestData.setReplicaId(replicaId); +fetchRequestData.setReplicaState(new ReplicaState()); +} else { +fetchRequestData.setReplicaId(new FetchRequestData().replicaId()); Review Comment: Changed to -1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617 Add ReplicaState to FetchRequest.
CalvinConfluent commented on code in PR #13323: URL: https://github.com/apache/kafka/pull/13323#discussion_r1128712574 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -936,13 +937,13 @@ private CompletableFuture handleFetchRequest( RaftRequest.Inbound requestMetadata, long currentTimeMs ) { -FetchRequestData request = (FetchRequestData) requestMetadata.data; +FetchRequest request = new FetchRequest((FetchRequestData) requestMetadata.data, requestMetadata.apiVersion); Review Comment: reverted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface
jolshan commented on code in PR #13357: URL: https://github.com/apache/kafka/pull/13357#discussion_r1128706720 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -271,19 +271,28 @@ class BrokerMetadataPublisher( }) } catch { case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " + -s"authorizer changes in ${deltaName}", t) +s"authorizer changes in $deltaName", t) } } case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do. -}) +} + } + + try { Review Comment: This looks like the only not nit change in the file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface
jolshan commented on PR #13357: URL: https://github.com/apache/kafka/pull/13357#issuecomment-1458991438 Sorry if I missed it somewhere/forgot, but is this saying we will only allow the new group coordinator for kraft? Or do we have an alternative for ZK? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request, #13360: MINOR: fix authorizer reconfiguration in KRaft mode
cmccabe opened a new pull request, #13360: URL: https://github.com/apache/kafka/pull/13360 Fix a bug with authorizer reconfiguration in KRaft mode. The bug happened because we were invoking DynamicBrokerConfig.addReconfigurables before initializing BrokerServer.authorizer. Add a test of reconfiguring the Authorizer. Also, in testReconfigureControllerClientQuotas, test both combined and isolated mode. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13337: MINOR: Refactor the MetadataPublisher interface
mumrah commented on code in PR #13337: URL: https://github.com/apache/kafka/pull/13337#discussion_r1128359363 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -213,7 +213,7 @@ class BrokerMetadataPublisher( } // Apply configuration deltas. - dynamicConfigPublisher.publish(delta, newImage) + dynamicConfigPublisher.publish(delta, newImage, null) Review Comment: Any way to avoid this null? ## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala: ## @@ -35,7 +35,13 @@ class DynamicConfigPublisher( ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " - def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { + override def name(): String = s"DynamicConfigPublisher ${nodeType} id=${conf.nodeId}" + + def publish( +delta: MetadataDelta, +newImage: MetadataImage, +manifest: LoaderManifest Review Comment: If we're going to pass the `null` here, we should probably add a comment ## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala: ## @@ -31,10 +32,16 @@ class DynamicConfigPublisher( faultHandler: FaultHandler, dynamicConfigHandlers: Map[String, ConfigHandler], nodeType: String -) extends Logging { +) extends org.apache.kafka.image.publisher.MetadataPublisher with Logging { logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] " - def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { + def name(): String = "DynamicConfigPublisher" + + def publish( Review Comment: Should this have the override annotation? ## metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java: ## @@ -40,33 +40,30 @@ public interface MetadataPublisher extends AutoCloseable { String name(); /** - * Publish a new cluster metadata snapshot that we loaded. + * Handle a change in the current controller. * - * @param deltaThe delta between the previous state and the new one. - * @param newImage The complete new state. - * @param manifest The contents of what was published. + * @param newLeaderAndEpoch The new quorum leader and epoch. The new leader will be + * OptionalInt.empty if there is currently no active controller. */ -void publishSnapshot( -MetadataDelta delta, -MetadataImage newImage, -SnapshotManifest manifest -); +default void handleControllerChange(LeaderAndEpoch newLeaderAndEpoch) { } Review Comment: nit: "publishControllerChange" to keep the naming consistent? ## metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.image.MetadataProvenance; + + +/** + * Contains information about what was loaded. + */ +public interface LoaderManifest { Review Comment: Why can't we include the manifest type in the MetadataProvenance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer
philipnee commented on PR #13303: URL: https://github.com/apache/kafka/pull/13303#issuecomment-1458974950 @guozhangwang - Regarding testing the commit results, can we defer this to the subsequent PR, i.e. verify it once committed() is implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1128682440 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + +private static final String STORE_NAME = "versioned-store"; +private static final long HISTORY_RETENTION = 3600_000L; + +private String inputStream; +private String outputStream; +private long baseTimestamp; + +private KafkaStreams kafkaStreams; + +private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +@BeforeClass +public static void before() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void after() { +CLUSTER.stop(); +} + +@Before +public void beforeTest() throws InterruptedException { +final String uniqueTestName = safeUniqueTestName(getClass(), testName); +inputStream = "input-stream-" + uniqueTestName; +outputStream = "output-stream-" + uniqueTestName; +CLUSTER.createTopic(inputStream);
[GitHub] [kafka] cmccabe closed pull request #13332: KAFKA-14057: Support dynamic reconfiguration in KRaft remote controllers
cmccabe closed pull request #13332: KAFKA-14057: Support dynamic reconfiguration in KRaft remote controllers URL: https://github.com/apache/kafka/pull/13332 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13332: KAFKA-14057: Support dynamic reconfiguration in KRaft remote controllers
cmccabe commented on PR #13332: URL: https://github.com/apache/kafka/pull/13332#issuecomment-1458957347 Superseded by #13116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13337: MINOR: Refactor the MetadataPublisher interface
cmccabe commented on PR #13337: URL: https://github.com/apache/kafka/pull/13337#issuecomment-1458906783 rebase on trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()
ijuma commented on code in PR #13359: URL: https://github.com/apache/kafka/pull/13359#discussion_r1128578938 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java: ## @@ -166,20 +166,25 @@ public File file() { @SuppressWarnings("unchecked") public T get() throws IOException { -if (indexWrapper instanceof IndexValue) -return ((IndexValue) indexWrapper).index; -else if (indexWrapper instanceof IndexFile) { +IndexWrapper wrapper = indexWrapper; +if (wrapper instanceof IndexValue) +return ((IndexValue) wrapper).index; Review Comment: This change is not strictly necessary with the way the code behaves today (we never change from `IndexValue` to something else), but it's cleaner not to access the field again given that we are doing it without a lock here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13359: KAFKA-14792: Race condition in LazyIndex.get()
ijuma commented on code in PR #13359: URL: https://github.com/apache/kafka/pull/13359#discussion_r1128576277 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LazyIndex.java: ## @@ -166,20 +166,25 @@ public File file() { @SuppressWarnings("unchecked") public T get() throws IOException { -if (indexWrapper instanceof IndexValue) -return ((IndexValue) indexWrapper).index; -else if (indexWrapper instanceof IndexFile) { +IndexWrapper wrapper = indexWrapper; +if (wrapper instanceof IndexValue) +return ((IndexValue) wrapper).index; +else { lock.lock(); try { -IndexFile indexFile = (IndexFile) indexWrapper; -IndexValue indexValue = new IndexValue<>(loadIndex(indexFile.file)); -indexWrapper = indexValue; -return indexValue.index; +if (indexWrapper instanceof IndexValue) +return ((IndexValue) indexWrapper).index; Review Comment: The bug is that we did not check inside the lock if `indexWrapper` was of type `IndexValue`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request, #13359: KAFKA-14792: Race condition in LazyIndex.get()
ijuma opened a new pull request, #13359: URL: https://github.com/apache/kafka/pull/13359 `LazyIndex.get()` has a race condition that can result in a ClassCastException being thrown in some cases. This was introduced when it was rewritten from Scala to Java. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14792) Race condition in LazyIndex.get()
[ https://issues.apache.org/jira/browse/KAFKA-14792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-14792: Description: `LazyIndex.get()` has a race condition that can result in a ClassCastException being thrown in some cases. This was introduced when it was rewritten from Scala to Java. was: `LazyIndex.get()` has a race condition that can result in a ClassCastException being thrown at times. This was introduced when it was rewritten from Scala to Java. > Race condition in LazyIndex.get() > - > > Key: KAFKA-14792 > URL: https://issues.apache.org/jira/browse/KAFKA-14792 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Blocker > > `LazyIndex.get()` has a race condition that can result in a > ClassCastException being thrown in some cases. > This was introduced when it was rewritten from Scala to Java. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14792) Race condition in LazyIndex.get()
Ismael Juma created KAFKA-14792: --- Summary: Race condition in LazyIndex.get() Key: KAFKA-14792 URL: https://issues.apache.org/jira/browse/KAFKA-14792 Project: Kafka Issue Type: Bug Reporter: Ismael Juma Assignee: Ismael Juma `LazyIndex.get()` has a race condition that can result in a ClassCastException being thrown at times. This was introduced when it was rewritten from Scala to Java. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14351) Implement controller mutation quotas in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino resolved KAFKA-14351. --- Fix Version/s: 3.5.0 Resolution: Fixed > Implement controller mutation quotas in KRaft > - > > Key: KAFKA-14351 > URL: https://issues.apache.org/jira/browse/KAFKA-14351 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Ron Dagostino >Priority: Major > Labels: kip-500 > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14791) Create a builder class for PartitionRegistration
Andrew Grant created KAFKA-14791: Summary: Create a builder class for PartitionRegistration Key: KAFKA-14791 URL: https://issues.apache.org/jira/browse/KAFKA-14791 Project: Kafka Issue Type: Improvement Reporter: Andrew Grant -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
vcrfxia commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1128550162 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.query.KeyQuery; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.StateQueryRequest; +import org.apache.kafka.streams.query.StateQueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({IntegrationTest.class}) +public class VersionedKeyValueStoreIntegrationTest { + +private static final String STORE_NAME = "versioned-store"; +private static final long HISTORY_RETENTION = 3600_000L; + +private String inputStream; +private String outputStream; +private long baseTimestamp; + +private KafkaStreams kafkaStreams; + +private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +@BeforeClass +public static void before() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void after() { +CLUSTER.stop(); +} + +@Before +public void beforeTest() throws InterruptedException { +final String uniqueTestName = safeUniqueTestName(getClass(), testName); +inputStream = "input-stream-" + uniqueTestName; +outputStream = "output-stream-" + uniqueTestName; +CLUSTER.createTopic(inputStream);
[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer
philipnee commented on PR #13303: URL: https://github.com/apache/kafka/pull/13303#issuecomment-1458731526 Hey @rajinisivaram - Thanks! To your comments `I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR.` : The current consumer uses kafka specific future, but in the new re-write, we are kind of migrating to java CompletableFuture -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer
rajinisivaram commented on code in PR #13303: URL: https://github.com/apache/kafka/pull/13303#discussion_r1128444212 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) { @Override public void commitAsync(Map offsets, OffsetCommitCallback callback) { -final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets); -commitEvent.future().whenComplete((r, t) -> { -callback.onComplete(offsets, new RuntimeException(t)); +CompletableFuture future = commit(offsets); +future.whenComplete((r, t) -> { +if (t != null) { +callback.onComplete(offsets, new RuntimeException(t)); +} else { +callback.onComplete(offsets, null); +} }); +} + +private CompletableFuture commit(Map offsets) { Review Comment: I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java: ## @@ -16,90 +16,77 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.consumer.internals.events.EventHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.internals.ClusterResourceListeners; -import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import java.time.Duration; import java.util.HashMap; import java.util.Map; -import java.util.Optional; -import static java.util.Collections.singleton; -import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class PrototypeAsyncConsumerTest { -private Map properties; -private SubscriptionState subscriptionState; -private MockTime time; -private LogContext logContext; -private Metrics metrics; -private ClusterResourceListeners clusterResourceListeners; -private Optional groupId; -private String clientId; -private EventHandler eventHandler; + +private Consumer consumer; +private Map consumerProps = new HashMap<>(); + +private final Time time = new MockTime(); @BeforeEach public void setup() { -this.subscriptionState = Mockito.mock(SubscriptionState.class); -this.eventHandler = Mockito.mock(DefaultEventHandler.class); -this.logContext = new LogContext(); -this.time = new MockTime(); -this.metrics = new Metrics(time); -this.groupId = Optional.empty(); -this.clientId = "client-1"; -this.clusterResourceListeners = new ClusterResourceListeners(); -this.properties = new HashMap<>(); -this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, -"localhost" + -":"); -this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); -this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); -this.properties.put(CLIENT_ID_CONFIG, "test-client"); +injectConsumerConfigs(); +} + +@AfterEach +public void cleanup() { +if (consumer != null) { +consumer.close(Duration.ZERO); +} } + @Test -public void testSubscription() { -this.subscriptionState = -new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); -PrototypeAsyncConsumer consumer = -setupConsumerWithDefault(); -subscriptionState.subscribe(singleton("t1"), -new NoOpConsumerRebalanceListener()); -assertEquals(1, consumer.subscription().size()); +public void testBackgroundThreadRunning() { +consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer()); } @Test public void
[GitHub] [kafka] cmccabe merged pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
cmccabe merged PR #13116: URL: https://github.com/apache/kafka/pull/13116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
cmccabe commented on PR #13116: URL: https://github.com/apache/kafka/pull/13116#issuecomment-1458687762 Yes. It is frustrating that we have so many flaky tests. @mumrah is fixing the `KafkaServerKRaftRegistrationTest`. We may need to fix `SocketServerTest` too in the near future since it seems to flake a lot. I agree that the test failures are unrelated. Will commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
rondagostino commented on PR #13116: URL: https://github.com/apache/kafka/pull/13116#issuecomment-1458662433 There were several test failures (SelectorTest, Tls12SelectorTest, KafkaServerKRaftRegistrationTest, ListOffsetsRequestWithRemoteStoreTest, SocketServerTest) but I have seen them in the recent past sporadically, and they all passed locally. Some builds still running/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14786) Implement connector offset write/reset internal logic
[ https://issues.apache.org/jira/browse/KAFKA-14786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697567#comment-17697567 ] Yash Mayya commented on KAFKA-14786: Hi [~ChrisEgerton], I'm assigning this ticket to myself if that's alright with you. I'm planning to work on this together with https://issues.apache.org/jira/browse/KAFKA-14368 since they go hand in hand. > Implement connector offset write/reset internal logic > - > > Key: KAFKA-14786 > URL: https://issues.apache.org/jira/browse/KAFKA-14786 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > > Implement the internal logic necessary for altering/resetting the offsets of > connectors, [described in > KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Endpointsbehavior]. > This should not include any changes to public interface except the > introduction of the new {{SourceConnector::alterOffsets}} and > {{SinkConnector::alterOffsets}} methods (i.e., it should not expose or test > any new REST endpoints). > Ideally, we'll separate this from KAFKA-14368, KAFKA-14784, and KAFKA-14785 > by making all changes here target the internal Connect {{Herder}} interface, > and have the changes for the other three rely on those new {{Herder}} methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14786) Implement connector offset write/reset internal logic
[ https://issues.apache.org/jira/browse/KAFKA-14786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14786: -- Assignee: Yash Mayya > Implement connector offset write/reset internal logic > - > > Key: KAFKA-14786 > URL: https://issues.apache.org/jira/browse/KAFKA-14786 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > > Implement the internal logic necessary for altering/resetting the offsets of > connectors, [described in > KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Endpointsbehavior]. > This should not include any changes to public interface except the > introduction of the new {{SourceConnector::alterOffsets}} and > {{SinkConnector::alterOffsets}} methods (i.e., it should not expose or test > any new REST endpoints). > Ideally, we'll separate this from KAFKA-14368, KAFKA-14784, and KAFKA-14785 > by making all changes here target the internal Connect {{Herder}} interface, > and have the changes for the other three rely on those new {{Herder}} methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14780) Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay deterministic
[ https://issues.apache.org/jira/browse/KAFKA-14780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697562#comment-17697562 ] Kirk True commented on KAFKA-14780: --- [~adupriez] Yes, that seems like something we should fix. I wonder if it's possible to change the existing constructor (or add a second constructor) that allows passing in a `ScheduledExecutorService`? > Make RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay > deterministic > > > Key: KAFKA-14780 > URL: https://issues.apache.org/jira/browse/KAFKA-14780 > Project: Kafka > Issue Type: Improvement >Reporter: Alexandre Dupriez >Assignee: Alexandre Dupriez >Priority: Minor > > The test {{RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay}} > relies on the actual system clock which makes it frequently fail on my poor > intellij setup. > > The {{{}RefreshingHttpsJwks{}}}`component creates and uses a scheduled > executor service. We could expose the scheduling mechanism to be able to mock > its behaviour. One way to do could be to use the {{KafkaScheduler}} which has > a {{MockScheduler}} implementation which relies on {{MockTime}} instead of > the real time clock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] lucasbru commented on pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown
lucasbru commented on PR #13318: URL: https://github.com/apache/kafka/pull/13318#issuecomment-1458578634 I triggered one now: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5554/console -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown
guozhangwang commented on PR #13318: URL: https://github.com/apache/kafka/pull/13318#issuecomment-1458570328 @lucasbru did you happen to have triggered a system test for this branch as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14640) Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
[ https://issues.apache.org/jira/browse/KAFKA-14640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-14640. Resolution: Fixed > Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests > -- > > Key: KAFKA-14640 > URL: https://issues.apache.org/jira/browse/KAFKA-14640 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > As part of KIP-890 we are making some changes to this protocol. > 1. We can send a request to verify a partition is added to a transaction > 2. We can batch multiple transactional IDs -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on PR #13231: URL: https://github.com/apache/kafka/pull/13231#issuecomment-1458557507 for follow ups: https://issues.apache.org/jira/browse/KAFKA-14790 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14790) Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests
Justine Olshan created KAFKA-14790: -- Summary: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests Key: KAFKA-14790 URL: https://issues.apache.org/jira/browse/KAFKA-14790 Project: Kafka Issue Type: Test Reporter: Justine Olshan Assignee: Justine Olshan Followup from [https://github.com/apache/kafka/pull/13231] We should add authorizer tests for the new version. We should add some more tests to KafkaApis to cover auth and validation failures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan merged pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan merged PR #13231: URL: https://github.com/apache/kafka/pull/13231 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14789) Plugin ServiceLoader visibility from isolated plugins is inconsistent
Greg Harris created KAFKA-14789: --- Summary: Plugin ServiceLoader visibility from isolated plugins is inconsistent Key: KAFKA-14789 URL: https://issues.apache.org/jira/browse/KAFKA-14789 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris 1. Since KIP-285 (defining the basic auth extension) the classloading isolation mechanism has used ServiceLoader to find rest extensions. 2. Since KIP-297 (adding secrets externalization via ConfigProvider) serviceloading was used to discover ConfigProviders. 3. It was noticed that the isolation mechanism improperly discovered service loaded plugins and attributed classpath plugins to all isolated plugins. This was fixed by KAFKA-6991, in which the ConnectRestExtension and ConfigProvider manifest files are hidden from the isolated plugins, in order to hide them from the scanning ServiceLoader calls. 4. Since KIP-458 (adding ConnectorConfigOverridePolicy) serviceloading was used to discover ConnectorConfigOverridePolicy instances, but these manifests were not hidden from plugin classloaders. ConnectorConfigOverridePolicy objects are currently mis-attributed, but this has had no ill-effects at this time. 5. With KIP-898, all plugins will be loaded with the ServiceLoader, and so all other plugins could potentially encounter this mis-attribution bug that was only resolved for 2 of the plugins in the past. The current implementation relies on a string equality test to deny reading the serviceloader manifests that cause mis-attribution, which is a brittle solution. It was very easy for the person implementing KIP-458 to forget to add the new class to the manifest denylist, and cause a re-appearance of the mis-attribution bug. The denylist approach to preventing mis-attribution also changes the visibility of classpath plugins from isolated classloaders. We should choose whether the visibility of classpath plugins is desirable, and either eliminate the denylist or make it more difficult to get out-of-sync with the current list of plugins. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] urbandan commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
urbandan commented on PR #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-1458499533 @C0urante Thanks for the prototype, went through the change and left a few comments. Overall, I think your solution would solve this issue as well. On the other hand, I still think that the current API (even with your solution included) is still ambiguous, and the mix of mutable and immutable data will cause some tricky bugs in dependent code bases. Still, your solution would be a nice improvement compared to the current situation, and I don't expect any API changes going through anytime soon :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1128110858 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -199,7 +199,8 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { .setGenerationId(generation.generationId) .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) -.setTopics(new ArrayList<>(requestTopicDataMap.values(; +.setTopics(new ArrayList<>(requestTopicDataMap.values())), +false /* Support of topic ids will be added with KAFKA-14777 */); Review Comment: nit: We usually don't leave such comment in our code base. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1352,8 +1370,22 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu Set unauthorizedTopics = new HashSet<>(); for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +String topicName = topic.name(); + +if (commitResponse.version() >= 9) { +topicName = topicResolver.getTopicName(topic.topicId()).orElse(null); + +if (topicName == null) { +// OffsetCommit responses version 9 must use topic IDs. The topic's ID must have been +// known by the client which sent the OffsetCommitRequest but was removed from the metadata +// before the response was received. Review Comment: Is this really true? As we keep the `TopicResolver` used to construct the request, all topics should be there. This case could happen if the server returns an unexpected topic id that was not in the request and that is not in the `TopicResolver`. Do I get this right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1323,27 +1335,33 @@ RequestFuture sendOffsetCommitRequest(final Map(requestTopicDataMap.values())) +.setTopics(new ArrayList<>(requestTopicDataMap.values())), +canUseTopicIds ); log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator); return client.send(coordinator, builder) -.compose(new OffsetCommitResponseHandler(offsets, generation)); +.compose(new OffsetCommitResponseHandler(offsets, generation, topicResolver)); } private class OffsetCommitResponseHandler extends CoordinatorResponseHandler { private final Map offsets; +private final TopicResolver topicResolver; -private OffsetCommitResponseHandler(Map offsets, Generation generation) { +private OffsetCommitResponseHandler( +Map offsets, Generation generation, TopicResolver topicResolver) { Review Comment: nit: We usually don't break long lines like this. I personally prefer the following: ``` private OffsetCommitResponseHandler( Map offsets, Generation generation, TopicResolver topicResolver ) { ``` You can find other ways in the code base. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1272,6 +1275,9 @@ RequestFuture sendOffsetCommitRequest(final Maphttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import org.apache.kafka.common.errors.InvalidTopicException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local + * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names. + * + * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but + * not to characterize the set of topics which are known by a client. Use the + * {@link org.apache.kafka.clients.MetadataCache} for that purpose. + */ +//@Immutable, @ThreadSafe +public class TopicResolver { Review Comment: I am not really happy with this name but I could not find a better one yet. My concern is that this class is really about resolving topic ids/names and not really topics per say. Have you considered any alternatives?
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1128156639 ## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ## @@ -231,7 +231,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errorsByProducerId.get(producerId).get(tp)), -ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), +ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)), Review Comment: I guess it uses `authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1128154589 ## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ## @@ -231,7 +231,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errorsByProducerId.get(producerId).get(tp)), -ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), +ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)), Review Comment: The new version doesn't really use authorizer, so I wasn't sure if it was needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13352: Add support of topic ids for the OffsetFetch API from version 9.
Hangleton commented on code in PR #13352: URL: https://github.com/apache/kafka/pull/13352#discussion_r1128143723 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java: ## @@ -253,38 +284,15 @@ public List groups() { } } -public Map> groupIdsToPartitions() { Review Comment: Moved to tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14520) TimeoutException Raised by KafkaConsumer Leads to: User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on invocation of onPar
[ https://issues.apache.org/jira/browse/KAFKA-14520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697509#comment-17697509 ] Sagar Rao commented on KAFKA-14520: --- hi [~waleedfateem] , thanks for filing this. TimeoutException could be treated as fatal or not since it's semantics signify it extending RetriableException but in this case, since the call to `position` waited for `default.api.timeout.ms`, it can be treated as fatal. That's why in this case, the task is failed as it's assumed that the topic/broker related configs would need updating and task would need to be restarted. Are you able to connect to the topic via console consumer ? > TimeoutException Raised by KafkaConsumer Leads to: User provided listener > org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on > invocation of onPartitionsAssigned > -- > > Key: KAFKA-14520 > URL: https://issues.apache.org/jira/browse/KAFKA-14520 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.2.1 >Reporter: Waleed Fateem >Priority: Minor > > I'm on the fence on whether or not this should actually be considered a bug, > but decided to open it as such from the perspective of a sink developer. Even > though there's a sign of a potential issue on the Kafka broker's side, we're > dependent on Kafka Connect to provide a level of robustness so we don't have > to manually intervene to restart the connector. > We don't have access to the Kafka broker cluster, so we don't know what the > underlying issue might be that caused the following error during a rebalance: > {code:java} > Nov 21, 2022 @ > 06:09:44.234","org.apache.kafka.common.errors.TimeoutException: Timeout of > 6ms expired before the position for partition topic-partition-2 could be > determined {code} > That leads to the following problem: > {code:java} > Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR [Consumer > clientId=connector-consumer-the-sink-1, groupId=connect-the-sink] User > provided listener > org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on > invocation of onPartitionsAssigned for partitions [] > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > [task-thread-the-sink-1] > {code} > The KafkaConsumer's position() method invoked in the WorkerSinkTask's > HandleRebalance > [code|https://github.com/apache/kafka/blob/3.2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L697] > causing that TimeoutException is: > {code:java} > private class HandleRebalance implements ConsumerRebalanceListener { > @Override > public void onPartitionsAssigned(Collection > partitions){ > log.debug("{} Partitions assigned {}", WorkerSinkTask.this, > partitions); > for (TopicPartition tp : partitions) {long pos = > consumer.position(tp);lastCommittedOffsets.put(tp, new > OffsetAndMetadata(pos));currentOffsets.put(tp, new > OffsetAndMetadata(pos));log.debug("{} Assigned topic > partition {} with offset {}", WorkerSinkTask.this, tp, pos); > }{code} > Which is then considered an unrecoverable error > [here|https://github.com/a0x8o/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L210]: > {code:java} > Nov 21, 2022 @ 06:09:44.234","2022-11-21 06:09:44,234 ERROR > WorkerSinkTask{id=the-sink-1} Task threw an uncaught and unrecoverable except > ion. Task is being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-the-sink-1] {code} > Do we expect that TimeoutException to cause the task to be killed, or should > have this been handled ideally somehow in the WorkerSinkTask's > HandleRebalance code? > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14368) Implement connector offset write REST API
[ https://issues.apache.org/jira/browse/KAFKA-14368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697508#comment-17697508 ] Yash Mayya commented on KAFKA-14368: Hi [~ChrisEgerton], thanks for updating the ticket. I'm still interesting in working on this one! > Implement connector offset write REST API > - > > Key: KAFKA-14368 > URL: https://issues.apache.org/jira/browse/KAFKA-14368 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > Implement the {{PATCH /connectors/name/offsets}} endpoint [described in > KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
dajac commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1128091331 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2030,13 +2032,87 @@ class KafkaApisTest { val response = capturedResponse.getValue if (version < 2) { -assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors()) +assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID)) } else { -assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors()) +assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID)) } } } + @Test + def testBatchedAddPartitionsToTxnRequest(): Unit = { Review Comment: As a follow-up: It seems that the test coverage is pretty low for this API here. It would be great if we could extend it. e.g. authorization failures, validation failures, etc. ## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ## @@ -231,7 +231,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errorsByProducerId.get(producerId).get(tp)), -ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), +ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)), Review Comment: As a follow-up: We should cover the new version here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
rondagostino commented on code in PR #13116: URL: https://github.com/apache/kafka/pull/13116#discussion_r1128069250 ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -479,6 +489,42 @@ class DynamicBrokerConfigTest { assertEquals("User:admin", authorizer.superUsers) } + @Test + def testCombinedControllerAuthorizerConfig(): Unit = { +val props = TestUtils.createCombinedControllerConfig(0, port = 9092) +val oldConfig = KafkaConfig.fromProps(props) +oldConfig.dynamicConfig.initialize(None) + +val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) + +val authorizer = new TestAuthorizer +when(controllerServer.config).thenReturn(oldConfig) +when(controllerServer.authorizer).thenReturn(Some(authorizer)) +// We are only testing authorizer reconfiguration, ignore any exceptions due to incomplete mock +assertThrows(classOf[Throwable], () => controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)) Review Comment: I fixed it. ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -479,6 +489,42 @@ class DynamicBrokerConfigTest { assertEquals("User:admin", authorizer.superUsers) } + @Test + def testCombinedControllerAuthorizerConfig(): Unit = { +val props = TestUtils.createCombinedControllerConfig(0, port = 9092) +val oldConfig = KafkaConfig.fromProps(props) +oldConfig.dynamicConfig.initialize(None) + +val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) + +val authorizer = new TestAuthorizer +when(controllerServer.config).thenReturn(oldConfig) +when(controllerServer.authorizer).thenReturn(Some(authorizer)) +// We are only testing authorizer reconfiguration, ignore any exceptions due to incomplete mock +assertThrows(classOf[Throwable], () => controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)) +props.put("super.users", "User:admin") +controllerServer.config.dynamicConfig.updateBrokerConfig(0, props) +assertEquals("User:admin", authorizer.superUsers) + } + + @Test + def testIsolatedControllerAuthorizerConfig(): Unit = { +val props = TestUtils.createIsolatedControllerConfig(0, port = 9092) +val oldConfig = KafkaConfig.fromProps(props) +oldConfig.dynamicConfig.initialize(None) + +val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) + +val authorizer = new TestAuthorizer +when(controllerServer.config).thenReturn(oldConfig) +when(controllerServer.authorizer).thenReturn(Some(authorizer)) +// We are only testing authorizer reconfiguration, ignore any exceptions due to incomplete mock +assertThrows(classOf[Throwable], () => controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)) Review Comment: Same here -- I fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic
[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697387#comment-17697387 ] Andy Coates edited comment on KAFKA-6035 at 3/7/23 3:07 PM: I've had a couple of instances now where I've had to suffer these "dual changelog topics". A few of these times the topic in question was a busy topic and having two copies was expensive in terms of cluster load / storage. Consider a KS based microservice architecture, where each service defines sets of input and output topics, using sensible naming conventions where the name of the output topic should be any one of the following: # static, i.e. not dependent on something that can be changed in config, i.e. application.id # data-centric, i.e. based on the data set it contains, not the service that happens to be generating it # hierarchical, i.e. the topic prefix should conform to some org-wide data model # etc Any of the above mean a change-log topic name of "--changelog" is going to be problematic. Either avoiding the internal change-log (as covered by this issue), or allowing full control of the internal topics name (as covered by https://issues.apache.org/jira/browse/KAFKA-5386), would work as a solution. was (Author: bigandy): I've had a couple of instances now where I've had to suffer these "dual changelog topics". A few of these times the topic in question was a busy topic and having two copies was expensive in terms of cluster load / storage. Consider a KS based microservice architecture, where each service defines sets of static input and output topics, using sensible naming conventions where the name of the output topic should be any one of the following: # static, i.e. not dependent on something that can be changed in config, i.e. application.id # data-centric, i.e. based on the data set it contains, not the service that happens to be generating it # hierarchical, i.e. the topic prefix should conform to some org-wide data model # etc Any of the above mean a change-log topic name of "--changelog" is going to be problematic. Either avoiding the internal change-log (as covered by this issue), or allowing full control of the internal topics name (as covered by https://issues.apache.org/jira/browse/KAFKA-5386), would work as a solution. > Avoid creating changelog topics for state stores that are directly piped to a > sink topic > > > Key: KAFKA-6035 > URL: https://issues.apache.org/jira/browse/KAFKA-6035 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Jeyhun Karimov >Priority: Major > > Today Streams make all state stores to be backed by a changelog topic by > default unless users overrides it by {{disableLogging}} when creating the > state store / materializing the KTable. However there are a few cases where a > separate changelog topic would not be required as we can re-use an existing > topic for that. This ticket summarize a specific issue that can be optimized: > Consider the case when a KTable is materialized and then sent directly into a > sink topic with the same key, e.g. > {code} > table1 = stream.groupBy(...).aggregate("state1").to("topic2"); > {code} > Then we do not need to create a {{state1-changelog}} but can just use > {{topic2}} as its changelog. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rondagostino commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
rondagostino commented on PR #13116: URL: https://github.com/apache/kafka/pull/13116#issuecomment-1458306158 There were 6 test failures across 3 separate builds. All tests pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request, #13358: MINOR: Fix anchor link in Connect docs
C0urante opened a new pull request, #13358: URL: https://github.com/apache/kafka/pull/13358 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14788) Add TopicsToDeleteCount and ReplicasToDeleteCount to QuorumController
Chia-Ping Tsai created KAFKA-14788: -- Summary: Add TopicsToDeleteCount and ReplicasToDeleteCount to QuorumController Key: KAFKA-14788 URL: https://issues.apache.org/jira/browse/KAFKA-14788 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai TopicsToDeleteCount and ReplicasToDeleteCount are useful to trace the data removing when using prometheus. As a consequence, we should bring them back from zk quorum to Kraft. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] tinaselenge commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file
tinaselenge commented on PR #13102: URL: https://github.com/apache/kafka/pull/13102#issuecomment-1458147611 @jsancio thank you for catching this and reverting the change! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13348: MINOR: Fix ProducerPerformance still counting successful sending when sending failed
hudeqi commented on PR #13348: URL: https://github.com/apache/kafka/pull/13348#issuecomment-1458087082 Can someone take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic
[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697387#comment-17697387 ] Andy Coates commented on KAFKA-6035: I've had a couple of instances now where I've had to suffer these "dual changelog topics". A few of these times the topic in question was a busy topic and having two copies was expensive in terms of cluster load / storage. Consider a KS based microservice architecture, where each service defines sets of static input and output topics, using sensible naming conventions where the name of the output topic should be any one of the following: # static, i.e. not dependent on something that can be changed in config, i.e. application.id # data-centric, i.e. based on the data set it contains, not the service that happens to be generating it # hierarchical, i.e. the topic prefix should conform to some org-wide data model # etc Any of the above mean a change-log topic name of "--changelog" is going to be problematic. Either avoiding the internal change-log (as covered by this issue), or allowing full control of the internal topics name (as covered by https://issues.apache.org/jira/browse/KAFKA-5386), would work as a solution. > Avoid creating changelog topics for state stores that are directly piped to a > sink topic > > > Key: KAFKA-6035 > URL: https://issues.apache.org/jira/browse/KAFKA-6035 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Jeyhun Karimov >Priority: Major > > Today Streams make all state stores to be backed by a changelog topic by > default unless users overrides it by {{disableLogging}} when creating the > state store / materializing the KTable. However there are a few cases where a > separate changelog topic would not be required as we can re-use an existing > topic for that. This ticket summarize a specific issue that can be optimized: > Consider the case when a KTable is materialized and then sent directly into a > sink topic with the same key, e.g. > {code} > table1 = stream.groupBy(...).aggregate("state1").to("topic2"); > {code} > Then we do not need to create a {{state1-changelog}} but can just use > {{topic2}} as its changelog. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on a diff in pull request #10244: KAFKA-12399: Deprecate Log4J Appender
mimaison commented on code in PR #10244: URL: https://github.com/apache/kafka/pull/10244#discussion_r1127686145 ## bin/kafka-run-class.sh: ## @@ -222,6 +234,7 @@ fi # Log4j settings if [ -z "$KAFKA_LOG4J_OPTS" ]; then + echo "DEPRECATED: using log4j 1.x configuration. To use log4j 2.x configuration, run with: 'export KAFKA_LOG4J_OPTS=\"-Dlog4j.configurationFile=file:$base_dir/config/tools-log4j2.properties\"'" Review Comment: This will trigger if you run `kafka-run-class.sh kafka.Kafka`. As we expect most users to use `kafka-server-start.sh`, it's probably not a big deal. ## bin/kafka-run-class.sh: ## @@ -63,7 +63,10 @@ shopt -s nullglob if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*; do -CLASSPATH="$CLASSPATH:$dir/*" +for file in "$dir"/*; Review Comment: Why do we need these changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on a diff in pull request #13223: MINOR: fix some typo in SharedServer.scala/KafkaRaftServer.scala
jlprat commented on code in PR #13223: URL: https://github.com/apache/kafka/pull/13223#discussion_r1127673459 ## core/src/main/scala/kafka/server/KafkaRaftServer.scala: ## @@ -89,7 +89,7 @@ class KafkaRaftServer( Some(new ControllerServer( sharedServer, KafkaRaftServer.configSchema, - bootstrapMetadata, + bootstrapMetadata Review Comment: Same as above. ## core/src/main/scala/kafka/server/KafkaRaftServer.scala: ## @@ -73,7 +73,7 @@ class KafkaRaftServer( metrics, threadNamePrefix, controllerQuorumVotersFuture, -new StandardFaultHandlerFactory(), +new StandardFaultHandlerFactory() Review Comment: This is not necessarily a typo, but rather the "trailing comma" feature, hence I wouldn't add this in this PR. ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -102,7 +102,7 @@ class SharedServer( @volatile var brokerMetrics: BrokerServerMetrics = _ @volatile var controllerMetrics: QuorumControllerMetrics = _ @volatile var loader: MetadataLoader = _ - val snapshotsDiabledReason = new AtomicReference[String](null) + val snapshotsDisabledReason = new AtomicReference[String](null) Review Comment: Good catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #13351: KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker
mimaison commented on code in PR #13351: URL: https://github.com/apache/kafka/pull/13351#discussion_r1127673083 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -149,6 +158,51 @@ public void testAclTransformation() { assertEquals(processedDenyAllAclBinding.entry().permissionType(), AclPermissionType.DENY, "should not change DENY"); } +@Test +public void testNoBrokerAclAuthorizer() throws Exception { +Admin sourceAdmin = mock(Admin.class); +Admin targetAdmin = mock(Admin.class); +MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin); + +ExecutionException describeAclsFailure = new ExecutionException( +"Failed to describe ACLs", +new SecurityDisabledException("No ACL authorizer configured on this broker") +); +@SuppressWarnings("unchecked") +KafkaFuture> describeAclsFuture = mock(KafkaFuture.class); +when(describeAclsFuture.get()).thenThrow(describeAclsFailure); +DescribeAclsResult describeAclsResult = mock(DescribeAclsResult.class); +when(describeAclsResult.values()).thenReturn(describeAclsFuture); +when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult); + +try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class);) { Review Comment: Nit: unnecessary semicolon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14770) Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match
[ https://issues.apache.org/jira/browse/KAFKA-14770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-14770. Reviewer: Manikumar Resolution: Fixed > Allow dynamic keystore update for brokers if string representation of DN > matches even if canonical DNs don't match > -- > > Key: KAFKA-14770 > URL: https://issues.apache.org/jira/browse/KAFKA-14770 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.5.0 > > > To avoid mistakes during dynamic broker config updates that could potentially > affect clients, we restrict changes that can be performed dynamically without > broker restart. For broker keystore updates, we require the DN to be the same > for the old and new certificates since this could potentially contain host > names used for host name verification by clients. DNs are compared using > standard Java implementation of X500Principal.equals() which compares > canonical names. If tags of fields change from one with a printable string > representation and one without or vice-versa, canonical name check fails even > if the actual name is the same since canonical representation converts to hex > for some tags only. We can relax the verification to allow dynamic updates in > this case by enabling dynamic update if either the canonical name or the > RFC2253 string representation of the DN matches. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rajinisivaram merged pull request #13346: KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match
rajinisivaram merged PR #13346: URL: https://github.com/apache/kafka/pull/13346 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #13346: KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match
rajinisivaram commented on PR #13346: URL: https://github.com/apache/kafka/pull/13346#issuecomment-1457851382 @omkreddy @kpatelatwork Thanks for the reviews, test failures not related, merging to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on pull request #13352: Add support of topic ids for the OffsetFetch API from version 9.
Hangleton commented on PR #13352: URL: https://github.com/apache/kafka/pull/13352#issuecomment-1457837585 Hi, Justine (@jolshan), thanks for your help. There is another PR with the same purpose as this one for the `OffsetCommit` API: [PR-13240](https://github.com/apache/kafka/pull/13240). Please feel free to review. Happy to discuss further. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request, #13357: KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface
dajac opened a new pull request, #13357: URL: https://github.com/apache/kafka/pull/13357 The new group coordinator needs to access cluster metadata (e.g. topics, partitions, etc.) and it needs a mechanism to be notified when the metadata changes (e.g. to trigger a rebalance). In KRaft clusters, the easiest is to subscribe to metadata changes via the MetadataPublisher. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface
dajac merged PR #13329: URL: https://github.com/apache/kafka/pull/13329 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org