[GitHub] [kafka] dajac commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-02 Thread via GitHub


dajac commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122800222


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -163,6 +163,20 @@ object Defaults {
   val GroupInitialRebalanceDelayMs = 3000
   val GroupMaxSize: Int = Int.MaxValue
 
+  /** New group coordinator configs */
+  val NewGroupCoordinatorEnable = false
+  val GroupCoordinatorNumThreads = 1
+
+  /** Consumer group configs */
+  val ConsumerGroupSessionTimeoutMs = 45000
+  val ConsumerGroupMinSessionTimeoutMs = 45000
+  val ConsumerGroupMaxSessionTimeoutMs = 6
+  val ConsumerGroupHeartbeatIntervalMs = 5000
+  val ConsumerGroupMinHeartbeatInternalMs = 5000
+  val ConsumerGroupMaxHeartbeatInternalMs = 15000
+  val ConsumerGroupMaxSize = Int.MaxValue
+  val ConsumerGroupAssignors = ""

Review Comment:
   Correct. We will add them when we have them.






[GitHub] [kafka] dajac commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-02 Thread via GitHub


dajac commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122805573


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 
+  /** New group coordinator configs */
+  // All properties are kept internal until KIP-848 is released.
+  // This property meant to be here only during the development of 
KIP-848. It will
+  // be replaced by a metadata version before releasing it.
+  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc)

Review Comment:
   `defineInternal` does not have an overload without the `validator`.






[GitHub] [kafka] dajac commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-02 Thread via GitHub


dajac commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122808060


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 
+  /** New group coordinator configs */
+  // All properties are kept internal until KIP-848 is released.
+  // This property meant to be here only during the development of 
KIP-848. It will
+  // be replaced by a metadata version before releasing it.
+  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc)
+  .defineInternal(GroupCoordinatorNumThreadsProp, INT, 
Defaults.GroupCoordinatorNumThreads, atLeast(1), MEDIUM, 
GroupCoordinatorNumThreadsDoc)
+
+  /** Consumer groups configs */
+  // All properties are kept internal until KIP-848 is released.
+  .defineInternal(ConsumerGroupSessionTimeoutMsProp, INT, 
Defaults.ConsumerGroupSessionTimeoutMs, atLeast(1), MEDIUM, 
ConsumerGroupSessionTimeoutMsDoc)

Review Comment:
   They will be mainly used to ensure that the dynamic group configurations are 
within the defined boundaries.
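   A minimal sketch of that idea — the names below are illustrative assumptions, not code from the PR: the broker-level min/max values act as the boundaries checked when a dynamic, group-level value is supplied.

```java
// Hypothetical illustration only: how static min/max bounds can validate a
// dynamically supplied group session timeout. Names are not from the PR.
public final class GroupConfigBounds {
    private final int minSessionTimeoutMs;
    private final int maxSessionTimeoutMs;

    public GroupConfigBounds(int minSessionTimeoutMs, int maxSessionTimeoutMs) {
        this.minSessionTimeoutMs = minSessionTimeoutMs;
        this.maxSessionTimeoutMs = maxSessionTimeoutMs;
    }

    // Reject dynamic values outside the statically configured boundaries.
    public int validateSessionTimeoutMs(int requestedMs) {
        if (requestedMs < minSessionTimeoutMs || requestedMs > maxSessionTimeoutMs) {
            throw new IllegalArgumentException("group session timeout " + requestedMs
                + " ms must be between " + minSessionTimeoutMs + " and " + maxSessionTimeoutMs + " ms");
        }
        return requestedMs;
    }
}
```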






[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-02 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122830477


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,43 +128,53 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
-public static class Builder {
-OffsetCommitResponseData data = new OffsetCommitResponseData();
-HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+public short version() {
+return version;
+}
 
-private OffsetCommitResponseTopic getOrCreateTopic(
-String topicName
-) {
-OffsetCommitResponseTopic topic = byTopicName.get(topicName);
-if (topic == null) {
-topic = new OffsetCommitResponseTopic().setName(topicName);
-data.topics().add(topic);
-byTopicName.put(topicName, topic);
-}
-return topic;
+public static Builder newBuilder(TopicResolver topicResolver, short 
version) {
+if (version >= 9) {
+return new Builder<>(topicResolver, new ByTopicId(), version);
+} else {
+return new Builder<>(topicResolver, new ByTopicName(), version);
 }
+}
 
-public Builder addPartition(
-String topicName,
-int partitionIndex,
-Errors error
-) {
-final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+public static final class Builder {
+private final TopicResolver topicResolver;
+private final TopicClassifier topicClassifier;

Review Comment:
   Thinking about it, it seems unnecessary to adopt a different classification 
for v >= 9 since topic names should always be resolved when calling 
`addPartition`. Will remove all this logic and simplify.






[GitHub] [kafka] dajac commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-02 Thread via GitHub


dajac commented on PR #13322:
URL: https://github.com/apache/kafka/pull/13322#issuecomment-1451584302

   @jolshan @jeffkbkim Thanks for your comments. Addressed them.





[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1122921475


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1328,7 +1328,7 @@ Priority priority() {
 @Override
 public void handleResponse(AbstractResponse response) {
 AddPartitionsToTxnResponse addPartitionsToTxnResponse = 
(AddPartitionsToTxnResponse) response;
-Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
+Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);

Review Comment:
   Yeah, the check is probably not necessary. By the way, I find the idea of 
having `V3_AND_BELOW_TXN_ID` for old versions a bit confusing. I was wondering 
if using `addPartitionsToTxnResponse.data().resultsByTopicV3AndBelow()` would 
be a better alternative here. We only iterate over the Map, so the Map is not 
strictly required here. Have you considered something like this?






[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1122925511


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-final HashMap<TopicPartition, Errors> errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new 
AddPartitionsToTxnResponseData();
+if (version < 4) {
+
response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(),
 error));
+} else {
+AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : 
data().transactions()) {
+
results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   Yeah, I like the top-level error code for that reason. However, we must be 
clear on how we want to use it. I suppose that we can only use it for cluster 
authorization failures and unexpected errors that fail the entire request/response.






[GitHub] [kafka] mimaison merged pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-03-02 Thread via GitHub


mimaison merged PR #13095:
URL: https://github.com/apache/kafka/pull/13095





[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1122927383


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1303,11 +1305,13 @@ public void 
testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr
 Map<TopicPartition, Errors> errors = new HashMap<>();
 errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
 errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
+AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID,
 errors);
+AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
 client.respond(body -> {
 AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) 
body;
-assertEquals(new HashSet<>(request.partitions()), new 
HashSet<>(errors.keySet()));
+assertEquals(new 
HashSet<>(AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics())),
 new HashSet<>(errors.keySet()));

Review Comment:
   I tend to agree with this. Are those helpers only used in tests? If so, it 
may be better to put them in `AddPartitionsToTxnRequestTest`. What do you think?






[jira] [Commented] (KAFKA-9234) Consider using @Nullable and @Nonnull annotations

2023-03-02 Thread Ganesh Sahu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695677#comment-17695677
 ] 

Ganesh Sahu commented on KAFKA-9234:


Is this issue still open?

> Consider using @Nullable and @Nonnull annotations
> -
>
> Key: KAFKA-9234
> URL: https://issues.apache.org/jira/browse/KAFKA-9234
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, KafkaConnect, producer , 
> streams, streams-test-utils
>Reporter: Matthias J. Sax
>Assignee: Manasvi Gupta
>Priority: Minor
>  Labels: beginner, newbie
>
> Java7 was dropped some time ago, and we might want to consider using Java 8 
> `@Nullable` and `@Nonnull` annotations for all public facing APIs instead of 
> documenting it in JavaDocs only.
> This ticket should be broken down into a series of smaller PRs to keep the 
> scope of each PR contained, allowing for more effective reviews.





[jira] [Comment Edited] (KAFKA-9234) Consider using @Nullable and @Nonnull annotations

2023-03-02 Thread Ganesh Sahu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695677#comment-17695677
 ] 

Ganesh Sahu edited comment on KAFKA-9234 at 3/2/23 11:52 AM:
-

Is this issue still open for contribution? I see a closed PR.


was (Author: JIRAUSER299047):
Is this issue still open ?? 

> Consider using @Nullable and @Nonnull annotations
> -
>
> Key: KAFKA-9234
> URL: https://issues.apache.org/jira/browse/KAFKA-9234
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, KafkaConnect, producer , 
> streams, streams-test-utils
>Reporter: Matthias J. Sax
>Assignee: Manasvi Gupta
>Priority: Minor
>  Labels: beginner, newbie
>
> Java7 was dropped some time ago, and we might want to consider using Java 8 
> `@Nullable` and `@Nonnull` annotations for all public facing APIs instead of 
> documenting it in JavaDocs only.
> This ticket should be broken down into a series of smaller PRs to keep the 
> scope of each PR contained, allowing for more effective reviews.





[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-03-02 Thread via GitHub


divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1451769479

   The test failures seem unrelated (and `./gradlew unitTest` passes for me 
locally), so I am rebasing against trunk in the hope that the Gradle issues go 
away.





[GitHub] [kafka] hudeqi commented on pull request #13324: MINOR: Fix hint in selector poll

2023-03-02 Thread via GitHub


hudeqi commented on PR #13324:
URL: https://github.com/apache/kafka/pull/13324#issuecomment-145177

   Could anyone take a look?





[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1123026606


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue<RemoteIndexCache.Entry> expiredIndexes = new LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map<Uuid, RemoteIndexCache.Entry> entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap<Uuid, RemoteIndexCache.Entry>(maxSize, 0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry<Uuid, RemoteIndexCache.Entry> eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);

Review Comment:
   This exception is not thrown directly from the constructor. It is raised inside 
the anonymous inner class overriding `LinkedHashMap#removeEldestEntry`, and that 
method is not declared to throw `IOException`.
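
   For reference, a minimal, JDK-only sketch of that constraint — `removeEldestEntry` cannot propagate a checked `IOException`, so it has to be wrapped in an unchecked exception (a plain `RuntimeException` below stands in for Kafka's `KafkaException`; the `markForCleanup` helper is hypothetical):

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class EvictionSketch {
    // Hypothetical stand-in for Entry.markForCleanup(), which renames files and may fail.
    static void markForCleanup(String value) throws IOException {
        if (value == null) throw new IOException("rename failed");
    }

    public static void main(String[] args) {
        final int maxSize = 2;
        Map<Integer, String> cache = new LinkedHashMap<Integer, String>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                if (size() > maxSize) {
                    try {
                        markForCleanup(eldest.getValue());
                    } catch (IOException e) {
                        // removeEldestEntry is not declared to throw IOException, so wrap it.
                        throw new RuntimeException(e);
                    }
                    return true;
                }
                return false;
            }
        };
        cache.put(1, "a");
        cache.put(2, "b");
        cache.put(3, "c"); // triggers eviction of the eldest entry
        System.out.println(cache.keySet()); // [2, 3]
    }
}
```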




[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1123035787


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue<RemoteIndexCache.Entry> expiredIndexes = new LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map<Uuid, RemoteIndexCache.Entry> entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap<Uuid, RemoteIndexCache.Entry>(maxSize, 0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry<Uuid, RemoteIndexCache.Entry> eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+pr

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1123037354


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue<RemoteIndexCache.Entry> expiredIndexes = new LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map<Uuid, RemoteIndexCache.Entry> entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap<Uuid, RemoteIndexCache.Entry>(maxSize, 0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry<Uuid, RemoteIndexCache.Entry> eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+pr

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1123045997


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue<RemoteIndexCache.Entry> expiredIndexes = new LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map<Uuid, RemoteIndexCache.Entry> entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap<Uuid, RemoteIndexCache.Entry>(maxSize, 0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry<Uuid, RemoteIndexCache.Entry> eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+pr

[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123064502


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !aut

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

2023-03-02 Thread via GitHub


divijvaidya commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1122955794


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -606,7 +607,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
 newPort
   }
 
-  private[network] val processors = new ArrayBuffer[Processor]()
+  private[network] val processors = new CopyOnWriteArrayList[Processor]()

Review Comment:
   It would be nice if you could add a comment here on why we chose this data 
structure. Folks who look at this code in the future will then have a clear 
explanation of the choices and tradeoffs we made.
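
   For illustration, the requested comment might read roughly like the sketch below (hypothetical wording, JDK types only; `Runnable` stands in for `Processor`):

```java
import java.util.concurrent.CopyOnWriteArrayList;

public class AcceptorSketch {
    // CopyOnWriteArrayList: adds/removes of processors are rare (startup, shutdown, resize),
    // while metric gauges iterate the list frequently from other threads. Copy-on-write
    // gives those readers a consistent, lock-free snapshot at the cost of copying on writes.
    private final CopyOnWriteArrayList<Runnable> processors = new CopyOnWriteArrayList<>();

    void addProcessor(Runnable p) { processors.add(p); }   // infrequent write, copies the array
    int processorCount() { return processors.size(); }     // frequent, lock-free read
}
```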



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -141,8 +142,8 @@ class SocketServer(val config: KafkaConfig,
   }
   newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
   newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
-val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => {
+val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => 
a.processors.asScala)

Review Comment:
   We cannot convert processors to Scala since it transforms it into a mutable 
ArrayBuffer.
   
   We probably don't need Scala transformations here. Could you please try 
something like this:
   `dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)`
   
   (same comment for other places such as line 119)



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -115,22 +115,23 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
-val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => {
+val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => 
a.processors.asScala)
+// copy to an immutable array to avoid concurrency issue when calculating 
average
 val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
   metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-}
-if (dataPlaneProcessors.isEmpty) {
+}.toArray

Review Comment:
   This leads to object creation on every call to this metric (which is going 
to happen frequently). Do we really want this?
   
   If I understand correctly, your motivation is to guard against scenarios 
where the number of processors between the time when we calculate 
`ioWaitRatioMetricNames` and when we calculate `dataPlaneProcessors.size`. To 
mitigate it, we can store the size in an int before this and then, we don't 
have to convert `ioWaitRatioMetricNames` to an array.






[GitHub] [kafka] pierDipi opened a new pull request, #13325: KAFKA-14771: Include threads info in ConcurrentModificationException message

2023-03-02 Thread via GitHub


pierDipi opened a new pull request, #13325:
URL: https://github.com/apache/kafka/pull/13325

   In the KafkaConsumer.acquire method, a ConcurrentModificationException
   is thrown when
   ```
   threadId != currentThread.get() && 
!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)
   ```
   
   however, the exception message doesn't include info on:
   - Thread.currentThread()
   - currentThread.get()
   
   Including info on the aforementioned variables in the exception message
   is useful for debugging the issue.
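
   A rough sketch of what that could look like (a self-contained imitation of the acquire check, with an illustrative message — not the PR's exact wording):

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

public class AcquireSketch {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    // Mirrors the shape of KafkaConsumer.acquire(), but includes both thread ids
    // in the exception message so the conflicting threads are visible when debugging.
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access: current thread id " + threadId
                    + ", thread id currently holding the consumer " + currentThread.get());
        }
    }
}
```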
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] chia7712 commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

2023-03-02 Thread via GitHub


chia7712 commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1123098018


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -141,8 +142,8 @@ class SocketServer(val config: KafkaConfig,
   }
   newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
   newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
-val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => {
+val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => 
a.processors.asScala)

Review Comment:
   > We cannot convert processors to Scala since it transforms it into a 
mutable ArrayBuffer.
   
   Pardon me, why can't we use a mutable ArrayBuffer here?






[GitHub] [kafka] pierDipi commented on pull request #13325: KAFKA-14771: Include threads info in ConcurrentModificationException message

2023-03-02 Thread via GitHub


pierDipi commented on PR #13325:
URL: https://github.com/apache/kafka/pull/13325#issuecomment-1451869239

   cc for review @hachikuji @showuon





[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1123094538


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -936,13 +937,13 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
 RaftRequest.Inbound requestMetadata,
 long currentTimeMs
 ) {
-FetchRequestData request = (FetchRequestData) requestMetadata.data;
+FetchRequest request = new FetchRequest((FetchRequestData) 
requestMetadata.data, requestMetadata.apiVersion);

Review Comment:
   I am not sure I understand why we need this change. Could you elaborate?



##
raft/src/main/java/org/apache/kafka/raft/RaftRequest.java:
##
@@ -45,11 +45,14 @@ public long createdTimeMs() {
 return createdTimeMs;
 }
 
+
 public static class Inbound extends RaftRequest {
 public final CompletableFuture completion = new 
CompletableFuture<>();
+public final short apiVersion;
 
-public Inbound(int correlationId, ApiMessage data, long createdTimeMs) 
{
+public Inbound(int correlationId, ApiMessage data, long createdTimeMs, 
short apiVertion) {

Review Comment:
   Same question here.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -48,6 +48,7 @@
 import org.apache.kafka.common.requests.DescribeQuorumResponse;
 import org.apache.kafka.common.requests.EndQuorumEpochRequest;
 import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchRequest;

Review Comment:
   Not related to this line. It would be better to always construct the 
FetchRequest with the new schema 
[here](https://github.com/apache/kafka/blob/a304d91d49b2ca56b51d3a9d6589ef69a6065bbb/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1791)
 and to downgrade in the builder later on.



##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +315,18 @@ public String toString() {
 }
 }
 
+public static void updateFetchRequestDataReplicaState(FetchRequestData 
fetchRequestData, int replicaId, long replicaEpoch, short version) {
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+fetchRequestData.setReplicaState(new ReplicaState());
+} else {
+fetchRequestData.setReplicaId(new FetchRequestData().replicaId());

Review Comment:
   nit: Allocating `FetchRequestData` to get `-1` is a bit wasteful here. Could 
we just use `-1`?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java:
##
@@ -25,6 +25,7 @@
 public class FetchParams {
 public final short requestVersion;
 public final int replicaId;
+public final long brokerEpoch;

Review Comment:
   nit: replicaEpoch?



##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -50,14 +50,22 @@
   // Version 13 replaces topic names with topic IDs (KIP-516). May return 
UNKNOWN_TOPIC_ID error code.
   //
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException(KIP-405)
-  "validVersions": "0-14",
+  //
+  // Version 15 adds the ReplicaState which includes new field ReplicaEpoch 
and the ReplicaId (KIP-903)
+  "validVersions": "0-15",
   "flexibleVersions": "12+",
   "fields": [
 { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
   "taggedVersions": "12+", "tag": 0, "ignorable": true,
   "about": "The clusterId if known. This is used to validate metadata 
fetches prior to broker registration." },
-{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+{ "name": "ReplicaId", "type": "int32", "versions": "0-14", "entityType": 
"brokerId",
   "about": "The broker ID of the follower, of -1 if this request is from a 
consumer." },
+{ "name": "ReplicaState", "type": "ReplicaState", "taggedVersions":"15+", 
"tag": 1, "fields": [
+  { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": 
"-1", "entityType": "brokerId",
+"about": "The replica ID of the follower, of -1 if this request is 
from a consumer." },

Review Comment:
   nit: `of` -> `or`?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java:
##
@@ -42,6 +44,7 @@ public FetchParams(short requestVersion,
 Objects.requireNonNull(clientMetadata);
 this.requestVersion = requestVersion;
 this.replicaId = replicaId;
+this.brokerEpoch = brokerEpoch;

Review Comment:
   We need to update `hashCode`, `equals` and `toString` as well.
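
   For illustration, a reduced sketch of what folding the new field into those methods could look like (only a few of FetchParams' fields are shown, and the shape is assumed from the quoted diff rather than copied from the PR):

```java
import java.util.Objects;

public final class FetchParams {
    public final short requestVersion;
    public final int replicaId;
    public final long replicaEpoch; // newly added field must participate in equals/hashCode/toString

    public FetchParams(short requestVersion, int replicaId, long replicaEpoch) {
        this.requestVersion = requestVersion;
        this.replicaId = replicaId;
        this.replicaEpoch = replicaEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FetchParams)) return false;
        FetchParams that = (FetchParams) o;
        return requestVersion == that.requestVersion
            && replicaId == that.replicaId
            && replicaEpoch == that.replicaEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(requestVersion, replicaId, replicaEpoch);
    }

    @Override
    public String toString() {
        return "FetchParams(requestVersion=" + requestVersion
            + ", replicaId=" + replicaId
            + ", replicaEpoch=" + replicaEpoch + ")";
    }
}
```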



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -110,11 +110,13 @@ object PartitionTest {
 replicaId: Int,
 maxWaitMs: Long = 0L,
 minBytes: Int = 1,
-maxBytes: Int = Int.MaxVa

[GitHub] [kafka] chia7712 commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

2023-03-02 Thread via GitHub


chia7712 commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1123116645


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -115,22 +115,23 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
-val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => {
+val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => 
a.processors.asScala)
+// copy to an immutable array to avoid concurrency issue when calculating 
average
 val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
   metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-}
-if (dataPlaneProcessors.isEmpty) {
+}.toArray

Review Comment:
   > To mitigate it, we can store the size in an int before this and then, we 
don't have to convert ioWaitRatioMetricNames to an array.
   
   the `ioWaitRatioMetricNames` might reflect later modifications of `processors`, so 
the actual number of processors may change while we calculate the average. 
For example, we cache size = 5, but `processors` could grow to 6 by the time 
we sum over all of them.
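
   A JDK-only sketch of the point being discussed — if the divisor and the summed values come from different reads of a concurrently modified list they can disagree, whereas a single snapshot keeps them consistent (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class AvgIdleSketch {
    // Average of per-processor idle ratios, computed from one snapshot so the
    // divisor matches the number of values actually summed.
    static double averageIdle(CopyOnWriteArrayList<Double> perProcessorIdle) {
        List<Double> snapshot = new ArrayList<>(perProcessorIdle); // consistent view
        if (snapshot.isEmpty()) return 1.0;
        double sum = 0.0;
        for (double v : snapshot) {
            sum += Math.min(v, 1.0); // cap each ratio at 1.0, as the gauge does
        }
        return sum / snapshot.size();
    }

    public static void main(String[] args) {
        CopyOnWriteArrayList<Double> idle = new CopyOnWriteArrayList<>(new Double[]{0.9, 0.8, 1.2});
        System.out.println(averageIdle(idle)); // 0.9 after capping at 1.0
    }
}
```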






[GitHub] [kafka] dajac commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-03-02 Thread via GitHub


dajac commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1123117911


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionRespons
 // thus it is not safe to reassign the sequence.
 failBatch(batch, response, batch.attempts() < this.retries);
 }
-if (error.exception() instanceof InvalidMetadataException) {
+if (error.exception() instanceof InvalidMetadataException || 
error.exception() instanceof TimeoutException) {

Review Comment:
   @kirktrue Do we need that new test if we remove the change in the Sender? We 
could perhaps remove it and keep the PR focused on the initial change. This 
would allow us to merge that first part.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

2023-03-02 Thread via GitHub


divijvaidya commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1123127295


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -115,22 +115,15 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
-val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
-val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-  metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-}
-if (dataPlaneProcessors.isEmpty) {
-  1.0
-} else {
-  ioWaitRatioMetricNames.map { metricName =>
-Option(metrics.metric(metricName)).fold(0.0)(m => 
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-  }.sum / dataPlaneProcessors.size
-}
-  })
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() =>
+dataPlaneAcceptors.values.stream().flatMap(a => a.processors.stream())

Review Comment:
   nit
   
   `()` can be removed






[jira] [Assigned] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-02 Thread Pierangelo Di Pilato (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierangelo Di Pilato reassigned KAFKA-14771:


Assignee: Pierangelo Di Pilato

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Assignee: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
>
> In the KafkaConsumer.acquire method a ConcurrentModificationException 
> exception is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
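
A minimal, self-contained sketch of the message improvement KAFKA-14771 asks for, assuming
the same NO_CURRENT_THREAD/AtomicLong scheme that KafkaConsumer.acquire uses; the class
below is illustrative, not the actual consumer code.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

final class SingleThreadedAccessGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            // The proposed improvement: report both thread ids in the message.
            throw new ConcurrentModificationException(
                "Not safe for multi-threaded access: thread " + threadId
                    + " attempted access while thread " + currentThread.get() + " holds it");
        }
    }

    void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }
}
{code}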


[jira] [Updated] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-02 Thread Pierangelo Di Pilato (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierangelo Di Pilato updated KAFKA-14771:
-
Fix Version/s: 3.3

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Assignee: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
> Fix For: 3.3
>
>
> In the KafkaConsumer.acquire method a ConcurrentModificationException 
> exception is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-02 Thread Pierangelo Di Pilato (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierangelo Di Pilato updated KAFKA-14771:
-
Fix Version/s: (was: 3.3)

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Assignee: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
>
> In the KafkaConsumer.acquire method a ConcurrentModificationException 
> exception is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.

2023-03-02 Thread via GitHub


C0urante merged PR #13287:
URL: https://github.com/apache/kafka/pull/13287


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14774) the removed listeners should not be reconfigurable

2023-03-02 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-14774:
--

 Summary: the removed listeners should not be reconfigurable
 Key: KAFKA-14774
 URL: https://issues.apache.org/jira/browse/KAFKA-14774
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Users can alter the broker configuration to remove specific listeners. However, the 
removed listeners are NOT removed from the `reconfigurables` list. This can result in 
idle processors if users subsequently increase the number of network threads.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13702) Connect RestClient overrides response status code on request failure

2023-03-02 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13702:
--
Fix Version/s: 3.3.3

> Connect RestClient overrides response status code on request failure
> 
>
> Key: KAFKA-13702
> URL: https://issues.apache.org/jira/browse/KAFKA-13702
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>
> In case the submitted request status is >=400, the connect RestClient 
> [throws|https://github.com/apache/kafka/blob/8047ba3800436d6162d0f8eb707e28857ab9eb68/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L133]
>  a ConnectRestException with the proper response code, but it gets 
> intercepted and [rethrown with 500 status 
> code|https://github.com/apache/kafka/blob/8047ba3800436d6162d0f8eb707e28857ab9eb68/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java#L147],
>  effectively overriding the actual failure status. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
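
A compressed sketch of the failure mode described in KAFKA-13702, with a simplified
stand-in for ConnectRestException: a blanket catch block that rebuilds the exception with
status 500 discards the original status, while rethrowing ConnectRestException unchanged
preserves it. All names below are illustrative, not the Connect runtime code.

{code:java}
final class ConnectRestException extends RuntimeException {
    final int statusCode;
    ConnectRestException(int statusCode, String message, Throwable cause) {
        super(message, cause);
        this.statusCode = statusCode;
    }
}

final class RestClientSketch {
    static void forwardRequest() {
        try {
            // ... issue the request; a >= 400 response is surfaced with its real status:
            throw new ConnectRestException(409, "Rebalance in progress", null);
        } catch (ConnectRestException e) {
            throw e;                                    // fix: keep the original status code
        } catch (Exception e) {
            throw new ConnectRestException(500, "IO error during REST request", e);
        }
    }
}
{code}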


[GitHub] [kafka] chia7712 opened a new pull request, #13326: KAFKA-14774 the removed listeners should not be reconfigurable

2023-03-02 Thread via GitHub


chia7712 opened a new pull request, #13326:
URL: https://github.com/apache/kafka/pull/13326

   Users can alter the broker configuration to remove specific listeners. However, 
the removed listeners are NOT removed from the `reconfigurables` list. This can 
result in idle processors if users subsequently increase the number of network 
threads.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1123292197


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##
@@ -111,20 +115,62 @@ public enum TestPlugin {
 /**
  * A plugin which shares a jar file with {@link 
TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE}
  */
-MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo");
+MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo"),
+/**
+ * A plugin which is incorrectly packaged, and is missing a superclass 
definition.
+ */
+FAIL_TO_INITIALIZE_MISSING_SUPERCLASS("fail-to-initialize", 
"test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
+/**
+ * A plugin which is packaged with other incorrectly packaged plugins, 
but itself has no issues loading.
+ */
+FAIL_TO_INITIALIZE_CO_LOCATED("fail-to-initialize", 
"test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER),
+/**
+ * A connector which is incorrectly packaged, and throws during static 
initialization.
+ */
+
FAIL_TO_INITIALIZE_STATIC_INITIALIZER_THROWS_CONNECTOR("fail-to-initialize", 
"test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER),
+/**
+ * A plugin which is incorrectly packaged, which throws an exception 
from the {@link Versioned#version()} method.
+ */
+
FAIL_TO_INITIALIZE_VERSION_METHOD_THROWS_CONNECTOR("fail-to-initialize", 
"test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER),

Review Comment:
   I think `BAD_PACKAGING` is probably the best here.
   
   I want to make sure that whoever reads these tests in the future isn't 
actively misled by anything in them. It's less important that the 
descriptions/variable names perfectly capture intent, and more important that 
they don't imply something incorrect.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -439,6 +457,22 @@ public static  String versionFor(Class 
pluginKlass) throws Refle
 versionFor(pluginKlass.getDeclaredConstructor().newInstance()) : 
UNDEFINED_VERSION;
 }
 
+private static String reflectiveErrorDescription(Throwable t) {
+if (t instanceof NoSuchMethodException) {
+return ": Plugin class must have a default constructor, and cannot 
be a non-static inner class";
+} else if (t instanceof SecurityException) {
+return ": Security settings must allow reflective instantiation of 
plugin classes";
+} else if (t instanceof IllegalAccessException) {
+return ": Plugin class default constructor must be public";
+} else if (t instanceof ExceptionInInitializerError) {
+return ": Plugin class should not throw exception during static 
initialization";
+} else if (t instanceof InvocationTargetException) {
+return ": Constructor must complete without throwing an exception";
+} else {
+return "";

Review Comment:
   This is fine, thanks 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1123292638


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -419,7 +423,14 @@ private  Collection> 
getServiceLoaderPluginDesc(Class klass,
 Collection> result = new ArrayList<>();
 try {
 ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
-for (T pluginImpl : serviceLoader) {
+for (Iterator iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+T pluginImpl;
+try {
+pluginImpl = iterator.next();
+} catch (ServiceConfigurationError t) {
+log.error("Unable to instantiate plugin{}", 
reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   Works for me 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
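
For context on the diff discussed above, a self-contained sketch of the underlying
technique: iterating a ServiceLoader manually so that a single provider failing to load
(raised as a ServiceConfigurationError from next()) is logged and skipped instead of
aborting discovery of the remaining plugins. Names below are illustrative, not the
Connect code.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

final class TolerantServiceLoading {
    static <T> List<T> loadTolerantly(Class<T> klass, ClassLoader loader) {
        List<T> result = new ArrayList<>();
        Iterator<T> it = ServiceLoader.load(klass, loader).iterator();
        while (it.hasNext()) {
            try {
                result.add(it.next());   // may throw for a single badly packaged provider
            } catch (ServiceConfigurationError e) {
                System.err.println("Skipping plugin that failed to load: " + e.getMessage());
            }
        }
        return result;
    }
}
{code}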



[GitHub] [kafka] mimaison commented on a diff in pull request #13215: KAFKA-14578: Move ConsumerPerformance to tools

2023-03-02 Thread via GitHub


mimaison commented on code in PR #13215:
URL: https://github.com/apache/kafka/pull/13215#discussion_r1123250142


##
tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java:
##
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static joptsimple.util.RegexMatcher.regex;
+
+public class ConsumerPerformance {
+private static final Random RND = new Random();
+
+public static void main(String[] args) {
+try {
+ConsumerPerfOptions options = new ConsumerPerfOptions(args);
+AtomicLong totalMessagesRead = new AtomicLong(0), totalBytesRead = 
new AtomicLong(0),
+joinTimeMs = new AtomicLong(0), joinTimeMsInSingleRound = new 
AtomicLong(0);
+
+if (!options.hideHeader())
+printHeader(options.showDetailedStats());
+
+KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(options.props());
+long bytesRead = 0L, messagesRead = 0L, lastBytesRead = 0L, 
lastMessagesRead = 0L;
+long currentTimeMs = System.currentTimeMillis();
+long joinStartMs = currentTimeMs;
+long startMs = currentTimeMs;
+consume(consumer, options, totalMessagesRead, totalBytesRead, 
joinTimeMs,
+bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+joinStartMs, joinTimeMsInSingleRound);
+long endMs = System.currentTimeMillis();
+
+Map<MetricName, ? extends Metric> metrics = null;
+if (options.printMetrics())
+metrics = consumer.metrics();
+consumer.close();
+
+// print final stats
+double elapsedSec = (endMs - startMs) / 1_000.0;
+long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
+if (!options.showDetailedStats()) {
+double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 
1024);
+System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, 
%.4f%n",
+options.dateFormat().format(startMs),
+options.dateFormat().format(endMs),
+totalMbRead,
+totalMbRead / elapsedSec,
+totalMessagesRead.get(),
+totalMessagesRead.get() / elapsedSec,
+joinTimeMs.get(),
+fetchTimeInMs,
+totalMbRead / (fetchTimeInMs / 1000.0),
+totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+);
+}
+
+if (metrics != null)
+ToolsUtils.printMetrics(metrics);
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+Exit.exit(1);
+}
+}
+
+protected static void printHeader(boolean showDetailedStats) {
+String newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, 
fetch.MB.sec, fetch.nMsg.sec";
+if (!showDetailedStats)
+System.out.printf("start.time, end.t

[jira] [Assigned] (KAFKA-14487) Move LogManager to storage module

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14487:
-

Assignee: (was: Sagar Rao)

> Move LogManager to storage module
> -
>
> Key: KAFKA-14487
> URL: https://issues.apache.org/jira/browse/KAFKA-14487
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14585) Move StorageTool to tools

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14585:
-

Assignee: (was: Sagar Rao)

> Move StorageTool to tools
> -
>
> Key: KAFKA-14585
> URL: https://issues.apache.org/jira/browse/KAFKA-14585
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123318824


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a boostrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}

Review Comment:
   Ah, I misunderstood--I thought you'd found a counterexample that couldn't be 
covered by the minimal set of `isolate`/`isolateV` variants, instead of one 
that couldn't be covered by the expanded set of variants originally present in 
the PR. Was really scratching my head over that one!
   
   Thanks for the changes, this class LGTM now 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-13555) Consider number of input topic partitions for task assignment

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13555:
-

Assignee: (was: Sagar Rao)

> Consider number of input topic partitions for task assignment
> -
>
> Key: KAFKA-13555
> URL: https://issues.apache.org/jira/browse/KAFKA-13555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> StreamsAssignor tries to distribute tasks evenly across all instances/threads 
> of a Kafka Streams application. It knows about instances/threads (to give more 
> capacity to instances with more threads), and it distinguishes between 
> stateless and stateful tasks. We also try not to move state around but to use 
> a sticky assignment if possible. However, the assignment does not take the 
> number of input topic partitions into account.
> For example, an upstream task could compute two joins, and thus have 3 input 
> partitions, while a downstream task computes a follow-up aggregation with a 
> single input partition (from the repartition topic). It could happen that 
> one thread gets the 3-input-partition tasks assigned, while the other thread 
> gets the single-input-partition tasks assigned, resulting in an uneven 
> partition assignment across both threads.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13349:
-

Assignee: (was: Sagar Rao)

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie++
>
> Today, Streams' state store range iterators do not support `remove`. We 
> could consider adding such support for all the built-in state stores:
> * RocksDB: its native iterator does not support removal, but we can always do a 
> delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: straightforward implementation.
> The benefit is that, for range-and-delete truncation operations, we would not 
> have to be cautious about concurrent modification exceptions. 
> This could also help GC with in-memory stores.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
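
A short sketch of the range-and-delete pattern this ticket wants to simplify, written
against the existing KeyValueStore/KeyValueIterator API (store name and types are
illustrative). For RocksDB-backed stores the iterator reads from a snapshot, so deleting
concurrently is safe today; the ticket is about additionally supporting Iterator.remove
directly.

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

final class RangePurge {
    static void purgeRange(KeyValueStore<String, Long> store, String from, String to) {
        try (KeyValueIterator<String, Long> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                store.delete(entry.key);   // delete while the iterator stays open on its snapshot
            }
        }
    }
}
{code}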


[jira] [Assigned] (KAFKA-13336) Migrate StreamsBuilder/Topology class to interfaces and move Topology parameter from KafkaStreams constructor to #start

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13336:
-

Assignee: (was: Sagar Rao)

> Migrate StreamsBuilder/Topology class to interfaces and move Topology 
> parameter from KafkaStreams constructor to #start
> ---
>
> Key: KAFKA-13336
> URL: https://issues.apache.org/jira/browse/KAFKA-13336
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> In order to refactor and improve the streams physical plan generation, we'll 
> need to clean up the DSL builder API a bit and in particular enforce the 
> configs be passed in from the beginning, rather than only when calling 
> #build. We can also use this opportunity to improve the disconnect between 
> the builder, the resulting Topology, and the Kafka Streams application that 
> ultimately runs this topology – at the moment these are all completely 
> uncoupled on the surface, so it's easy to think that a StreamsBuilder can be 
> reused to build multiple Topology objects, or that a Topology object could be 
> passed in to different KafkaStreams. However there is internal state that is 
> shared and modified during StreamsBuilder#build and in the KafkaStreams 
> constructor, and they are actually very coupled under the hood meaning there 
> must be a 1:1:1 ratio of builder to topology to KafkaStreams. So we need a 
> new API that
>  # Forces users to pass in the configs (Properties) when constructing the 
> builder
>  # Clarifies the relationship of the builder object to the topology, and to 
> the app itself
> I think a good API for this might look something like this:
>  # Move the StreamsBuilder class to an internal one (technically we would 
> need to keep it where it is for now until a full deprecation cycle)
>  # Introduce a TopologyBuilder interface to replace the functionality of the 
> current StreamsBuilder class, and have StreamsBuilder implement this. All the 
> current methods on StreamsBuilder will be moved to the TopologyBuilder 
> interfaces
>  # Move the Topology parameter out of the KafkaStreams constructor, and into 
> the KafkaStreams#start method, so you can construct a KafkaStreams object 
> before the Topology
>  # Add a factory method on KafkaStreams for users to get instances of the 
> TopologyBuilder, and have this accept a Properties. For example
> {code:java}
> class KafkaStreams {
> public void newTopologyBuilder(final Properties props) {
> // convert to StreamsConfig to validate configs & check for 
> application.id
> final StreamsConfig config = new StreamsConfig(props); 
> return new StreamsBuilder(config);
> }
> }{code}
> This should satisfy both of the requirements, and imo provides a cleaner API 
> anyways. Getting the builder through a factory method on the KafkaStreams 
> object should make it clear that this builder is tied to that particular 
> KafkaStreams instance. And we can enforce that it isn't reused for a 
> different application by parsing the Properties passed in to 
> KafkaStreams#newTopologyBuilder, specifically the application.id. It also 
> leads to a more natural process of writing a Kafka Streams app: start with 
> the KafkaStreams object and global configs, then use this to build up the 
> processing topology. Looking forward, this will better complement the new 
> named topologies feature, with an API that treats topologies as entities 
> attached to a particular KafkaStreams but that may come and go



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12957) Refactor Streams Logical Plan Generation

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-12957:
-

Assignee: (was: Sagar Rao)

> Refactor Streams Logical Plan Generation
> 
>
> Key: KAFKA-12957
> URL: https://issues.apache.org/jira/browse/KAFKA-12957
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> There is a general issue with Streams logical plan -> physical plan generation: 
> the physical processor nodes are generated at the parsing phase rather 
> than at the logical plan compilation phase. The former stage is agnostic to any 
> user configuration while only the latter stage has access to it, and hence 
> we should not generate physical processor nodes during the parsing phase (i.e. 
> any code related to StreamsBuilder), but defer them to the logical plan phase 
> (i.e. XXNode.writeToTopology). This has several issues, e.g. many 
> physical processor instantiations require access to the configs, and hence we 
> have to defer them to the `init` procedure of the node, which is scattered in 
> many places from logical nodes to physical processors.
> This would be a big refactoring of Streams' logical plan generation, but I 
> think it would be worth getting this into a cleaner state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13499) Avoid restoring outdated records

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13499:
-

Assignee: (was: Sagar Rao)

> Avoid restoring outdated records
> 
>
> Key: KAFKA-13499
> URL: https://issues.apache.org/jira/browse/KAFKA-13499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Kafka Streams has the config `windowstore.changelog.additional.retention.ms` 
> to allow for an increased retention time.
> While an increased retention time can be useful, it can also lead to 
> unnecessary restore cost, especially for stream-stream joins. Assume a 
> stream-stream join with a 1h window size and a grace period of 1h. For this 
> case, we only need 2h of data to restore. If we lag, the 
> `windowstore.changelog.additional.retention.ms` helps to prevent the broker 
> from truncating data too early. However, if we don't lag and we need to 
> restore, we restore everything from the changelog.
> Instead of doing a seek-to-beginning, we could use the timestamp index to 
> seek past the offsets older than the 2h "window" of data that we need to 
> restore, to avoid unnecessary work.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
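
A hedged sketch of the optimization this ticket describes, using the consumer's timestamp
index lookup (offsetsForTimes) to start restoration at the first offset that is still
inside the needed window instead of seeking to the beginning. Method and variable names
are illustrative, not the actual Streams restore path.

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

final class WindowedRestoreSeek {
    static void seekToRestoreStart(Consumer<byte[], byte[]> restoreConsumer,
                                   TopicPartition changelog,
                                   Duration neededHistory) {   // e.g. window size + grace period
        long cutoff = System.currentTimeMillis() - neededHistory.toMillis();
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            restoreConsumer.offsetsForTimes(Collections.singletonMap(changelog, cutoff));
        OffsetAndTimestamp first = offsets.get(changelog);
        if (first != null) {
            restoreConsumer.seek(changelog, first.offset());    // skip records older than the cutoff
        } else {
            // nothing newer than the cutoff was found; fall back to the current behaviour
            restoreConsumer.seekToBeginning(Collections.singleton(changelog));
        }
    }
}
{code}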


[jira] [Assigned] (KAFKA-13501) Avoid state restore via rebalance if standbys are enabled

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13501:
-

Assignee: (was: Sagar Rao)

> Avoid state restore via rebalance if standbys are enabled
> -
>
> Key: KAFKA-13501
> URL: https://issues.apache.org/jira/browse/KAFKA-13501
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> There are certain scenarios in which Kafka Streams wipes out local state and 
> rebuilds it from scratch. This is a thread-local cleanup, i.e., no rebalance is 
> triggered, and we end up with an offline task until state restoration 
> finishes.
> If standby tasks are enabled, it might actually make sense to trigger a 
> rebalance instead, to get the task re-assigned to the instance hosting the 
> standby and thus get the task active again quickly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-03-02 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-12550:
-

Assignee: (was: Sagar Rao)

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING even though it is generally 
> spending much more time restoring than rebalancing in most cases.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String 
connName) {
 log.error("Task that requested reconfiguration does not exist: 
{}", connName);
 return;
 }
-updateConnectorTasks(connName);
+try {
+updateConnectorTasks(connName);
+} catch (Exception e) {
+log.error("Unable to generate task configs for {}", connName, e);
+}

Review Comment:
   This is a change in behavior too, right? We no longer throw in 
`ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and 
distributed modes), but it does have consequences for the REST API, where 
restarting a connector no longer fails if we're unable to generate task configs 
for it (which is currently the case for both distributed and standalone modes).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2023-03-02 Thread via GitHub


urbandan commented on PR #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-1452111290

   @C0urante I'm also hesitant to change the API, but I think this part of the 
API is broken. If we tweak ConnectSchema::equals to use the Schema interface 
methods, then setting the default value on the SchemaBuilder will work, but 
then Struct::equals will stop working on default values.
   Assume that ConnectSchema::equals uses the Schema methods, and we write this 
code:
   
   `
   SchemaBuilder builder = SchemaBuilder.struct()
   .field("f1", Schema.BOOLEAN_SCHEMA);
   Struct defaultValue = new Struct(builder);
   defaultValue.put("f1", true);
   
   Schema finalSchema = builder.defaultValue(defaultValue).build();
   Struct anotherValue = new Struct(finalSchema);
   anotherValue.put("f1", true);
   
   assertEquals(defaultValue, anotherValue);
   assertEquals(anotherValue, defaultValue);
   `
   The first assert fails (defaultValue has a SchemaBuilder instance as the 
schema, which doesn't have an equals override, so 
defaultValue.schema.equals(anotherValue.schema) is false).
   The second assert never exits, it's infinite recursion (anotherValue has a 
ConnectSchema instance as the schema, which checks the default value for 
equality, which then checks the schema equality, and so on).
   
   To me, the tweak doesn't seem like a proper fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1123045997


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap(maxSize, 
0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+pr

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1123369985


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap(maxSize, 
0.75f, true) {

Review Comment:
   Right, it was not a deliberate change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13327: MINOR: tweak the doc of "num.network.threads"

2023-03-02 Thread via GitHub


chia7712 opened a new pull request, #13327:
URL: https://github.com/apache/kafka/pull/13327

   Send readers a reminder: the total number of network threads is 
`(number of listeners) * num.network.threads`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-03-02 Thread Koma Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695817#comment-17695817
 ] 

Koma Zhang commented on KAFKA-14747:


Thank you, guys. As the developer, which branch can I check out as the development 
branch for this task? [~mjsax] 

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record when this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> adding a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
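
For completeness, a small sketch of how the existing task-level dropped-records metric
(the sensor this ticket suggests reusing) can be read from a running application. The
metric name follows the documented Streams metrics; the rest is illustrative.

{code:java}
import org.apache.kafka.streams.KafkaStreams;

final class DroppedRecordsProbe {
    static void printDroppedRecords(KafkaStreams streams) {
        streams.metrics().forEach((name, metric) -> {
            if ("dropped-records-total".equals(name.name())) {
                System.out.println(name.tags() + " -> " + metric.metricValue());
            }
        });
    }
}
{code}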


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String 
connName) {
 log.error("Task that requested reconfiguration does not exist: 
{}", connName);
 return;
 }
-updateConnectorTasks(connName);
+try {
+updateConnectorTasks(connName);
+} catch (Exception e) {
+log.error("Unable to generate task configs for {}", connName, e);
+}

Review Comment:
   This is a change in behavior too, right? We no longer throw in 
`ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and 
distributed modes), but it does have consequences for the REST API, where 
restarting a connector no longer fails if we're unable to generate task configs 
for it (which is currently the case for both distributed and standalone modes).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123467255


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String 
connName) {
 log.error("Task that requested reconfiguration does not exist: 
{}", connName);
 return;
 }
-updateConnectorTasks(connName);
+try {
+updateConnectorTasks(connName);
+} catch (Exception e) {
+log.error("Unable to generate task configs for {}", connName, e);
+}

Review Comment:
   Okay, a lot to unpack here!
   
   The more I think about it, the more I like the existing behavior for 
handling failures in task config generation. We automatically retry in 
distributed mode in order to absorb the risk of writing to the config topic or 
issuing a REST request to the leader, but since neither of those take place in 
standalone mode, it's fine to just throw the exception back to the caller 
(either a connector invoking `ConnectorContext::requestTaskReconfiguration`, or 
a REST API call to restart the connector) since the likeliest cause is a failed 
call to `Connector::taskConfigs` and automatic retries are less likely to be 
useful.
   
   I think we should basically just preserve existing behavior here, with the 
one exception of fixing how we handle failed calls to 
`requestTaskReconfiguration`  that occur during a call to `restartConnector`. 
Right now we don't handle any of those and, IIUC, just cause the REST request 
to time out after 90 seconds. Instead of timing out, we should return a 500 
response in that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123470094


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String 
connName) {
 log.error("Task that requested reconfiguration does not exist: 
{}", connName);
 return;
 }
-updateConnectorTasks(connName);
+try {
+updateConnectorTasks(connName);
+} catch (Exception e) {
+log.error("Unable to generate task configs for {}", connName, e);
+}

Review Comment:
   I don't think it's especially likely for connectors to continually invoke 
`requestTaskReconfiguration` given the automatic retry logic in distributed 
mode, and as of https://github.com/apache/kafka/pull/13276, the impact of 
ongoing retries for that operation is drastically reduced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1123469847


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -936,13 +937,13 @@ private CompletableFuture 
handleFetchRequest(
 RaftRequest.Inbound requestMetadata,
 long currentTimeMs
 ) {
-FetchRequestData request = (FetchRequestData) requestMetadata.data;
+FetchRequest request = new FetchRequest((FetchRequestData) 
requestMetadata.data, requestMetadata.apiVersion);

Review Comment:
   The FetchRequestData can have versions 14 and 15. The main problem is that 
KafkaRaftClient needs the ReplicaId, which is not in a fixed field of 
FetchRequestData across versions. We need the API version to properly locate the 
ReplicaId.



##
raft/src/main/java/org/apache/kafka/raft/RaftRequest.java:
##
@@ -45,11 +45,14 @@ public long createdTimeMs() {
 return createdTimeMs;
 }
 
+
 public static class Inbound extends RaftRequest {
 public final CompletableFuture completion = new 
CompletableFuture<>();
+public final short apiVersion;
 
-public Inbound(int correlationId, ApiMessage data, long createdTimeMs) 
{
+public Inbound(int correlationId, ApiMessage data, long createdTimeMs, 
short apiVertion) {

Review Comment:
   Used to store the API version of the FetchRequest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-02 Thread via GitHub


satishd commented on PR #13275:
URL: https://github.com/apache/kafka/pull/13275#issuecomment-1452242033

   Thanks @junrao for your review. Addressed your comments inline and/or 
updated with the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13328: KAFKA-14745: Cache the ReplicationPolicy instance in MirrorConnectorC…

2023-03-02 Thread via GitHub


mimaison opened a new pull request, #13328:
URL: https://github.com/apache/kafka/pull/13328

   …onfig
   
   Otherwise calls to `checkpointsTopic()`, which happen relatively frequently, 
keep creating new ReplicationPolicy instances. 
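   
   For illustration, a minimal sketch of the caching idea (the names are hypothetical and this is not the PR's exact code; it assumes the policy is configured via `replication.policy.class` on an `AbstractConfig` subclass):
   
   ```java
   // Hypothetical sketch: build the ReplicationPolicy once and hand out the cached instance,
   // instead of instantiating a new one on every checkpointsTopic() call.
   private ReplicationPolicy cachedReplicationPolicy;
   
   synchronized ReplicationPolicy replicationPolicy() {
       if (cachedReplicationPolicy == null) {
           cachedReplicationPolicy = getConfiguredInstance("replication.policy.class", ReplicationPolicy.class);
       }
       return cachedReplicationPolicy;
   }
   ```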
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1123475771


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -302,6 +315,18 @@ public String toString() {
 }
 }
 
+public static void updateFetchRequestDataReplicaState(FetchRequestData 
fetchRequestData, int replicaId, long replicaEpoch, short version) {
+if (version < 15) {
+fetchRequestData.setReplicaId(replicaId);
+fetchRequestData.setReplicaState(new ReplicaState());
+} else {
+fetchRequestData.setReplicaId(new FetchRequestData().replicaId());

Review Comment:
   Actually, the default value is 0, I can make a change to have it -1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


CalvinConfluent commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1123476762


##
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##
@@ -337,8 +362,27 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 .setResponses(topicResponseList));
 }
 
+public String clusterId() {
+return data.clusterId();
+}
+
+public List topics() {
+return data.topics();
+}
+
+public int maxWaitMs() {
+return data.maxWaitMs();
+}

Review Comment:
   It is only used by KafkaRaftClient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent commented on pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


CalvinConfluent commented on PR #13323:
URL: https://github.com/apache/kafka/pull/13323#issuecomment-1452256725

   @dajac I think the major question is how the KafkaRaftClient can correctly consume the FetchRequest.
   The current PR goes to some lengths to get the FetchRequest apiVersion and then parse the FetchRequest.
   If this is too much trouble, I can think of another approach (see the sketch below):
   1. Change the old FetchRequest ReplicaId default value to -1.
   2. Then, when the KafkaRaftClient receives the request, it can tell which ReplicaId to use by comparing the value of the old ReplicaId field with the new one (it also has a default value of -1).
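   
   For illustration, a minimal sketch of what option 2 could look like on the receiving side (a hypothetical helper, not code from this PR; it assumes both the old top-level ReplicaId field and the new ReplicaState.ReplicaId use -1 as the "not set" sentinel):
   
   ```java
   // Hypothetical sketch of option 2: pick the replica id from whichever field is populated.
   static int resolveReplicaId(FetchRequestData request) {
       if (request.replicaId() != -1) {
           // Older-style request: replica id is still set in the top-level field.
           return request.replicaId();
       }
       // Newer-style request: replica id lives in the ReplicaState field.
       return request.replicaState().replicaId();
   }
   ```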


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123499195


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !a

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123501064


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1303,11 +1305,13 @@ public void 
testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr
 Map errors = new HashMap<>();
 errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
 errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
+AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID,
 errors);
+AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
 client.respond(body -> {
 AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) 
body;
-assertEquals(new HashSet<>(request.partitions()), new 
HashSet<>(errors.keySet()));
+assertEquals(new 
HashSet<>(AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics())),
 new HashSet<>(errors.keySet()));

Review Comment:
   I can take a look. It's also used internally to build the request, and I think 
for the v4+ version in KafkaApis itself. I can try to make a helper for these tests 
though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13323: KAFKA-14617; Add ReplicaState to FetchRequest.

2023-03-02 Thread via GitHub


dajac commented on PR #13323:
URL: https://github.com/apache/kafka/pull/13323#issuecomment-1452269739

   @CalvinConfluent Thanks for the explanation. I agree that we have two 
options on the table: 1) pass the api version or even the header object; or 2) 
rely on the default sentinel value to read the correct field. Personally, I 
lean towards 2) here because it is simpler. It would be great to run this by 
@hachikuji as he authored this part.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123503881


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-final HashMap errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new 
AddPartitionsToTxnResponseData();
+if (version < 4) {
+
response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(),
 error));
+} else {
+AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : 
data().transactions()) {
+
results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   To clarify -- I'm not sure that being clear on how we want to use it (i.e., the 
errors it is used for) is the same question as whether we want to include the error 
on the txns as well. 
   
   As for the latter, I think we are more error-prone if we set the error to NONE. 
I guess I'm just not sure what we are trying to accomplish by removing them from 
the lower fields. To be clear, I think I've also seen this pattern on other 
responses with top-level errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-9234) Consider using @Nullable and @Nonnull annotations

2023-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695846#comment-17695846
 ] 

Matthias J. Sax commented on KAFKA-9234:


 [~rndgstn] did look into the PRs - maybe he knows best? – In general, yes, 
contributions are welcome to address this.

> Consider using @Nullable and @Nonnull annotations
> -
>
> Key: KAFKA-9234
> URL: https://issues.apache.org/jira/browse/KAFKA-9234
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, KafkaConnect, producer , 
> streams, streams-test-utils
>Reporter: Matthias J. Sax
>Assignee: Manasvi Gupta
>Priority: Minor
>  Labels: beginner, newbie
>
> Java7 was dropped some time ago, and we might want to consider using Java8 
> `@Nullable` and `@Nonnull` annotations for all public facing APIs instead of 
> documenting it in JavaDocs only.
> This tickets should be broken down in a series of smaller PRs to keep the 
> scope of each PR contained, allowing for more effective reviews.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123508496


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1328,7 +1328,7 @@ Priority priority() {
 @Override
 public void handleResponse(AbstractResponse response) {
 AddPartitionsToTxnResponse addPartitionsToTxnResponse = 
(AddPartitionsToTxnResponse) response;
-Map errors = 
addPartitionsToTxnResponse.errors();
+Map errors = 
addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);

Review Comment:
   I was told by Jason not to have v3-and-below-specific methods because the 
v3 case should generalize to a single version of the v4 case, and that should 
make it easy to use the same methods for both.
   
   However, if we really think this is an issue, I guess we can change the 
approach again. I'm just not sure about the experience of the errors only applying to 
v4+. Any ideas there besides changing the method name to express that it should only 
be used in v4+?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-03-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695848#comment-17695848
 ] 

Matthias J. Sax commented on KAFKA-14747:
-

You would create a branch from `trunk` – for more info, read the "Developer 
Info" on the Kafka web page: [https://kafka.apache.org/project]

Of course, if you have more questions, just let us know.

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> add a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1123513945


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 
+  /** New group coordinator configs */
+  // All properties are kept internal until KIP-848 is released.
+  // This property meant to be here only during the development of 
KIP-848. It will
+  // be replaced by a metadata version before releasing it.
+  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc)

Review Comment:
   ah interesting. Thanks for clarifying.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123514453


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-final HashMap errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new 
AddPartitionsToTxnResponseData();
+if (version < 4) {
+
response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(),
 error));
+} else {
+AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : 
data().transactions()) {
+
results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   I was looking at the AlterPartition API as a reference. There, when we have 
a top level error, we don't set the partitions in the response at all. We 
basically save space. See 
[here](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java#L46).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2023-03-02 Thread via GitHub


C0urante commented on PR #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-1452296446

   @urbandan I've given the "tweak" fix (i.e., altering `ConnectSchema::equals` 
to use interface methods and accept any `Schema` instance during equality 
checking) a shot locally and, although a small additional change is necessary 
for `assertEquals(defaultValue, anotherValue)` to pass, there was no infinite 
loop during `assertEquals(anotherValue, defaultValue)` (which I suspect boiled 
down to the fact that, eventually, we performed an equality check for two 
schemas that were referentially equal and thus were able to short-circuit the 
rest of the `equals` method).
   
   My `ConnectSchema::equals` method looked like this:
   
   ```java
   public class ConnectSchema {
       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof Schema)) return false;
           Schema schema = (Schema) o;
           boolean matches = Objects.equals(this.isOptional(), schema.isOptional()) &&
                   Objects.equals(this.version(), schema.version()) &&
                   Objects.equals(this.name(), schema.name()) &&
                   Objects.equals(this.doc(), schema.doc()) &&
                   Objects.equals(this.type(), schema.type()) &&
                   Objects.equals(this.parameters(), schema.parameters()) &&
                   Objects.deepEquals(this.defaultValue(), schema.defaultValue());
           if (!matches)
               return false;

           switch (type) {
               case ARRAY:
                   return Objects.equals(this.valueSchema(), schema.valueSchema());
               case MAP:
                   return Objects.equals(this.keySchema(), schema.keySchema())
                           && Objects.equals(this.valueSchema(), schema.valueSchema());
               case STRUCT:
                   return Objects.equals(this.fields, schema.fields());
               default:
                   return true;
           }
       }
   }
   ```
   
   And the only additional change that was necessary to make 
`assertEquals(defaultValue, anotherValue)` pass was implement 
`SchemaBuilder::equals`:
   
   ```java
   public class SchemaBuilder {
       @Override
       public boolean equals(Object o) {
           return this == o || build().equals(o);
       }

       @Override
       public int hashCode() {
           return build().hashCode();
       }
   }
   ```
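   
   For reference, a small usage sketch of the behaviour described above (illustrative only, not the actual test from KAFKA-12694): with these two tweaks, equality between a `SchemaBuilder` and the `ConnectSchema` it builds holds in both directions.
   
   ```java
   // Illustrative check only: symmetric equality between a builder and its built schema.
   SchemaBuilder builder = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA);
   Schema built = builder.build();

   // ConnectSchema.equals now accepts any Schema implementation, and SchemaBuilder.equals
   // delegates to the built schema, so both directions pass.
   assertEquals(built, builder);
   assertEquals(builder, built);
   ```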


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123517021


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1303,11 +1305,13 @@ public void 
testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr
 Map errors = new HashMap<>();
 errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
 errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
+AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID,
 errors);
+AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
 client.respond(body -> {
 AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) 
body;
-assertEquals(new HashSet<>(request.partitions()), new 
HashSet<>(errors.keySet()));
+assertEquals(new 
HashSet<>(AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics())),
 new HashSet<>(errors.keySet()));

Review Comment:
   If those are used internally the request as well, we can't really move them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14746) Throwing in Connector.taskConfigs generates a lot of logs

2023-03-02 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695852#comment-17695852
 ] 

Mickael Maison commented on KAFKA-14746:


https://issues.apache.org/jira/browse/KAFKA-14732 describes pretty much the 
same issue. With the exponential backoff we don't spam the logs with errors. 
However it's still kind of unclear what's going on from a user point of view 
without looking at the logs.

If the connector throws in taskConfigs()
* If it's a reconfiguration, the existing tasks keep running (connector and 
tasks show no errors in the REST API)
* If it's the creation of a new connector, the connector is marked as running 
and no tasks are created.

In both cases, the runtime retries reconfiguring it forever. 

> Throwing in Connector.taskConfigs generates a lot of logs
> -
>
> Key: KAFKA-14746
> URL: https://issues.apache.org/jira/browse/KAFKA-14746
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Priority: Major
>
> If a Connector throws in its taskConfigs() method, the runtime ends up 
> retrying using DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS which 
> is a fixed value (250ms). For each retry, the runtime prints the connector 
> configuration and the enriched configuration so this can quickly generate a 
> lot of logs.
> There is some value in throwing in taskConfigs() as it allows to fail fast in 
> case the connector is given bad credentials. For example this is what some of 
> the Debezium connectors do: 
> https://github.com/debezium/debezium/blob/main/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L56-L69
> The way Connectors are expected to work today is to instead always create 
> tasks and let each task fail in case the configuration is wrong. We should 
> document that and make it clear in the javadoc that throwing in taskConfigs 
> is not recommended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123526676


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-final HashMap errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new 
AddPartitionsToTxnResponseData();
+if (version < 4) {
+
response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(),
 error));
+} else {
+AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : 
data().transactions()) {
+
results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   Ok.
   I will remove the errors from the response.
   When using the new API it will be mandatory to check the top-level error before 
any further handling. 
   I may also need to adjust tests / test this functionality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123527995


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1328,7 +1328,7 @@ Priority priority() {
 @Override
 public void handleResponse(AbstractResponse response) {
 AddPartitionsToTxnResponse addPartitionsToTxnResponse = 
(AddPartitionsToTxnResponse) response;
-Map errors = 
addPartitionsToTxnResponse.errors();
+Map errors = 
addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);

Review Comment:
   Understood. Let's keep it as it is then.
   
   I agree that the v3 case should generalize to a single item of the v4 case. It 
is just unfortunate that we don't have the transaction id in the v3 response, so we 
have to use an empty string for it. I suppose that's the way it is.
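   
   For what it's worth, a rough caller-side sketch of that generalization (illustrative only, not new code in the PR; it assumes `errors()` returns the per-transaction map and that `V3_AND_BELOW_TXN_ID` is the empty string used for old versions):
   
   ```java
   // Illustrative: with the batched response shape, a v3-and-below response is exposed as a
   // single transaction entry keyed by the empty-string transactional id.
   Map<String, Map<TopicPartition, Errors>> errorsByTxnId = addPartitionsToTxnResponse.errors();
   Map<TopicPartition, Errors> v3Errors =
           errorsByTxnId.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);
   ```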



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123529173


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-final HashMap errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new 
AddPartitionsToTxnResponseData();
+if (version < 4) {
+
response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(),
 error));
+} else {
+AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : 
data().transactions()) {
+
results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   Don't forget to take the version into account here. We can only do this for 
v4.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

2023-03-02 Thread via GitHub


guozhangwang commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1123543033


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record 
lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated 
to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+private final Metrics metrics;
+private final FetcherMetricsRegistry metricsRegistry;
+private final Sensor bytesFetched;
+private final Sensor recordsFetched;
+private final Sensor fetchLatency;
+private final Sensor recordsFetchLag;
+private final Sensor recordsFetchLead;
+
+private int assignmentId = 0;
+private Set assignedPartitions = Collections.emptySet();

Review Comment:
   Yeah, something like that (though we need to check whether all metric names 
include the partition in the same format), instead of trying to remember all 
the previously assigned partitions. The idea is that, since the assignment 
can now be updated by different threads, we may lose track of the full history of 
`assigned partitions`, which would leave some dangling metrics in the 
registry that are never removed, or remove some metrics unnecessarily. Relying 
only on the new assignment and always looping over all metrics to trim them 
seems safer.
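   
   To make the idea concrete, a rough sketch of the "loop over all metrics" approach (assumptions: partition-level fetch metrics carry `topic` and `partition` tags; this is not the PR's implementation):
   
   ```java
   // Hypothetical sketch: drop partition-level metrics for partitions that are no longer assigned,
   // by scanning the whole registry instead of tracking previously assigned partitions.
   void trimPartitionMetrics(Metrics metrics, Set<TopicPartition> newAssignment) {
       for (MetricName name : new ArrayList<>(metrics.metrics().keySet())) {
           Map<String, String> tags = name.tags();
           if (!tags.containsKey("topic") || !tags.containsKey("partition"))
               continue; // not a partition-level metric
           TopicPartition tp = new TopicPartition(tags.get("topic"), Integer.parseInt(tags.get("partition")));
           if (!newAssignment.contains(tp))
               metrics.removeMetric(name); // would otherwise dangle in the registry
       }
   }
   ```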



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1123546376


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -322,6 +324,29 @@ public void run() {
 }
 }
 
+/**
+ * Validates that the underlying connectors and tasks are running.
+ * Visible for testing purpose.
+ */
+boolean isConfiguredAndRunning() {
+return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+final String connName = connectorClazz.getSimpleName();
+return herderPairs.stream().allMatch(sourceAndTarget -> {
+final ConnectorStateInfo connectorStatus = 
this.connectorStatus(sourceAndTarget, connName);
+return connectorStatus != null
+// verify that connector state is set to running
+&& 
connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+// verify that all tasks are set to running
+&& connectorStatus.tasks().stream().allMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   This doesn't actually verify that tasks are running (i.e., there could be an 
empty set of tasks for a connector and this condition would still be true).
   
   We probably want to either 1) require that at least one task exists or 2) 
require the user to specify the expected number of tasks for each connector, in 
addition to this check (which ensures that every task that does exist is in the 
`RUNNING` state).
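   
   A hedged sketch of option 2, in case it helps (test-side logic with hypothetical names, not necessarily what the PR should do): the caller passes the expected task count per connector and every existing task must be `RUNNING`.
   
   ```java
   // Illustrative helper: the connector must be RUNNING, have exactly the expected number of
   // tasks, and every task must be RUNNING as well.
   static boolean connectorFullyRunning(ConnectorStateInfo status, int expectedTasks) {
       if (status == null)
           return false;
       String running = AbstractStatus.State.RUNNING.toString();
       return status.connector().state().equals(running)
               && status.tasks().size() == expectedTasks
               && status.tasks().stream().allMatch(t -> t.state().equals(running));
   }
   ```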



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -322,6 +324,29 @@ public void run() {
 }
 }
 
+/**
+ * Validates that the underlying connectors and tasks are running.
+ * Visible for testing purpose.
+ */
+boolean isConfiguredAndRunning() {
+return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+final String connName = connectorClazz.getSimpleName();
+return herderPairs.stream().allMatch(sourceAndTarget -> {
+final ConnectorStateInfo connectorStatus = 
this.connectorStatus(sourceAndTarget, connName);
+return connectorStatus != null
+// verify that connector state is set to running
+&& 
connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+// verify that all tasks are set to running
+&& connectorStatus.tasks().stream().allMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
+});
+});
+}

Review Comment:
   We should expose the smallest testing-only APIs necessary in non-testing 
code, and implement other logic in testing-only places.
   
   So in this case, the API we'd expose here would be the `connectorStatus` 
method, and the logic in this method would go in, e.g., the 
`DedicatedMirrorIntegrationTest` suite.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##
@@ -62,8 +60,10 @@ public void setup() {
 @AfterEach
 public void teardown() throws Throwable {
 AtomicReference shutdownFailure = new AtomicReference<>();
-mirrorMakers.forEach((name, mirrorMaker) ->
-Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + 
name + "'", shutdownFailure)
+mirrorMakers.forEach((name, mirrorMaker) -> {
+Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + 
name + "'", shutdownFailure);
+mirrorMaker.awaitStop();
+}

Review Comment:
   We can optimize a little here, which will hopefully reduce the impact this 
change has on test time when running multiple MM2 nodes:
   ```suggestion
   mirrorMakers.forEach((name, mirrorMaker) -> {
   Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" 
+ name + "'", shutdownFailure);
   }
   );
   mirrorMakers.forEach((name, mirrorMaker) -> {
   mirrorMaker.awaitStop();
   }
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##
@@ -322,6 +324,29 @@ public void run() {
 }
 }
 
+/**
+ * Validates that the underlying connectors and tasks are running.
+ * Visible for testing purpose.
+ */
+boolean isConfiguredAndRunning() {
+return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+final String connName = connectorClazz.getSimpleName();
+return herderPairs.stream().allMatch(sourceAndTarget -> {
+final ConnectorStateInfo connectorStatus = 
this.connectorStatus(sourceAndTarget, connName);
+return connectorStatus != null
+// verify that connector 

[jira] [Commented] (KAFKA-14773) Make MirrorMaker startup synchronous

2023-03-02 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695862#comment-17695862
 ] 

Chris Egerton commented on KAFKA-14773:
---

The shell script encompasses the entire lifetime of MM2; I don't think this 
really captures the problems that were discussed on the PR.

The general problem is that it's difficult to know when connectors/tasks 
created by MM2 have failed. It doesn't really matter whether this happens on 
startup or later in their lifetime. The semantics of the existing {{main}} 
method are fine unless we want to take the (somewhat nuclear) route of failing 
the entire process if there's an issue with creating one or more {{Connector}} 
or {{Task}} instances, which isn't really what I had in mind.

> Make MirrorMaker startup synchronous
> 
>
> Key: KAFKA-14773
> URL: https://issues.apache.org/jira/browse/KAFKA-14773
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: mirror-maker
>
> Currently, MirrorMaker is started using `
> ./bin/connect-mirror-maker.sh mm2.properties` shell command. However, even if 
> the shell command has exited and a log with `Kafka MirrorMaker started` has 
> been printed, it is likely that the underlying connectors and tasks have not 
> been configured.
> This tasks aims to make the MirrorMaker startup synchronous by either waiting 
> for connections & tasks to move to running state before exiting the 
> `MirrorMaker#start()` function or by blocking completion of `main()`.
> A conversation about this was done in 
> [https://github.com/apache/kafka/pull/13284] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14773) Make MirrorMaker startup synchronous

2023-03-02 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695862#comment-17695862
 ] 

Chris Egerton edited comment on KAFKA-14773 at 3/2/23 6:49 PM:
---

The startup shell script encompasses the entire lifetime of MM2; I don't think 
this really captures the problems that were discussed on the PR.

The general problem is that it's difficult to know when connectors/tasks 
created by MM2 have failed. It doesn't really matter whether this happens on 
startup or later in their lifetime. The semantics of the existing {{main}} 
method are fine unless we want to take the (somewhat nuclear) route of failing 
the entire process if there's an issue with creating one or more {{Connector}} 
or {{Task}} instances, which isn't really what I had in mind.


was (Author: chrisegerton):
The shell script encompasses the entire lifetime of MM2; I don't think this 
really captures the problems that were discussed on the PR.

The general problem is that it's difficult to know when connectors/tasks 
created by MM2 have failed. It doesn't really matter whether this happens on 
startup or later in their lifetime. The semantics of the existing {{main}} 
method are fine unless we want to take the (somewhat nuclear) route of failing 
the entire process if there's an issue with creating one or more {{Connector}} 
or {{Task}} instances, which isn't really what I had in mind.

> Make MirrorMaker startup synchronous
> 
>
> Key: KAFKA-14773
> URL: https://issues.apache.org/jira/browse/KAFKA-14773
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: mirror-maker
>
> Currently, MirrorMaker is started using `
> ./bin/connect-mirror-maker.sh mm2.properties` shell command. However, even if 
> the shell command has exited and a log with `Kafka MirrorMaker started` has 
> been printed, it is likely that the underlying connectors and tasks have not 
> been configured.
> This tasks aims to make the MirrorMaker startup synchronous by either waiting 
> for connections & tasks to move to running state before exiting the 
> `MirrorMaker#start()` function or by blocking completion of `main()`.
> A conversation about this was done in 
> [https://github.com/apache/kafka/pull/13284] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-03-02 Thread via GitHub


junrao commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1123584048


##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##
@@ -44,6 +45,28 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
 this.abortedTransactions = abortedTransactions;
 }
 
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+FetchDataInfo that = (FetchDataInfo) o;
+
+return firstEntryIncomplete != that.firstEntryIncomplete &&
+Objects.equals(fetchOffsetMetadata, that.fetchOffsetMetadata) 
&&
+Objects.equals(records, that.records) &&

Review Comment:
   Hmm, FileRecords doesn't implement `equals`. So, maybe LogReadInfo should 
never be used for comparison?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-02 Thread via GitHub


dajac merged PR #13322:
URL: https://github.com/apache/kafka/pull/13322


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13329: KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface

2023-03-02 Thread via GitHub


dajac opened a new pull request, #13329:
URL: https://github.com/apache/kafka/pull/13329

   This patch adds ConsumerGroupHeartbeat to the GroupCoordinator interface and 
implements the API in KafkaApis.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13318: [DO NOT MERGE] KAFKA-14533: Re-enable state-updater in SmokeTestDriverIntegrationTest

2023-03-02 Thread via GitHub


guozhangwang commented on code in PR #13318:
URL: https://github.com/apache/kafka/pull/13318#discussion_r1123632054


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##
@@ -143,6 +145,33 @@ public static KafkaFuture> fetchEndOf
 ).all();
 }
 
+public static ListOffsetsResult fetchEndOffsetsResult(final 
Collection partitions,

Review Comment:
   This is 2) in the description.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -685,14 +685,17 @@ private boolean populateClientStatesMap(final Map clientState
 Map allTaskEndOffsetSums;
 try {
 // Make the listOffsets request first so it can  fetch the offsets 
for non-source changelogs
-// asynchronously while we use the blocking Consumer#committed 
call to fetch source-changelog offsets
-final KafkaFuture> 
endOffsetsFuture =
-
fetchEndOffsetsFuture(changelogTopics.preExistingNonSourceTopicBasedPartitions(),
 adminClient);
+// asynchronously while we use the blocking Consumer#committed 
call to fetch source-changelog offsets;

Review Comment:
   This is 2) in the description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-03-02 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695888#comment-17695888
 ] 

Guozhang Wang commented on KAFKA-14533:
---

After some investigation I found the following:

1. In both successful and failed runs, the first `list-offset` request could 
fail if the topic creation is not yet completed; that failure is actually 
not fatal since we would just fall back to the naive assignor behavior. So that 
one does not play a role here.
2. What's happening (as I found both on jenkins and locally after about 25 runs, 
phew) is that stateUpdater.shutdown(timeout), where the timeout defaults to 
`MAX.value`, never completes because the thread itself never exits. I have a PR 
(https://github.com/apache/kafka/pull/13318) that does not rely on interruptions 
to shut down the thread (see the sketch below); I think it could fix the 
never-shutting-down issue.
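
For context, the general shape of a shutdown that does not depend on interruption looks roughly like this (a generic sketch, not the actual change in the PR):

```java
// Generic sketch: ask the loop to stop via a flag and wait on a latch, instead of interrupting.
class UpdaterThreadSketch implements Runnable {
    private volatile boolean running = true;
    private final java.util.concurrent.CountDownLatch shutdownLatch =
            new java.util.concurrent.CountDownLatch(1);

    @Override
    public void run() {
        try {
            while (running) {
                // do one bounded unit of work, so the loop re-checks `running` regularly
            }
        } finally {
            shutdownLatch.countDown();
        }
    }

    void shutdown(long timeoutMs) throws InterruptedException {
        running = false; // request exit instead of Thread.interrupt()
        shutdownLatch.await(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS);
    }
}
```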

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flakey failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-03-02 Thread via GitHub


C0urante merged PR #13182:
URL: https://github.com/apache/kafka/pull/13182


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-02 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1452495333

   Thanks David for the review. I have a few more tests to add, but this should be 
ready for another pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-03-02 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1452503148

   This refactor first landed in #432 and then the current dependency graph was 
set by #512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] twmb commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-02 Thread via GitHub


twmb commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1452523214

   :wave: hi, seeing this after it's merged! Is the intent to create a new PR 
for making the sticky assignor rack aware? I see in the original PR #12914 that 
both range and sticky were improved. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13318: KAFKA-14533: Do not interrupt state-updater thread during shutdown

2023-03-02 Thread via GitHub


guozhangwang commented on PR #13318:
URL: https://github.com/apache/kafka/pull/13318#issuecomment-1452549269

   `SmokeTestDriverIntegrationTest` did not fail in 
`https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13318/2`, 
triggering again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13330: Prototyping Rebalance Protocol

2023-03-02 Thread via GitHub


philipnee opened a new pull request, #13330:
URL: https://github.com/apache/kafka/pull/13330

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13330: Prototyping Rebalance Protocol

2023-03-02 Thread via GitHub


philipnee commented on code in PR #13330:
URL: https://github.com/apache/kafka/pull/13330#discussion_r1123759741


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ProtocolRequestManager.java:
##
@@ -0,0 +1,96 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class ProtocolRequestManager implements RequestManager {
+GroupState groupState;
+HeartbeatRequestManager heartbeatRequestManager;
+RebalanceProtocol protocol;
+BlockingQueue eventQueue;
+Optional callbackInvokedMs;
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+switch (groupState.state) {
+case UNJOINED:
+return null;
+case PREPARE:
+return protocol.onPrepare(currentTimeMs);
+case ASSIGNING:
+invokeRebalance(currentTimeMs);
+return protocol.onAssign(currentTimeMs);
+case COMPLETE:
+return protocol.onComplete(currentTimeMs);
+case STABLE:
+return protocol.onStable(currentTimeMs);
+}
+return null;
+}
+
+public void ackCallbackInvocation(long currentTimeMs) {
+callbackInvokedMs = Optional.empty();
+groupState.state = GroupState.State.COMPLETE;
+}
+
+private void invokeRebalance(long currentTimeMs) {
+if (callbackInvokedMs.isPresent()) {
+// do nothing as we've invoked callback
+return;
+}
+callbackInvokedMs = Optional.of(currentTimeMs);
+eventQueue.add(new RebalanceCallbackEvent());
+}
+
+class RebalanceCallbackEvent extends ApplicationEvent {
+protected RebalanceCallbackEvent() {
+super(null); // it should actually accept a type.REBALANCE_CALLBACK_TRIGGER but i'm too lazy to create one
+}
+}
+
+class HeartbeatRequestManager implements RequestManager {

Review Comment:
   obviously not belonging here, but doing this as a prototype



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13330: Prototyping Rebalance Protocol

2023-03-02 Thread via GitHub


philipnee commented on code in PR #13330:
URL: https://github.com/apache/kafka/pull/13330#discussion_r1123760146


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ProtocolRequestManager.java:
##
@@ -0,0 +1,96 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class ProtocolRequestManager implements RequestManager {
+GroupState groupState;
+HeartbeatRequestManager heartbeatRequestManager;
+RebalanceProtocol protocol;
+BlockingQueue eventQueue;
+Optional callbackInvokedMs;
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+switch (groupState.state) {
+case UNJOINED:
+return null;
+case PREPARE:
+return protocol.onPrepare(currentTimeMs);
+case ASSIGNING:
+invokeRebalance(currentTimeMs);
+return protocol.onAssign(currentTimeMs);
+case COMPLETE:
+return protocol.onComplete(currentTimeMs);
+case STABLE:
+return protocol.onStable(currentTimeMs);
+}
+return null;
+}
+
+public void ackCallbackInvocation(long currentTimeMs) {
+callbackInvokedMs = Optional.empty();
+groupState.state = GroupState.State.COMPLETE;
+}
+
+private void invokeRebalance(long currentTimeMs) {
+if (callbackInvokedMs.isPresent()) {
+// do nothing as we've invoked callback
+return;
+}
+callbackInvokedMs = Optional.of(currentTimeMs);
+eventQueue.add(new RebalanceCallbackEvent());
+}
+
+class RebalanceCallbackEvent extends ApplicationEvent {
+protected RebalanceCallbackEvent() {
+super(null); // it should actually accept a type.REBALANCE_CALLBACK_TRIGGER but i'm too lazy to create one
+}
+}
+
+class HeartbeatRequestManager implements RequestManager {
+RequestState heartbeatRequestState;
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+// just a demo, don't do anything
+return null;
+}
+
+public void sendHeartbeat(long currentTimeMs) {
+heartbeatRequestState.canSendRequest(currentTimeMs);
+}
+}
+
+class ServierSideProtocol implements RebalanceProtocol {

Review Comment:
   same, just a prototype: as you can see, nothing was implemented



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123765294


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-final HashMap<TopicPartition, Errors> errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new AddPartitionsToTxnResponseData();
+if (version < 4) {
+response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(), error));
+} else {
+AddPartitionsToTxnResultCollection results = new AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : data().transactions()) {
+results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   Yup. This will only be done in v4 responses. So for now, we don't do any 
checks, since the server-side usage is coming in the next PR.
   
   As a side note, it seems that AlterPartition includes the top-level error in 
errorCounts, so I did that here as well.
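
(Purely as an illustration of the point above -- a standalone sketch of folding a
response-level error into the per-partition error counts, the way the comment describes
AlterPartition doing; the names below are made up and are not the actual Kafka request
classes.)

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ErrorCountsSketch {
    // Count the top-level error once, then add one entry per partition-level error.
    static Map<String, Integer> errorCounts(String topLevelError, List<String> partitionErrors) {
        Map<String, Integer> counts = new HashMap<>();
        counts.merge(topLevelError, 1, Integer::sum);
        for (String error : partitionErrors) {
            counts.merge(error, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {CLUSTER_AUTHORIZATION_FAILED=3}: the top-level error plus two partition errors.
        System.out.println(errorCounts("CLUSTER_AUTHORIZATION_FAILED",
                List.of("CLUSTER_AUTHORIZATION_FAILED", "CLUSTER_AUTHORIZATION_FAILED")));
    }
}
```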



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123767340


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1328,7 +1328,7 @@ Priority priority() {
 @Override
 public void handleResponse(AbstractResponse response) {
 AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
-Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
+Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);

Review Comment:
   Yeah. It really is unfortunate. 😞 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #13331: MINOR: fix fault handling in ControllerServer and KafkaServer

2023-03-02 Thread via GitHub


cmccabe opened a new pull request, #13331:
URL: https://github.com/apache/kafka/pull/13331

   This PR has two fixes for fault handling. One makes fault handling more 
strict on the controller; the other makes it a bit less strict on the 
(ZK-based) broker.
   
   In ControllerServer, invoke a fatal fault handler in the catch block of the 
ControllerServer.startup() function. This will protect us from cases where 
unwinding startup would otherwise encounter a deadlock, or be too slow.  This 
is the same reason why we made controller fault handlers call halt() instead of 
exit() in KAFKA-14693.  In a sense, this JIRA is a continuation of that fix 
since it turns some cases that would previously have been handled by calling 
shutdown() into cases where we invoke a fault handler which will call halt() 
directly.
   
   In KafkaServer, when we are in migration-from-zk mode and we create a 
RaftManager, it should have a fault handler which simply calls shutdown() on 
the broker rather than invoking ProcessTerminatingFaultHandler. This fixes the 
bug where we could invoke the process-terminating fault handler from junit 
tests.
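
(A simplified Java sketch of the two behaviours described above, not the actual
ControllerServer/KafkaServer code: the controller path routes startup failures to a
handler that halts the process, while the ZK-migration broker path hands RaftManager a
handler that merely requests a shutdown. All names are illustrative.)

```java
public class FaultHandlingSketch {
    // Illustrative stand-in for a fault handler interface.
    interface FaultHandler {
        void handleFault(String message, Throwable cause);
    }

    // Controller-style handler: halt immediately so a wedged shutdown path cannot hang startup.
    static final FaultHandler FATAL = (message, cause) -> {
        System.err.println("Fatal fault: " + message);
        Runtime.getRuntime().halt(1);
    };

    // Broker-style handler for ZK migration mode: just request a normal shutdown,
    // which is also safe to trigger from junit tests.
    static FaultHandler shutdownHandler(Runnable shutdown) {
        return (message, cause) -> {
            System.err.println("Fault, shutting down: " + message);
            shutdown.run();
        };
    }

    // Startup wrapped so any failure goes through the fatal handler
    // instead of a potentially deadlocking unwind.
    static void startup(Runnable doStartup) {
        try {
            doStartup.run();
        } catch (Throwable t) {
            FATAL.handleFault("exception while starting up", t);
        }
    }
}
```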


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-02 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1123774975


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1303,11 +1305,13 @@ public void testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr
 Map<TopicPartition, Errors> errors = new HashMap<>();
 errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
 errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
+AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors);
+AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
 client.respond(body -> {
 AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
-assertEquals(new HashSet<>(request.partitions()), new HashSet<>(errors.keySet()));
+assertEquals(new HashSet<>(AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics())), new HashSet<>(errors.keySet()));

Review Comment:
   The one in the request is used in a place that iterates through the topic 
objects, so it is actually ok. I made a helper in TransactionManagerTest since 
that expression is used 3 times there. I left the final remaining usage in the 
last test file since I think it's ok to have it once, and it doesn't make 
sense to use a helper there.
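
(To give a feel for the helper being discussed -- a standalone sketch of flattening
per-topic partition lists into the set of topic-partitions that the assertion compares
against; the types below are plain Java stand-ins, not Kafka's generated request data
classes.)

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionFlattenSketch {
    // Flatten topic -> partition indexes into "topic-partition" strings,
    // roughly what a getPartitions(...)-style helper does over the topic objects.
    static Set<String> getPartitions(Map<String, List<Integer>> topics) {
        Set<String> out = new HashSet<>();
        topics.forEach((topic, partitions) ->
                partitions.forEach(p -> out.add(topic + "-" + p)));
        return out;
    }

    public static void main(String[] args) {
        // e.g. [foo-0, foo-1] (order not guaranteed) -- the set compared against errors.keySet()
        System.out.println(getPartitions(Map.of("foo", List.of(0, 1))));
    }
}
```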



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-02 Thread via GitHub


mjsax merged PR #13264:
URL: https://github.com/apache/kafka/pull/13264


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-02 Thread via GitHub


vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1116012004


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java:
##
@@ -25,7 +25,7 @@
 
 /**
  * UnwindowedChangelogTopicConfig captures the properties required for 
configuring
- * the un-windowed store changelog topics.
+ * the un-windowed, un-versioned store changelog topics.

Review Comment:
   This class name `UnwindowedChangelogTopicConfig` is a bit outdated now and 
should really be `UnwindowedUnversionedChangelogTopicConfig`. A bit clunky but 
I can't think of anything better. Will rename for now and can update if others 
have suggestions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14057) Support dynamic reconfiguration in KRaft remote controllers

2023-03-02 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-14057:


Assignee: Colin McCabe

> Support dynamic reconfiguration in KRaft remote controllers
> ---
>
> Key: KAFKA-14057
> URL: https://issues.apache.org/jira/browse/KAFKA-14057
> Project: Kafka
>  Issue Type: Task
>Reporter: Ron Dagostino
>Assignee: Colin McCabe
>Priority: Major
>
> We currently do not support dynamic reconfiguration of KRaft remote 
> controllers.  We only wire up brokers and react to metadata log changes 
> there.  We do no such wiring or reacting in a node where 
> process.roles=controller.  Related to 
> https://issues.apache.org/jira/browse/KAFKA-14051.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

