[jira] [Updated] (KAFKA-14304) ZooKeeper to KRaft Migration

2023-02-28 Thread Akhilesh Chaganti (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akhilesh Chaganti updated KAFKA-14304:
--
Fix Version/s: 3.5.0

> ZooKeeper to KRaft Migration
> 
>
> Key: KAFKA-14304
> URL: https://issues.apache.org/jira/browse/KAFKA-14304
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> Top-level JIRA for 
> [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-02-28 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1121236675


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is an LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. It helps avoid
+ * re-fetching index files such as the offset and time indexes from remote storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map<Uuid, Entry> entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap<Uuid, Entry>(maxSize, 0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry<Uuid, Entry> eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+expiredIndexes.add(entry);
+return true;
+} else {
+return false;
+}
+}
+};
+
+init();
+
+// Start cleaner thread that will clean the expired entries.
+cleanerThread = createCleanerThread();
+cleanerThread.start();
+}
+
+pr
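
The constructor above builds the cache on an access-ordered LinkedHashMap whose removeEldestEntry hook marks evicted entries and hands them to a queue for a background cleaner. A minimal, self-contained sketch of that eviction pattern (generic names, not the Kafka classes):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Access-ordered LRU map that hands evicted values to a queue so a background
// cleaner can release their resources, mirroring the pattern in the diff above.
public class EvictingCache<K, V> {
    private final LinkedBlockingQueue<V> expired = new LinkedBlockingQueue<>();
    private final Map<K, V> entries;

    public EvictingCache(int maxSize) {
        // accessOrder = true makes iteration order follow least-recent access.
        this.entries = new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxSize) {
                    // Do not clean up inline; enqueue for the cleaner thread.
                    expired.add(eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    public void put(K key, V value) {
        entries.put(key, value);
    }

    public V get(K key) {
        return entries.get(key);
    }

    /** A background thread blocks here and releases the evicted entry's resources. */
    public V takeExpired() throws InterruptedException {
        return expired.take();
    }
}
```

Evicting through a queue keeps the potentially slow index-file cleanup off the caller's thread, which is why the diff pairs the map with a ShutdownableThread cleaner.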

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-02-28 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1121168324


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java:
##
@@ -31,6 +32,21 @@ public final class LogFileUtils {
  */
 public static final String DELETED_FILE_SUFFIX = ".deleted";
 
+/**
+ * Suffix of an offset index file
+ */
+public static final String INDEX_FILE_SUFFIX = ".index";

Review Comment:
   Right, that is the plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils

2023-02-28 Thread via GitHub


satishd merged PR #13309:
URL: https://github.com/apache/kafka/pull/13309


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils

2023-02-28 Thread via GitHub


satishd commented on PR #13309:
URL: https://github.com/apache/kafka/pull/13309#issuecomment-1449372613

   Test failures do not seem to be related to this change, merging these 
changes to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14768) proposal to reduce the first message's send time cost and max block time for safety

2023-02-28 Thread fujian (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fujian updated KAFKA-14768:
---
Description: 
Hi, Team:

 

Nice to meet you!

 

In our business, we found two types of issues that need improvement:

 

*(1) The first message takes a long time to send*

Sometimes we found that users' interactions took a lot of time. Eventually we figured out the root cause: after we deploy or restart the servers, the first message sent by the Kafka client on each application server takes a long time.

So we tried to find a solution to improve it.

 

After analyzing the source code for the first send, we found that the time cost comes from fetching metadata before the send. Subsequent sends do not take as much time because the metadata is cached. That logic is right and necessary. Nevertheless, we still want to improve the experience for the first message's send / the user's first interaction.

 

*(2) Can't reduce the send's block time to the desired value.*

Sometimes our application's thread blocks for up to max.block.ms to send a message. When we try to reduce max.block.ms to shorten the blocking time, it sometimes cannot satisfy the time needed to fetch metadata. The root cause is that the configured max.block.ms is shared between the "get metadata" operation and the "send message" operation. Refer to the following table:
|*where to block*|*when it is blocked*|*how long it will be blocked?*|
|org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first 
request which need to load the metadata from kafka|https://github.com/apache/kafka/pull/13320])

 

_note:  org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata_

ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, 
long maxWaitMs)

 


After the change, we can call it before the service is marked as ready. Once the service is ready, it won't block to get metadata because of the cache, and then we can safely reduce max.block.ms to a lower value to shorten the thread's blocking time.

 

After adopting solution 3, we solved the above issues. For example, we reduced the first message's send time by about 4 seconds. See the log below:

_warmup test_topic at phase phase 2: get metadata from mq start_

_warmup test_topic at phase phase 2: get metadata from mq end consume *4669ms*_

And after the change, we reduced max.block.ms from 10s to 2s without worrying about failing to get the metadata.
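
As an illustration of the warmup idea only (the proposal itself adds a new method to the producer): a similar effect can be approximated with today's public API by forcing a metadata fetch during startup, e.g. via KafkaProducer#partitionsFor, before the service is marked as ready. A sketch, with the topic name and config values as assumptions:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerWarmup {
    public static KafkaProducer<String, String> warmedUpProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Keep a generous block time for the warmup call itself.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // partitionsFor() blocks (up to max.block.ms) until metadata for the topic is
        // fetched; the producer caches it, so later send() calls don't pay that cost.
        // Call this before marking the service as ready, and keep the producer around.
        producer.partitionsFor("test_topic");
        return producer;
    }
}
{code}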

 

{*}So, what are your thoughts on these two issues and the solution I proposed?{*} I hope to get your feedback and thoughts on them.

  was:
Hi, Team:

 

Nice to meet you!

 

In our business, we found two types of issue which need to improve:

 

*(1) Take much time to send the first message*

Sometimes, we found the users' functional interaction take a lot of time. At 
last, we figure out the root cause is that after we complete deploy or restart 
the servers. The first message's delivery on each application server by kafka 
client will take much time.

So, we try to find one solution to improve it.

 

After analyzing the source code about the first time's sending logic. The time 
cost is caused by the getting metadata before the sending. The latter's sending 
won't take the much time due to the cached metadata. The logic is right and 
necessary. Thus, we still want to improve the experience for the first 
message's send/user first interaction.

 

*(2) can't reduce the send message's block time to wanted value.*

Sometimes our application's thread will block for max.block.ms to send message. 
When we try to reduce the max.block.ms to reduce the blocking time. It can't 
meet the getting metadata's time requirement sometimes. The root cause is the 
configured max.block.ms is shared with "get metadata" operation and "send 
message" operation. We can refer to follow tables:
|*where to block*
 |*when it is blocked*
 |*how long it will be blocked?*
 |
|org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first 
request which need to load the metadata from kafka| proposal to reduce the first message's send time cost and max block time for 
> safety 
> 
>
> Key: KAFKA-14768
> URL: https://issues.apache.org/jira/browse/KAFKA-14768
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.3.1, 3.3.2
>Reporter: fujian
>Assignee: hzh0425
>Priority: Major
>  Labels: performance
>
> Hi, Team:
>  
> Nice to meet you!
>  
> In our business, we found two types of issue which need to improve:
>  
> *(1) Take much time to send the first message*
> Sometimes, we found the users' functional interaction take a lot of time. At 
> last, we figure out the root cause is that after we complete depl

[GitHub] [kafka] jiafu1115 opened a new pull request, #13320: KAFKA-14768: provide new method to warmup first record's sending and reduce the max.block.ms safely

2023-02-28 Thread via GitHub


jiafu1115 opened a new pull request, #13320:
URL: https://github.com/apache/kafka/pull/13320

   Hi Team:
   
   Refer to https://issues.apache.org/jira/browse/KAFKA-14768
   
   There are two issues with the current producer:
   (1) It takes a long time to send the first record;
   (2) The send's block time (max.block.ms) can't be reduced to the desired lower value, because of concerns about the one-time metadata fetch in doSend's process.
   
   Test Result:
   Before the change, it took 4s+ to send the first message, and we had to make sure max.block.ms > 4s. After the change:
   (1) The first record's send time is reduced. In my example:
   warmup test_topic at phase phase 2: get metadata from mq start
   warmup test_topic at phase phase 2: get metadata from mq end consume 4669ms
   (2) max.block.ms can now be reduced to < 4s.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-02-28 Thread via GitHub


showuon commented on PR #13319:
URL: https://github.com/apache/kafka/pull/13319#issuecomment-1449265831

   Thanks for investigating it! This test has been failing quite frequently recently, but I don't completely understand this:
   
   > The root cause seems to be with changing the return behavior of when in 
mockito. Fixed those without using reset
   
   What does the "return behavior change" in Mockito mean? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-02-28 Thread via GitHub


showuon commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1121043658


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java:
##
@@ -31,6 +32,21 @@ public final class LogFileUtils {
  */
 public static final String DELETED_FILE_SUFFIX = ".deleted";
 
+/**
+ * Suffix of an offset index file
+ */
+public static final String INDEX_FILE_SUFFIX = ".index";

Review Comment:
   Looks like it'll conflict with another PR, but I think you'll resolve that later.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is an LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. It helps avoid
+ * re-fetching index files such as the offset and time indexes from remote storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+private final File cacheDir;
+private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
+private final Object lock = new Object();
+private final RemoteStorageManager remoteStorageManager;
+private final Map<Uuid, Entry> entries;
+private final ShutdownableThread cleanerThread;
+
+private volatile boolean closed = false;
+
+public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String 
logDir) throws IOException {
+this(1024, remoteStorageManager, logDir);
+}
+
+/**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize  maximum number of segment index entries to 
be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used 
in fetching indexes.
+ * @param logDir   log directory
+ */
+public RemoteIndexCache(int maxSize, RemoteStorageManager 
remoteStorageManager, String logDir) throws IOException {
+this.remoteStorageManager = remoteStorageManager;
+cacheDir = new File(logDir, DIR_NAME);
+
+entries = new LinkedHashMap<Uuid, Entry>(maxSize, 0.75f, true) {
+@Override
+protected boolean removeEldestEntry(Map.Entry<Uuid, Entry> eldest) {
+if (this.size() > maxSize) {
+RemoteIndexCache.Entry entry = eldest.getValue();
+// Mark the entries for cleanup, background thread will 
clean them later.
+try {
+entry.markForCleanup();
+} catch (IOExcep

[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-02-28 Thread via GitHub


guozhangwang commented on PR #13319:
URL: https://github.com/apache/kafka/pull/13319#issuecomment-1449254753

   ping @cadonna @lucasbru for a quick review.
   
   Before this PR, local runs failed about 50% of the time; after this PR, it succeeded across about 10 local runs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-02-28 Thread via GitHub


guozhangwang opened a new pull request, #13319:
URL: https://github.com/apache/kafka/pull/13319

   Found a few flaky tests while reviewing another PR. The root cause seems to 
be with changing the return behavior of `when` in mockito. Fixed those without 
using `reset` and also bumped a couple debug log lines to info since they could 
be very helpful in debugging.
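
For context on the Mockito behavior mentioned above: instead of re-stubbing a mock (or calling `reset`) partway through a test, consecutive return values can be declared up front. A generic sketch, not the actual DefaultStateUpdaterTest code:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

public class ConsecutiveStubbingSketch {

    interface TaskStore {
        List<String> activeTasks();
    }

    public static void main(String[] args) {
        TaskStore store = mock(TaskStore.class);

        // Declare both behaviors up front instead of re-stubbing (or reset()-ing)
        // the mock later in the test: the first call returns one task, later calls none.
        when(store.activeTasks())
                .thenReturn(List.of("task_0_0"))
                .thenReturn(List.of());

        System.out.println(store.activeTasks()); // [task_0_0]
        System.out.println(store.activeTasks()); // []
    }
}
```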
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14371) quorum-state file contains empty/unused clusterId field

2023-02-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14371.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> quorum-state file contains empty/unused clusterId field
> ---
>
> Key: KAFKA-14371
> URL: https://issues.apache.org/jira/browse/KAFKA-14371
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Assignee: Gantigmaa Selenge
>Priority: Minor
> Fix For: 3.5.0
>
>
> The KRaft controller's quorum-state file 
> `$LOG_DIR/__cluster_metadata-0/quorum-state` contains an empty clusterId 
> value.  This value is never non-empty, and it is never used after it is 
> written and then subsequently read.  This is a cosmetic issue; it would be 
> best if this value did not exist there.  The cluster ID already exists in the 
> `$LOG_DIR/meta.properties` file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-02-28 Thread via GitHub


showuon merged PR #13102:
URL: https://github.com/apache/kafka/pull/13102


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-02-28 Thread via GitHub


showuon commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1449214463

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testSingleNodeCluster()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV0, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithUniformSubscription()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testSingleNodeCluster()
   Build / JDK 17 and Scala 2.13 / 
kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
   Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testMigrate, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseActiveTaskAndTransitToUpdateStandby()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseActiveTaskAndTransitToUpdateStandby()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseActiveTaskAndTransitToUpdateStandby()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14768) proposal to reduce the first message's send time cost and max block time for safety

2023-02-28 Thread fujian (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fujian updated KAFKA-14768:
---
Description: 
Hi, Team:

 

Nice to meet you!

 

In our business, we found two types of issues that need improvement:

 

*(1) The first message takes a long time to send*

Sometimes we found that users' interactions took a lot of time. Eventually we figured out the root cause: after we deploy or restart the servers, the first message sent by the Kafka client on each application server takes a long time.

So we tried to find a solution to improve it.

 

After analyzing the source code for the first send, we found that the time cost comes from fetching metadata before the send. Subsequent sends do not take as much time because the metadata is cached. That logic is right and necessary. Nevertheless, we still want to improve the experience for the first message's send / the user's first interaction.

 

*(2) Can't reduce the send's block time to the desired value.*

Sometimes our application's thread blocks for up to max.block.ms to send a message. When we try to reduce max.block.ms to shorten the blocking time, it sometimes cannot satisfy the time needed to fetch metadata. The root cause is that the configured max.block.ms is shared between the "get metadata" operation and the "send message" operation. Refer to the following table:
|*where to block*|*when it is blocked*|*how long it will be blocked?*|
|org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first 
request which need to load the metadata from kafka| proposal to reduce the first message's send time cost and max block time for 
> safety 
> 
>
> Key: KAFKA-14768
> URL: https://issues.apache.org/jira/browse/KAFKA-14768
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.3.1, 3.3.2
>Reporter: fujian
>Assignee: hzh0425
>Priority: Major
>  Labels: performance
>
> Hi, Team:
>  
> Nice to meet you!
>  
> In our business, we found two types of issue which need to improve:
>  
> *(1) Take much time to send the first message*
> Sometimes, we found the users' functional interaction take a lot of time. At 
> last, we figure out the root cause is that after we complete deploy or 
> restart the servers. The first message's delivery on each application server 
> by kafka client will take much time.
> So, we try to find one solution to improve it.
>  
> After analyzing the source code about the first time's sending logic. The 
> time cost is caused by the getting metadata before the sending. The latter's 
> sending won't take the much time due to the cached metadata. The logic is 
> right and necessary. Thus, we still want to improve the experience for the 
> first message's send/user first interaction.
>  
> *(2) can't reduce the send message's block time to wanted value.*
> Sometimes our application's thread will block for max.block.ms to send 
> message. When we try to reduce the max.block.ms to reduce the blocking time. 
> It can't meet the getting metadata's time requirement sometimes. The root 
> cause is the configured max.block.ms is shared with "get metadata" operation 
> and "send message" operation. We can refer to follow tables:
> |*where to block*
>  |*when it is blocked*
>  |*how long it will be blocked?*
>  |
> |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first 
> request which need to load the metadata from kafka| |org.apache.kafka.clients.producer.internals.RecordAccumulator#append|at peak 
> time for business, if the network can’t send message in short 
> time.|  
> What's the solution for the above two issues:
> I think about current logic and figure out followed possible solution:
> (1) send one "warmup" message, thus we can't send any fake message.
> (2) provide one extra configure time configure which dedicated for getting 
> metadata. thus it will break the define for the max.block.ms
> (3) add one method to call waitOnMetadata with one timeout setting without 
> using the max.block.ms
>  
> _note:  org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata_
> ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long 
> nowMs, long maxWaitMs)
>  
>  __ 
> after the change, we can call it before the service is marked as ready. After 
> the ready. it won't block to get metadata due to cache. And then we can be 
> safe to reduce the max.block.ms to a lower value to reduce thread's blocking 
> time.
>  
> After adopting the solution 3. we solve the above issues. For example, we 
> reduce the first message's send about 4s seconds. The log can refer to 
> followed:
> _warmup test_topic at phase phase 2: get metadata from mq start_
> _warmup test_topic 

[jira] [Updated] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2023-02-28 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13771:
--
Fix Version/s: 3.5.0

> Support to explicitly delete delegationTokens that have expired but have not 
> been automatically cleaned up
> --
>
> Key: KAFKA-13771
> URL: https://issues.apache.org/jira/browse/KAFKA-13771
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.5.0
>
>
> Quoting the official documentation
> {quote}
> Tokens can also be cancelled explicitly. If a token is not renewed by the 
> token’s expiration time or if token is beyond the max life time, it will be 
> deleted from all broker caches as well as from zookeeper.
> {quote}
> 1. The first point above means that after the `AdminClient` initiates the EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() method on the KafkaServer side, if the user passes in an expireLifeTimeMs less than 0, KafkaServer will delete the corresponding delegationToken directly.
> 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, 
> which is responsible for regularly cleaning up expired tokens. The execution 
> interval is `delegation.token.expiry.check.interval.ms`, and the default 
> value is one hour.
> But if you carefully analyze the code logic in DelegationTokenManager.expireToken(), *Kafka currently does not allow users to delete an expired delegationToken that they no longer use/renew. If a user tries to do this, they will receive a DelegationTokenExpiredException.*
> In the worst case, an expired delegationToken may still be usable for up to {*}an hour{*}, even though the broker can shorten the (delegation.token.expiry.check.interval.ms) configuration as much as possible.
> The solution is very simple: adjust the order of the `if` branches in DelegationTokenManager.expireToken().
> {code:java}
> if (!allowedToRenew(principal, tokenInfo)) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
> } else if (expireLifeTimeMs < 0) { //expire immediately
>   removeToken(tokenInfo.tokenId)
>   info(s"Token expired for token: ${tokenInfo.tokenId} for owner: 
> ${tokenInfo.owner}")
>   expireResponseCallback(Errors.NONE, now)
> } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
> } else {
>   //set expiry time stamp
>  ..
> } {code}
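
For reference, the explicit cancellation mentioned in point 1 is driven from the Admin client. A sketch of expiring a token immediately by passing a negative expiry period (the DelegationToken is assumed to have been obtained earlier, and the caller must be the token owner or a renewer):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

public class ExpireTokenNow {
    // A negative expiry period maps to expireLifeTimeMs < 0 on the broker side,
    // which deletes the token immediately instead of just letting it expire.
    public static void expireNow(DelegationToken token) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            admin.expireDelegationToken(token.hmac(),
                    new ExpireDelegationTokenOptions().expiryTimePeriodMs(-1L))
                 .expiryTimestamp()
                 .get();
        }
    }
}
{code}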



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449191428

   yeah the DefaultStateUpdaterTest has been failing from time to time... not 
sure why 😭 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-12639) AbstractCoordinator ignores backoff timeout when joining the consumer group

2023-02-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12639.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> AbstractCoordinator ignores backoff timeout when joining the consumer group
> ---
>
> Key: KAFKA-12639
> URL: https://issues.apache.org/jira/browse/KAFKA-12639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.7.0
>Reporter: Matiss Gutmanis
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> We observed heavy logging while trying to join a consumer group during partial unavailability of the Kafka cluster (this is part of our testing process). It seems that {{rebalanceConfig.retryBackoffMs}} used in {{org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded}} is not respected. Debugging revealed that the {{Timer}} instance is technically already expired, so a sleep of 0 milliseconds is used, which defeats the purpose of the backoff timeout.
> The minimal backoff timeout should be respected.
>  
> {code:java}
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> {code}
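
A generic sketch of the failure mode and the fix direction (stand-in types, not the real AbstractCoordinator or Timer API): when the caller's timer has already expired, the join loop should exit rather than retry with a zero-millisecond sleep.

{code:java}
public class BackoffJoinSketch {

    /** Minimal stand-ins; not the real consumer Timer or coordinator API. */
    interface Timer {
        boolean isExpired();
        long remainingMs();
        void update();
    }

    interface JoinAttempt {
        boolean tryJoin();
    }

    static boolean joinGroupIfNeeded(JoinAttempt attempt, Timer timer, long retryBackoffMs)
            throws InterruptedException {
        while (!attempt.tryJoin()) {
            if (timer.isExpired()) {
                // Exit instead of retrying with a 0 ms sleep (the tight loop above);
                // the caller decides when to try again.
                return false;
            }
            // Back off between attempts, capped by the time left on the timer.
            Thread.sleep(Math.min(retryBackoffMs, timer.remainingMs()));
            timer.update();
        }
        return true;
    }
}
{code}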



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449186137

   Merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449185984

   The test failures are not relevant (but some of them are related to 
DefaultStateUpdaterTest.. sigh).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


guozhangwang merged PR #13190:
URL: https://github.com/apache/kafka/pull/13190


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449185108

   Hmm. I think these tests are flaky actually
   ```
   Build / JDK 17 and Scala 2.13 / 
shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
   30s
   Build / JDK 17 and Scala 2.13 / 
shouldPauseActiveTaskAndTransitToUpdateStandby() – 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
   30s
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Build / JDK 11 and Scala 2.13 / 
testListenerConnectionRateLimitWhenActualRateAboveLimit() – 
kafka.network.ConnectionQuotasTest
   19s
   Build / JDK 11 and Scala 2.13 / 
shouldRemovePausedAndUpdatingTasksOnShutdown() – 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
   30s
   Build / JDK 11 and Scala 2.13 / 
shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – 
org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
   31s
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-02-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694801#comment-17694801
 ] 

Guozhang Wang commented on KAFKA-14533:
---

I took some time to look into this, and also cannot find any clues linking the state updater to the list-offset request failures on changelog topics. Nevertheless, I re-enabled the state-updater flag with finer-grained logging in the assignor in https://github.com/apache/kafka/pull/13318, and hopefully Jenkins can give me some more clues (like [~ableegoldman], I cannot reproduce this issue locally either, with about 15 tries).

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang opened a new pull request, #13318: [DO NOT MERGE] Re-enable state-updater in SmokeTestDriverIntegrationTest

2023-02-28 Thread via GitHub


guozhangwang opened a new pull request, #13318:
URL: https://github.com/apache/kafka/pull/13318

   1. Print fine-grained streams exception when list-offset fails.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-28 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1121005648


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -1962,7 +1962,9 @@ class KafkaApisTest {
 ArgumentMatchers.eq(producerId),
 ArgumentMatchers.eq(epoch),
 ArgumentMatchers.eq(Set(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
+ArgumentMatchers.eq(false),

Review Comment:
   I added tests in RequestServerTest but I guess those are more integration 
tests. I can add some in KafkaApisTest too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-28 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1120991153


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !a
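
The diff above collects one result per transaction and only sends the response once every transaction in the batch has been handled. A small Java sketch of that completion pattern, using an atomic countdown instead of the size comparison in the Scala code (names are illustrative, not the broker's API):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchedResponseSketch {
    private final Map<String, String> results = new ConcurrentHashMap<>();
    private final AtomicInteger remaining;
    private final Runnable sendResponse;

    public BatchedResponseSketch(List<String> transactionalIds, Runnable sendResponse) {
        this.remaining = new AtomicInteger(transactionalIds.size());
        this.sendResponse = sendResponse;
    }

    /** Per-transaction callbacks may complete on different threads. */
    public void complete(String transactionalId, String result) {
        results.put(transactionalId, result);
        // Exactly one caller observes the count reaching zero and sends the response.
        if (remaining.decrementAndGet() == 0) {
            sendResponse.run();
        }
    }
}
```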

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-28 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1120990845


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !a

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-28 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1120986902


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !a

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-28 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1120976648


##
clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json:
##
@@ -22,22 +22,37 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds support to batch multiple transactions and a top level 
error code.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
 { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
   "about": "Duration in milliseconds for which the request was throttled 
due to a quota violation, or zero if the request did not violate any quota." },
-{ "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", 
"versions": "0+",
-  "about": "The results for each topic.", "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "4+",

Review Comment:
   Sounds fine to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-28 Thread via GitHub


cmccabe commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1449130059

   Thanks for this, @pprovenzano . It all looks good, but there is one change 
that we really do need here: we need to add SCRAM to MetadataVersion. I suspect 
we can add it to `IBP_3_5_IV0`. This also means having 
`MetadataVersion#isScramSupported`, etc.
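   To sketch the shape of the gate being asked for (the enum constants and method name below are illustrative assumptions, not the final API):
   ```
   // Sketch only -- not the real MetadataVersion enum. It just illustrates the
   // kind of feature gate requested, assuming SCRAM support lands in IBP_3_5_IV0.
   enum MetadataVersionSketch {
       IBP_3_4_IV0, IBP_3_5_IV0;

       boolean isScramSupported() {
           // SCRAM records may only be written once the cluster is on the
           // metadata version that introduces them.
           return this.ordinal() >= IBP_3_5_IV0.ordinal();
       }
   }
   ```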


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-28 Thread via GitHub


cmccabe commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1449130005

   We need a test like `./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java`, similar to the tests for the other Image classes.
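   For reference, a minimal sketch of what such a test could look like, following the round-trip pattern of the existing *ImageTest classes; the ScramDelta/RecordListWriter/RecordTestUtils usage here is an assumption based on that pattern, not the final test:
   ```
   // Hypothetical round-trip test, modeled on the other image tests.
   // Assumes ScramImage/ScramDelta follow the usual image/delta contract.
   @Test
   public void testEmptyImageRoundTrip() {
       ScramImage image = ScramImage.EMPTY;
       RecordListWriter writer = new RecordListWriter();
       image.write(writer, new ImageWriterOptions.Builder().build());
       ScramDelta delta = new ScramDelta(ScramImage.EMPTY);
       RecordTestUtils.replayAll(delta, writer.records());
       assertEquals(image, delta.apply());
   }
   ```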


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-28 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1120967466


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1328,7 +1328,7 @@ Priority priority() {
 @Override
 public void handleResponse(AbstractResponse response) {
 AddPartitionsToTxnResponse addPartitionsToTxnResponse = 
(AddPartitionsToTxnResponse) response;
-Map errors = 
addPartitionsToTxnResponse.errors();
+Map errors = 
addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);

Review Comment:
   What should we do if the check fails? Just have a better error message 
thrown?
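   For illustration, one shape the "better error message" option could take; the null check itself and the use of the handler's existing fatalError path are assumptions here, not a settled decision:
   ```
   // Sketch only: guard against the broker omitting the v3-and-below result.
   Map<TopicPartition, Errors> errors =
           addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);
   if (errors == null) {
       fatalError(new KafkaException("AddPartitionsToTxn response did not contain a result for " +
               "the v3-and-below transactional id; this indicates a broker-side bug"));
       return;
   }
   ```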



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-28 Thread via GitHub


cmccabe commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1120967385


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.clients.admin.ScramMechanism;
+
+// import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+// import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the SCRAM credentials in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramImage {
+public static final ScramImage EMPTY = new 
ScramImage(Collections.emptyMap());
+
+private final Map> 
mechanisms;
+
+public ScramImage(Map> 
mechanisms) {
+this.mechanisms = Collections.unmodifiableMap(mechanisms);
+}
+
+public void write(ImageWriter writer, ImageWriterOptions options) {

Review Comment:
   You need to check the MetadataVersion and invoke `options.handleLoss` if it 
is too low. See my comment below. This feature will only be supported in 
`IBP_3_5_IV0` and newer.
   
   Also we'll need a test for this (that we throw an exception when required if SCRAM is present). That should be in `./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java` (which still needs to be written) :)
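   A rough sketch of the kind of check I mean; treat the ImageWriterOptions accessors and the proposed isScramSupported() as assumptions rather than the committed API:
   ```
   public void write(ImageWriter writer, ImageWriterOptions options) {
       if (!options.metadataVersion().isScramSupported()) {
           // Refuse to silently drop credentials when writing an image at an
           // older metadata version.
           if (!mechanisms.isEmpty()) {
               options.handleLoss("SCRAM credentials");
           }
           return;
       }
       // ... emit the credential records as before ...
   }
   ```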



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-28 Thread via GitHub


cmccabe commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1120962516


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -221,6 +223,21 @@ class BrokerMetadataPublisher(
   s"quotas in ${deltaName}", t)
   }
 
+  // Apply changes to SCRAM credentials.
+  Option(delta.scramDelta()).foreach { scramDelta =>

Review Comment:
   I think we should split out this code for applying the SCRAM stuff into a 
separate `MetadataPublisher`, as we did with `DynamicConfigPublisher`. Then the 
controller can run it as well.
   
   But we can do that in a follow-on change. I will file a JIRA



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-28 Thread via GitHub


cmccabe commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1120961672


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -99,6 +99,7 @@ class ControllerApis(val requestChannel: RequestChannel,
 case ApiKeys.INCREMENTAL_ALTER_CONFIGS => 
handleIncrementalAlterConfigs(request)
 case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => 
handleAlterPartitionReassignments(request)
 case ApiKeys.LIST_PARTITION_REASSIGNMENTS => 
handleListPartitionReassignments(request)
+case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => 
handleAlterUserScramCredentials(request)

Review Comment:
   Yeah. Just to restate what I think we all know here, clients ask brokers to 
describe SCRAM credentials, rather than controllers. So this doesn't 
technically need to be implemented, except for testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc

2023-02-28 Thread via GitHub


gharris1727 commented on PR #13315:
URL: https://github.com/apache/kafka/pull/13315#issuecomment-1449121574

   @ijuma I don't know how my change would impact the execution order of rat vs 
processTestMessages, do you have any intuition?
   
   The commitId change that I originally proposed (without the deduplication of 
the Grgit call) didn't experience this failure; do you think we could merge the 
commitId improvement and follow up on the rat implicit dependencies in another 
PR if the latest commit doesn't fix the build?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13288: MINOR: fix rerun-tests for unit test

2023-02-28 Thread via GitHub


chia7712 commented on PR #13288:
URL: https://github.com/apache/kafka/pull/13288#issuecomment-1449114559

   > The referenced https://github.com/apache/kafka/pull/11926 seems not 
relevant, is there a typo in the PR number?
   
   #11926 added the new property to avoid recompiling the tests. See 
https://github.com/apache/kafka/commit/322a065b9055649c713baf43f154052d45cd1588#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R41
   
   For another thing, it seems to me the existing task `cleanTest` can also avoid recompilation (related to my previous comment https://github.com/apache/kafka/pull/11926#discussion_r1114653978), and thus there are two ways to fix the "test without recompilation" command: 1) add `rerun-tests` for unit tests, or 2) replace `rerun-tests` with `cleanTest`.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc

2023-02-28 Thread via GitHub


ijuma commented on PR #13315:
URL: https://github.com/apache/kafka/pull/13315#issuecomment-1449090030

   Looks like the build failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-28 Thread via GitHub


ijuma commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1120933708


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link 
kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {
+
+public final FetchDataInfo fetchedData;
+public final Optional divergingEpoch;
+public final long highWatermark;
+public final long logStartOffset;
+public final long logEndOffset;
+public final long lastStableOffset;
+
+public LogReadInfo(FetchDataInfo fetchedData,
+   Optional 
divergingEpoch,
+   long highWatermark,
+   long logStartOffset,
+   long logEndOffset,
+   long lastStableOffset) {
+this.fetchedData = fetchedData;
+this.divergingEpoch = divergingEpoch;
+this.highWatermark = highWatermark;
+this.logStartOffset = logStartOffset;
+this.logEndOffset = logEndOffset;
+this.lastStableOffset = lastStableOffset;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+LogReadInfo that = (LogReadInfo) o;
+
+return highWatermark == that.highWatermark &&
+logStartOffset == that.logStartOffset &&
+logEndOffset == that.logEndOffset &&
+lastStableOffset == that.lastStableOffset &&
+Objects.equals(fetchedData, that.fetchedData) &&
+Objects.equals(divergingEpoch, that.divergingEpoch);
+}
+
+@Override
+public int hashCode() {
+int result = fetchedData != null ? fetchedData.hashCode() : 0;
+result = 31 * result + (divergingEpoch != null ? 
divergingEpoch.hashCode() : 0);
+result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32));
+result = 31 * result + (int) (logStartOffset ^ (logStartOffset >>> 
32));
+result = 31 * result + (int) (logEndOffset ^ (logEndOffset >>> 32));
+result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 
32));

Review Comment:
   We should use `Long.hashCode` in many of these lines.
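   For reference, a sketch of the suggested form, reusing the field names from the diff above (java.util.Objects assumed to be imported):
   ```
   @Override
   public int hashCode() {
       int result = Objects.hashCode(fetchedData);
       result = 31 * result + Objects.hashCode(divergingEpoch);
       // Long.hashCode replaces the manual (int) (x ^ (x >>> 32)) folding.
       result = 31 * result + Long.hashCode(highWatermark);
       result = 31 * result + Long.hashCode(logStartOffset);
       result = 31 * result + Long.hashCode(logEndOffset);
       result = 31 * result + Long.hashCode(lastStableOffset);
       return result;
   }
   ```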



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-28 Thread via GitHub


ijuma commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r112095


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link 
kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {

Review Comment:
   Note that `case class` is used for many reasons. Do we actually need 
`equals`/`hashCode` (eg are these classes used as map keys or compared against 
each other)? If not, then it's maintenance overhead without benefit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13288: MINOR: fix rerun-tests for unit test

2023-02-28 Thread via GitHub


guozhangwang commented on PR #13288:
URL: https://github.com/apache/kafka/pull/13288#issuecomment-1449082224

   The referenced https://github.com/apache/kafka/pull/11926 seems not 
relevant, is there a typo in the PR number?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-28 Thread via GitHub


junrao commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1120915339


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link 
kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {
+
+public final FetchDataInfo fetchedData;
+public final Optional divergingEpoch;
+public final long highWatermark;
+public final long logStartOffset;
+public final long logEndOffset;
+public final long lastStableOffset;
+
+public LogReadInfo(FetchDataInfo fetchedData,

Review Comment:
   Should we define `equals()` and `hashCode()` for FetchDataInfo too?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * Container class which represents a snapshot of the significant offsets for 
a partition. This allows fetching
+ * of these offsets atomically without the possibility of a leader change 
affecting their consistency relative
+ * to each other. See {@link UnifiedLog#fetchOffsetSnapshot()}.
+ */
+public class LogOffsetSnapshot {

Review Comment:
   Should we define equals() and hashCode()?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-28 Thread via GitHub


OneCricketeer commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1120908532


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map configs) {
+}

Review Comment:
   As one example, take the JDBC sink connector (or S3, or any connector that writes to an external location). These are fine in themselves, but suppose someone knows the environment variable provider is installed on the worker, so they add a series of InsertField transforms to pull out as many well-known environment variable names as possible (such as `AWS_SECRET_ACCESS_KEY`, as mentioned). A protected REST API does not stop this.
   
   I'm saying that without some configuration property letting the provider define an allowlist pattern, possibly set up in the configure method, the provider would expose credentials meant to be viewed only by administrators of the workers.
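   A hypothetical sketch just to make the shape concrete; the `allowlist.pattern` property name and the field/method names are made up for illustration (imports assumed: java.util.Map, java.util.regex.Pattern, java.util.stream.Collectors):
   ```
   private Pattern allowlist;

   @Override
   public void configure(Map<String, ?> configs) {
       Object pattern = configs.get("allowlist.pattern");
       // Defaulting to ".*" keeps today's behavior; an operator can tighten it
       // so connectors only ever see the variables they are meant to see.
       allowlist = Pattern.compile(pattern == null ? ".*" : pattern.toString());
   }

   private Map<String, String> filterEnv(Map<String, String> env) {
       return env.entrySet().stream()
               .filter(e -> allowlist.matcher(e.getKey()).matches())
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
   ```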



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120897608


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a boostrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}

Review Comment:
   > I would be really interested in knowing which case can't be covered here. 
If you can dig up the Converter counterexample, would you mind sharing it?
   
   Here's a snippet from the IsolatedConverter, which is a few PRs in the 
future from now.
   These methods take 3 or 4 arguments, and there's no builtin java functional 
type that covers them (I guess they would be TriFunction and QuadFunction?)
   ```
   public byte[] fromConnectData(String topic, Headers headers, Schema 
schema, Object value) throws Exception {
   return isolate(() -> delegate.fromConnectData(topic, headers, 
schema, value));
   }
   
   public SchemaAndValue toConnectData(String topic, Headers headers, 
byte[] value) throws Exception {
   return isolate(() -> delegate.toConnectData(topic, headers, value));
   }
   
   public byte[] fromConnectData(String topic, Schema schema, Object value) 
throws Exception {
   return isolate(() -> delegate.fromConnectData(topic, schema, value));
   }
   
   public SchemaAndValue toConnectData(String topic, byte[] value) throws 
Exception {
   return isolate(delegate::toConnectData, topic, value);
   }
   ```
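   For completeness, a sketch of the extra functional interface the three-argument case would need if we went that route (purely illustrative, not part of the PR):
   ```
   // The shape such a helper would take, plus a matching isolate overload.
   @FunctionalInterface
   interface TriFunction<A, B, C, R> {
       R apply(A a, B b, C c);
   }

   // e.g. protected <A, B, C, R> R isolate(TriFunction<A, B, C, R> f, A a, B b, C c) throws Exception
   ```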



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120889645


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a boostrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}

Review Comment:
   If it comes down to not being able to completely satisfy all use cases with just the `isolate(Callable)` (and possibly `isolateV(ThrowingRunnable)`) methods, then I think it could be alright to add more variants. But the additional stack frames, and even the ugly class names, don't seem likely to be super important to users or plugin developers. In both cases (though probably more often the latter), aren't they more likely to look for the first stack frame that originates with their plugin class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120887378


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a boostrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}

Review Comment:
   I would be really interested in knowing which case can't be covered here. If 
you can dig up the `Converter` counterexample, would you mind sharing it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120886025


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a boostrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}
+
+public interface ThrowingRunnable {
+void run() throws Exception;
+}
+
+@Override
+public int hashCode() {
+try {
+return isolate(delegate::hashCode);
+} catch (Throwable e) {
+throw new RuntimeException("unable to evaluate plugin hashCode", 
e);
+}
+}
+
+@Override
+public boolean equals(Object obj) {
+if (obj == null || this.getClass() != obj.getClass()) {
+return false;
+}

Review Comment:
   Given that equality across `Connector` instances is likely not a developer priority, I think you're right regarding reference equality for delegates. Good point 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120880473


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a bootstrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}
+
+public interface ThrowingRunnable {
+void run() throws Exception;
+}
+
+@Override
+public int hashCode() {
+try {
+return isolate(delegate::hashCode);
+} catch (Throwable e) {
+throw new RuntimeException("unable to evaluate plugin hashCode", 
e);
+}
+}
+
+@Override
+public boolean equals(Object obj) {
+if (obj == null || this.getClass() != obj.getClass()) {
+return false;
+}

Review Comment:
   I think you're right. I could imagine a particularly silly 
`Connector::equals` method which always returns `true` and causes poor behavior 
in a hash map or other data structure which relies on the equals method.
   
   I also wonder if it is a good idea to call the delegate equals method, or 
whether we should use reference equality (this.delegate == other.delegate) and 
skip calling the equals method entirely.
   I don't believe that we're using these methods anywhere in the runtime, and 
I don't think that it's common for developers to override the equals/hashCode, 
so we get to choose what the most useful implementation of these methods is.
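
   For illustration, a minimal sketch of that identity-based alternative; `PluginWrapper` and its field are hypothetical stand-ins, not the class from this PR:
   ```java
import java.util.Objects;

// Hypothetical wrapper, only to illustrate identity-based equals/hashCode on the delegate.
public final class PluginWrapper<P> {
    private final P delegate;

    public PluginWrapper(P delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate must be non-null");
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        // Compare delegates by identity, so the plugin's own equals() is never invoked.
        return this.delegate == ((PluginWrapper<?>) obj).delegate;
    }

    @Override
    public int hashCode() {
        // Identity hash code pairs with the identity-based equals above.
        return System.identityHashCode(delegate);
    }
}
   ```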



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] ijuma commented on a diff in pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc

2023-02-28 Thread via GitHub


ijuma commented on code in PR #13315:
URL: https://github.com/apache/kafka/pull/13315#discussion_r1120878251


##
build.gradle:
##
@@ -159,16 +159,9 @@ def determineCommitId() {
   def takeFromHash = 16
   if (project.hasProperty('commitId')) {
 commitId.take(takeFromHash)
-  } else if (file("$rootDir/.git/HEAD").exists()) {
-def headRef = file("$rootDir/.git/HEAD").text
-if (headRef.contains('ref: ')) {
-  headRef = headRef.replaceAll('ref: ', '').trim()
-  if (file("$rootDir/.git/$headRef").exists()) {
-file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
-  }
-} else {
-  headRef.trim().take(takeFromHash)
-}
+  } else if (file("$rootDir/.git").exists()) {
+def repo = Grgit.open(currentDir: project.getRootDir())

Review Comment:
   The `rat` configuration does something similar; maybe we should have a 
variable for the git repo that can be used by both tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #13176: MINOR: some ZK migration code cleanups.

2023-02-28 Thread via GitHub


cmccabe closed pull request #13176: MINOR: some ZK migration code cleanups.
URL: https://github.com/apache/kafka/pull/13176


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120867064


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a bootstrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}

Review Comment:
   To take it to the extreme, we would inline and eliminate each of the 
`isolate` calls entirely and put the try-with-resources swap in the methods 
themselves. That would eliminate all lambdas and only add one stack frame, but 
be super repetitive in the high 10s of methods that have to be isolated.
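
   A rough sketch of that fully inlined style, using hypothetical stand-ins (`LoaderSwapStub`, `FakeConnector`) rather than the real `Plugins`/`LoaderSwap` classes, just to show where the try-with-resources swap would live:
   ```java
import java.util.Map;

// Hypothetical stand-ins, only to keep the contrast self-contained; not the PR's classes.
interface FakeConnector {
    void start(Map<String, String> props);
    String version();
}

final class LoaderSwapStub implements AutoCloseable {
    private final ClassLoader previous = Thread.currentThread().getContextClassLoader();

    LoaderSwapStub(ClassLoader pluginLoader) {
        Thread.currentThread().setContextClassLoader(pluginLoader);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(previous);
    }
}

// Inlined style: each wrapper method repeats the try-with-resources swap itself,
// so plugin stack traces gain no lambda frames, at the cost of repetition.
final class InlinedIsolatedConnector {
    private final FakeConnector delegate;
    private final ClassLoader pluginLoader;

    InlinedIsolatedConnector(FakeConnector delegate) {
        this.delegate = delegate;
        this.pluginLoader = delegate.getClass().getClassLoader();
    }

    public void start(Map<String, String> props) {
        try (LoaderSwapStub swap = new LoaderSwapStub(pluginLoader)) {
            delegate.start(props);
        }
    }

    public String version() {
        try (LoaderSwapStub swap = new LoaderSwapStub(pluginLoader)) {
            return delegate.version();
        }
    }
}
   ```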



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120864255


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a bootstrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}
+}
+
+protected  void isolateV(Consumer consumer, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t);
+}
+}
+
+protected  void isolateV(BiConsumer consumer, T t, U u) throws 
Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+consumer.accept(t, u);
+}
+}
+
+protected  R isolate(Function function, T t) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t);
+}
+}
+
+protected  R isolate(BiFunction function, T t, U u) 
throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return function.apply(t, u);
+}
+}

Review Comment:
   I think this is a question of readability, but it's important to consider 
the readability for operators of these connectors. With the current design that 
uses mostly method references, exceptions have the following stacktrace:
   ```
   org.apache.kafka.connect.errors.ConnectException:
at 
org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.lambda$testSinkConnector$0(IsolatedPluginTest.java:43)
   ...
at 
org.apache.kafka.connect.connector.Connector.initialize(Connector.java:57)
at 
org.apache.kafka.connect.runtime.isolation.IsolatedPlugin.isolateV(IsolatedPlugin.java:66)
at 
org.apache.kafka.connect.runtime.isolation.IsolatedConnector.initialize(IsolatedConnector.java:39)
at 
org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.testSinkConnector(IsolatedPluginTest.java:46)
...
   ```
   
   There are two stack frames associated with the isolation infrastructure that 
did not exist before, and neither of them is an anonymous lambda (the 
`IsolatedPluginTest.lambda$testSinkConnector$0` is an anonymous lambda in my 
test, not part of the IsolatedPlugin).
   
   Compare that to the style of making everything into a callable:
   ```
   org.apache.kafka.connect.errors.ConnectException:
at 
org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.lambda$testSinkConnector$0(IsolatedPluginTest.java:43)
   .

[GitHub] [kafka] C0urante commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1120776026


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -419,7 +423,14 @@ private  Collection> 
getServiceLoaderPluginDesc(Class klass,
 Collection> result = new ArrayList<>();
 try {
 ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
-for (T pluginImpl : serviceLoader) {
+for (Iterator iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+T pluginImpl;
+try {
+pluginImpl = iterator.next();
+} catch (ServiceConfigurationError t) {
+log.error("Unable to instantiate plugin{}", 
reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   Would it be more informative to use the type of plugin class being 
instantiated instead of just "plugin"?
   ```suggestion
   log.error("Unable to instantiate {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##
@@ -111,20 +115,62 @@ public enum TestPlugin {
 /**
  * A plugin which shares a jar file with {@link 
TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE}
  */
-MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo");
+MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo"),
+/**
+ * A plugin which is incorrectly packaged, and is missing a superclass 
definition.
+ */
+FAIL_TO_INITIALIZE_MISSING_SUPERCLASS("fail-to-initialize", 
"test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
+/**
+ * A plugin which is packaged with other incorrectly packaged plugins, 
but itself has no issues loading.
+ */
+FAIL_TO_INITIALIZE_CO_LOCATED("fail-to-initialize", 
"test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER),
+/**
+ * A connector which is incorrectly packaged, and throws during static 
initialization.
+ */
+
FAIL_TO_INITIALIZE_STATIC_INITIALIZER_THROWS_CONNECTOR("fail-to-initialize", 
"test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER),
+/**
+ * A plugin which is incorrectly packaged, which throws an exception 
from the {@link Versioned#version()} method.
+ */
+
FAIL_TO_INITIALIZE_VERSION_METHOD_THROWS_CONNECTOR("fail-to-initialize", 
"test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER),

Review Comment:
   Nit: d'you think `FAIL_TO_INITIALIZE` is a bit inaccurate here, since we do 
manage to load and use the plugin despite its version method throwing an 
exception?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -439,6 +457,22 @@ public static  String versionFor(Class 
pluginKlass) throws Refle
 versionFor(pluginKlass.getDeclaredConstructor().newInstance()) : 
UNDEFINED_VERSION;
 }
 
+private static String reflectiveErrorDescription(Throwable t) {
+if (t instanceof NoSuchMethodException) {
+return ": Plugin class must have a default constructor, and cannot 
be a non-static inner class";
+} else if (t instanceof SecurityException) {
+return ": Security settings must allow reflective instantiation of 
plugin classes";
+} else if (t instanceof IllegalAccessException) {
+return ": Plugin class default constructor must be public";
+} else if (t instanceof ExceptionInInitializerError) {
+return ": Plugin class should not throw exception during static 
initialization";
+} else if (t instanceof InvocationTargetException) {
+return ": Constructor must complete without throwing an exception";
+} else {
+return "";

Review Comment:
   I like this method for the most part; it helps provide a nice summary of 
what's wrong with the plugin class and how to fix things.
   
   One thought I had is that it may be better in some if not all cases to 
describe the problem rather than prescribe a solution. For example, 
"Constructor must complete without throwing an exception" isn't really 
groundbreaking information; it's probably fine to just say "Failed to invoke 
constructor" and let users figure out the rest from the remainder of the stack 
trace.
   
   I think this applies to the branches for the `ExceptionInInitializerError` and 
`InvocationTargetException` classes, and the others could be left as they are, but 
feel free to tweak the wording if they could be improved as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Updated] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version

2023-02-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14769:
-
Affects Version/s: (was: 3.4.0)

> NPE in ControllerMetricsManager when upgrading from old KRaft version
> -
>
> Key: KAFKA-14769
> URL: https://issues.apache.org/jira/browse/KAFKA-14769
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Jose Armando Garcia Sancio
>Priority: Critical
>
> In older KRaft versions, we could see a ConfigRecord for a topic config 
> appear before the TopicRecord in a batch.
> When upgrading from an older KRaft version (e.g., 3.1), the latest code in 
> the KRaft controller hits an NPE when it encounters a ConfigRecord for a 
> topic config before the TopicRecord. This was introduced relatively recently 
> by KAFKA-14457



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version

2023-02-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14769:
-
Fix Version/s: (was: 3.5.0)
   (was: 3.4.1)

> NPE in ControllerMetricsManager when upgrading from old KRaft version
> -
>
> Key: KAFKA-14769
> URL: https://issues.apache.org/jira/browse/KAFKA-14769
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: Jose Armando Garcia Sancio
>Priority: Critical
>
> In older KRaft versions, we could see a ConfigRecord for a topic config 
> appear before the TopicRecord in a batch.
> When upgrading from an older KRaft version (e.g., 3.1), the latest code in 
> the KRaft controller hits an NPE when it encounters a ConfigRecord for a 
> topic config before the TopicRecord. This was introduced relatively recently 
> by KAFKA-14457



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version

2023-02-28 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14769.
--
Resolution: Invalid

> NPE in ControllerMetricsManager when upgrading from old KRaft version
> -
>
> Key: KAFKA-14769
> URL: https://issues.apache.org/jira/browse/KAFKA-14769
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: Jose Armando Garcia Sancio
>Priority: Critical
> Fix For: 3.5.0, 3.4.1
>
>
> In older KRaft versions, we could see a ConfigRecord for a topic config 
> appear before the TopicRecord in a batch.
> When upgrading from an older KRaft version (e.g., 3.1), the latest code in 
> the KRaft controller hits an NPE when it encounters a ConfigRecord for a 
> topic config before the TopicRecord. This was introduced relatively recently 
> by KAFKA-14457



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version

2023-02-28 Thread David Arthur (Jira)
David Arthur created KAFKA-14769:


 Summary: NPE in ControllerMetricsManager when upgrading from old 
KRaft version
 Key: KAFKA-14769
 URL: https://issues.apache.org/jira/browse/KAFKA-14769
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.4.0
Reporter: David Arthur
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.5.0, 3.4.1


In older KRaft versions, we could see a ConfigRecord for a topic config appear 
before the TopicRecord in a batch.

When upgrading from an older KRaft version (e.g., 3.1), the latest code in the 
KRaft controller hits an NPE when it encounters a ConfigRecord for a topic 
config before the TopicRecord. This was introduced relatively recently by 
KAFKA-14457



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120685793


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin {
+
+private final Plugins plugins;
+private final Class pluginClass;
+protected final P delegate;
+private final ClassLoader classLoader;
+private final PluginType type;
+
+IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+this.plugins = Objects.requireNonNull(plugins, "plugins must be 
non-null");
+this.delegate = Objects.requireNonNull(delegate, "delegate plugin must 
be non-null");
+this.pluginClass = delegate.getClass();
+ClassLoader classLoader = pluginClass.getClassLoader();
+this.classLoader = Objects.requireNonNull(classLoader, "delegate 
plugin must not be a bootstrap class");
+this.type = Objects.requireNonNull(type, "plugin type must be 
non-null");
+}
+
+public PluginType type() {
+return type;
+}
+
+@SuppressWarnings("unchecked")
+public Class pluginClass() {
+return (Class) pluginClass;
+}
+
+protected  V isolate(Callable callable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+return callable.call();
+}
+}
+
+protected void isolateV(ThrowingRunnable runnable) throws Exception {
+try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+runnable.run();
+}

Review Comment:
   We can reduce duplication here by piggybacking off of the non-void `isolate` 
method:
   ```suggestion
   isolate(() -> {
   runnable.run();
   return null;
   });
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -419,8 +420,14 @@ public ConnectorMetricsGroup(ConnectMetrics 
connectMetrics, AbstractStatus.State
 metricGroup.close();
 
 metricGroup.addImmutableValueMetric(registry.connectorType, 
connectorType());
-metricGroup.addImmutableValueMetric(registry.connectorClass, 
connector.getClass().getName());
-metricGroup.addImmutableValueMetric(registry.connectorVersion, 
connector.version());
+metricGroup.addImmutableValueMetric(registry.connectorClass, 
connector.pluginClass().getName());
+String version;
+try {
+version = connector.version();
+} catch (Exception e) {
+version = DelegatingClassLoader.UNDEFINED_VERSION;
+}

Review Comment:
   This technically changes behavior, right? We go from failing the connector 
on startup to just assigning it an undefined version.
   
   I think this is probably fine (the benefits outweigh the costs, and I have a 
hard time imagining any use case that would favor failing here), just want to 
make sure I'm gauging the impact here correctly.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eith

[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-28 Thread via GitHub


C0urante commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1120750168


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map configs) {
+}

Review Comment:
   Sorry, by "deploy code", do you mean a custom connector class? Because if 
that's the case, then credentials leakage is not going to be prevented by 
modifying this config provider since the connector can just invoke 
`System::getEnv` and do whatever it wants with that info.
   
   If we're discussing connector configurations instead, then it's already 
possible, and basically necessary, to lock down your Connect worker's REST API 
to prevent users from submitting arbitrary connector configurations to your 
worker in anything but local testing environments.
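
   A tiny illustration of the point above about `System::getEnv` (hypothetical snippet, not from the PR): any code already running in the worker JVM can read the environment directly, with or without this provider.
   ```java
import java.util.Map;

public class EnvReadSketch {
    public static void main(String[] args) {
        // Any plugin code running in the worker JVM can already do this,
        // independently of whether an EnvVarConfigProvider is configured.
        Map<String, String> env = System.getenv();
        String value = env.get("MY_SECRET"); // "MY_SECRET" is a hypothetical variable name
        System.out.println(value == null ? "MY_SECRET is unset" : "MY_SECRET is visible to this code");
    }
}
   ```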



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-28 Thread via GitHub


OneCricketeer commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1114750313


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -41,6 +46,14 @@ public EnvVarConfigProvider(Map 
envVarsAsArgument) {
 
 @Override
 public void configure(Map configs) {
+if (configs.keySet().contains(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) 
{

Review Comment:
   `containsKey` would be more clear 



##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map configs) {
+}

Review Comment:
   > controlled on an opt-in basis by the cluster administrator via worker 
config files
   
   Right, so say, I, as an administrator, enable an environment variable 
provider, and have secrets defined on the workers, that I assume can only be 
read by authorized admins. Then allow anyone to deploy code that can then read 
said variables without any additional controls. That causes credential leakage, 
regardless of other capabilities for arbitrary code execution 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120665269


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -501,13 +501,18 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 
 if (exception instanceof UnknownMemberIdException ||
-exception instanceof IllegalGenerationException ||
-exception instanceof RebalanceInProgressException ||
-exception instanceof MemberIdRequiredException)
+exception instanceof IllegalGenerationException ||
+exception instanceof RebalanceInProgressException ||
+exception instanceof MemberIdRequiredException)
 continue;
 else if (!future.isRetriable())

Review Comment:
   the previous logic was reverted with some autocorrection to the indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120663942


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1484,7 +1484,8 @@ public void testRebalanceWithMetadataChange() {
 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 
1;
 client.respond(joinGroupFollowerResponse(1, consumerId, "leader", 
Errors.NOT_COORDINATOR));
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-coordinator.poll(time.timer(0));
+assertFalse(client.hasInFlightRequests());
+coordinator.poll(time.timer(1));

Review Comment:
   note: we need to add a timeout here to give the retry a second chance, 
because in the new code, the timer is checked and causes the method to exit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lbownik commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-02-28 Thread via GitHub


lbownik commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1120661170


##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+private final Class klass;
+
+public KafkaMetricsGroup(Class klass) {
+this.klass = klass;
+}
+
+/**
+ * Creates a new MetricName object for gauges, meters, etc. created for 
this
+ * metrics group.
+ * @param name Descriptive name of the metric.
+ * @param tags Additional attributes which mBean will have.
+ * @return Sanitized metric name object.
+ */
+public MetricName metricName(String name, Map tags) {
+String pkg;
+if (klass.getPackage() == null) {
+pkg = "";
+} else {
+pkg = klass.getPackage().getName();
+}
+String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+return explicitMetricName(pkg, simpleName, name, tags);
+}
+
+public static MetricName explicitMetricName(String group, String typeName,
+String name, Map tags) {
+StringBuilder nameBuilder = new StringBuilder();

Review Comment:
   maybe initialize the builder with some initial capacity (default is 16 - a 
little low).
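
   For illustration (the capacity of 64 and the metric-name pieces below are arbitrary examples, not measured values):
   ```java
public class MetricNameBuilderSketch {
    public static void main(String[] args) {
        // Pre-sizing the builder (the no-arg default is 16 chars) avoids a few
        // internal array copies for typical MBean names, which are much longer.
        StringBuilder nameBuilder = new StringBuilder(64); // 64 is an arbitrary illustrative choice
        nameBuilder.append("kafka.network")
                .append(":type=").append("RequestMetrics")
                .append(",name=").append("RequestsPerSec");
        System.out.println(nameBuilder);
    }
}
   ```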



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13287:
URL: https://github.com/apache/kafka/pull/13287#discussion_r1120636637


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -427,9 +427,8 @@ private void updateConnectorTasks(String connName) {
 }
 
 List> newTaskConfigs = 
recomputeTaskConfigs(connName);
-List> oldTaskConfigs = 
configState.allTaskConfigs(connName);
 
-if (!newTaskConfigs.equals(oldTaskConfigs)) {

Review Comment:
   I think the most convincing argument here is point 1; any duplication of 
work that isn't inevitable can be addressed to the same degree regardless of 
which class contains the logic, and I don't think the risk of improving the 
naive search is super high.
   
   Anyways, sounds good to me 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14767) Gradle build fails with missing commitId after git gc

2023-02-28 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694207#comment-17694207
 ] 

Greg Harris edited comment on KAFKA-14767 at 2/28/23 7:01 PM:
--

This happens occasionally when git performs an automatic `gc`, and often causes 
build failures when I merge `trunk` into my development branches. My build 
usually won't complete until I finish the merge commit, which I may have to go 
back and amend if I introduced any build breakages.

I know of one workaround: Take the branch you're working on and move it to 
re-create the non-gc'd refs file:
{noformat}
branch="$(git branch --show-current)"
git checkout -b placeholder
git branch -f $branch trunk
git branch -f $branch placeholder
git checkout $branch
git branch -D placeholder{noformat}


was (Author: gharris1727):
This happens occasionally when git performs an automatic `gc`, and often causes 
build failures when I merge `trunk` into my development branches. My build 
usually won't complete until I finish the merge commit, which I may have to go 
back and amend if i introduced any build breakages.

I know of one workaround: Take the branch you're working on and move it to 
re-create the non-gc'd refs file:
{noformat}
brach="$(git branch --show-current)"
git checkout -b placeholder
git branch -f $branch trunk
git branch -f $branch placeholder
git checkout $branch
git branch -D placeholder{noformat}

> Gradle build fails with missing commitId after git gc
> -
>
> Key: KAFKA-14767
> URL: https://issues.apache.org/jira/browse/KAFKA-14767
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> Reproduction steps:
> 1. `git gc`
> 2. `./gradlew jar`
> Expected behavior: build completes successfully (or shows other build errors)
> Actual behavior:
> {noformat}
> Task failed with an exception.
> ---
> * What went wrong:
> A problem was found with the configuration of task 
> ':storage:createVersionFile' (type 'DefaultTask').
>   - Property 'commitId' doesn't have a configured value.
>     
>     Reason: This property isn't marked as optional and no value has been 
> configured.
>     
>     Possible solutions:
>       1. Assign a value to 'commitId'.
>       2. Mark property 'commitId' as optional.
>     
>     Please refer to 
> https://docs.gradle.org/7.6/userguide/validation_problems.html#value_not_set 
> for more details about this problem.{noformat}
> This appears to be due to the fact that the build.gradle determineCommitId() 
> function is unable to read the git commit hash for the current HEAD. This 
> appears to happen after a `git gc` takes place, which causes the 
> `.git/refs/heads/*` files to be moved to `.git/packed-refs`.
> The determineCommitId() should be patched to also try reading from the 
> packed-refs to determine the commit hash.
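
For illustration only, a rough sketch of resolving a ref by also reading packed-refs; it assumes the standard packed-refs layout ("<sha> <refname>" lines, "#" comment lines, "^" peel lines). Note that PR #13315 takes a different route and opens the repository with Grgit instead.
{noformat}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class PackedRefsSketch {
    static Optional<String> resolve(Path gitDir, String refName) throws IOException {
        Path loose = gitDir.resolve(refName);
        if (Files.exists(loose)) {
            return Optional.of(Files.readString(loose).trim());   // loose ref still present
        }
        Path packed = gitDir.resolve("packed-refs");
        if (Files.exists(packed)) {
            for (String line : Files.readAllLines(packed)) {
                if (line.startsWith("#") || line.startsWith("^")) {
                    continue;                                      // header or peeled-tag line
                }
                String[] parts = line.split(" ", 2);
                if (parts.length == 2 && parts[1].trim().equals(refName)) {
                    return Optional.of(parts[0]);                  // commit hash for the ref
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(resolve(Path.of(".git"), "refs/heads/trunk").orElse("not found"));
    }
}
{noformat}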



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14767) Gradle build fails with missing commitId after git gc

2023-02-28 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694207#comment-17694207
 ] 

Greg Harris edited comment on KAFKA-14767 at 2/28/23 7:01 PM:
--

This happens occasionally when git performs an automatic `gc`, and often causes 
build failures when I merge `trunk` into my development branches. My build 
usually won't complete until I finish the merge commit, which I may have to go 
back and amend if i introduced any build breakages.

I know of one workaround: Take the branch you're working on and move it to 
re-create the non-gc'd refs file:
{noformat}
brach="$(git branch --show-current)"
git checkout -b placeholder
git branch -f $branch trunk
git branch -f $branch placeholder
git checkout $branch
git branch -D placeholder{noformat}


was (Author: gharris1727):
This happens occasionally when git performs an automatic `gc`, and often causes 
build failures when I merge `trunk` into my development branches. My build 
usually won't complete until I finish the merge commit, which I may have to go 
back and amend if i introduced any build breakages.

I know of one workaround: Take the branch you're working on and move it to 
re-create the non-gc'd refs file:
{noformat}
brach=
git checkout -b placeholder
git branch -f $branch trunk
git branch -f $branch placeholder
git checkout $branch
git branch -D placeholder{noformat}

> Gradle build fails with missing commitId after git gc
> -
>
> Key: KAFKA-14767
> URL: https://issues.apache.org/jira/browse/KAFKA-14767
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> Reproduction steps:
> 1. `git gc`
> 2. `./gradlew jar`
> Expected behavior: build completes successfully (or shows other build errors)
> Actual behavior:
> {noformat}
> Task failed with an exception.
> ---
> * What went wrong:
> A problem was found with the configuration of task 
> ':storage:createVersionFile' (type 'DefaultTask').
>   - Property 'commitId' doesn't have a configured value.
>     
>     Reason: This property isn't marked as optional and no value has been 
> configured.
>     
>     Possible solutions:
>       1. Assign a value to 'commitId'.
>       2. Mark property 'commitId' as optional.
>     
>     Please refer to 
> https://docs.gradle.org/7.6/userguide/validation_problems.html#value_not_set 
> for more details about this problem.{noformat}
> This appears to be due to the fact that the build.gradle determineCommitId() 
> function is unable to read the git commit hash for the current HEAD. This 
> appears to happen after a `git gc` takes place, which causes the 
> `.git/refs/heads/*` files to be moved to `.git/packed-refs`.
> The determineCommitId() should be patched to also try reading from the 
> packed-refs to determine the commit hash.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.

2023-02-28 Thread via GitHub


gharris1727 commented on code in PR #13287:
URL: https://github.com/apache/kafka/pull/13287#discussion_r1120610035


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -427,9 +427,8 @@ private void updateConnectorTasks(String connName) {
 }
 
 List> newTaskConfigs = 
recomputeTaskConfigs(connName);
-List> oldTaskConfigs = 
configState.allTaskConfigs(connName);
 
-if (!newTaskConfigs.equals(oldTaskConfigs)) {

Review Comment:
   This logic is simpler, but only because `ClusterConfigState::allTaskConfigs` 
and `List::equals` are abstracting some of the complexity.
   
   There are multiple reasons I decided to keep the DistributedHerder 
implementation instead of the Standalone herder:
   
   1. The DistributedHerder implementation allows us to provide more detailed 
debug logs about how the task configs are different, in a way that 
`List::equals` hides. Since distributed mode is more common, I figured adding 
those logs to standalone was better than taking them away from distributed mode.
   2. The `taskConfig` and `allTaskConfigs` are duplicating one another to some 
degree (they're both retrieving task configs, they're both applying 
transformations) but we cannot remove `taskConfig` as it is used in lots of 
other areas of the herder.
   3. The `allTaskConfigs` is currently performing a naive search over all 
known task configs, and while we could fix that, it requires a larger change 
with higher regression risk.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-28 Thread via GitHub


C0urante commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1120563796


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.common.config.provider.EnvVarConfigProviderConfig.ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map envVarMap;
+private Pattern envVarPattern;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map configs) {
+if (configs.keySet().contains(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) 
{
+envVarPattern = Pattern.compile(
+
String.valueOf(configs.get(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG))
+);
+} else {
+envVarPattern = Pattern.compile(".*");
+log.info("No pattern for environment variables provided. Using 
default pattern '(.*)'.");
+}
+}
+
+@Override
+public void close() throws IOException {
+}
+
+/**
+ * @param s unused
+ * @return returns environment variables as configuration
+ */
+@Override
+public ConfigData get(String s) {
+return get(s, null);
+}
+
+/**
+ * @param pathpath, not used for environment variables
+ * @param keys the keys whose values will be retrieved.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path, Set keys) {
+
+if (path != null && !path.isEmpty()) {
+log.error("Path is not supported for EnvVarConfigProvider, invalid 
value '{}'", path);
+throw new ConfigException("Path is not supported for 
EnvVarConfigProvider, invalid value '" + path + "'");
+}
+
+if (envVarMap == null) {
+return new ConfigData(new HashMap<>());
+}
+
+Map filteredEnvVarMap = envVarMap;
+
+filteredEnvVarMap = envVarMap.entrySet().stream()

Review Comment:
   Nit: can be simplified
   
   ```suggestion
   Map filteredEnvVarMap = envVarMap.entrySet().stream()
   ```



##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderConfig.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.AbstractConfig;

Review Comment:
   Unused import (causing Checkstyle failures during build)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-28 Thread via GitHub


C0urante commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1120561014


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map configs) {
+}

Review Comment:
   IMO this isn't really necessary; in Connect, no config providers are enabled 
by default, and they are controlled on an opt-in basis by the cluster 
administrator via worker config files. You cannot cause a new config provider 
to be used in a connector config that is not already enabled in the worker 
config.
   
   Additionally, if we're discussing malicious connector classes (instead of 
malicious connector configs), arbitrary code execution is already possible, so 
this kind of feature wouldn't prevent people from reading environment variables.
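
   As a reference sketch of that opt-in (property names follow the standard `config.providers` mechanism; the alias `env` is an arbitrary example):
   ```java
import java.util.HashMap;
import java.util.Map;

public class WorkerConfigOptInSketch {
    public static void main(String[] args) {
        // Sketch only: the provider exists for connectors only if the cluster
        // administrator lists it in the worker configuration.
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("config.providers", "env"); // "env" is an arbitrary alias
        workerProps.put("config.providers.env.class",
                "org.apache.kafka.common.config.provider.EnvVarConfigProvider");
        // Connector configs may then reference values through the "env" alias;
        // without these worker-level entries, such references cannot resolve.
        System.out.println(workerProps);
    }
}
   ```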
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-02-28 Thread via GitHub


ivanyu commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1120551247


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -497,51 +501,59 @@ object RequestMetrics {
   val ErrorsPerSec = "ErrorsPerSec"
 }
 
-class RequestMetrics(name: String) extends KafkaMetricsGroup {
+class RequestMetrics(name: String) {
 
   import RequestMetrics._
 
-  val tags = Map("request" -> name)
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+  val tags = Map("request" -> name).asJava
   val requestRateInternal = new Pool[Short, Meter]()
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
+  val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
+  val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags)
   // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
+  val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags)
   // time a request is throttled, not part of the request processing time (throttling is done at the client level
   // for clients that support KIP-219 and by muting the channel for the rest)
-  val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
+  val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)
+  val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs, true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags)
-  val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
+  val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs, true, tags)
+  val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags)
   // request size in bytes
-  val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
+  val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags)
   // time for message conversions (only relevant to fetch and produce requests)
   val messageConversionsTimeHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram(MessageConversionsTimeMs, biased = true, tags))
+      Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags))
     else
       None
   // Temporary memory allocated for processing request (only populated for fetch and produce requests)
   // This shows the memory allocated for compression/conversions excluding the actual request size
   val tempMemoryBytesHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram(TemporaryMemoryBytes, biased = true, tags))
+      Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags))
     else
       None
 
   private val errorMeters = mutable.Map[Errors, ErrorMeter]()
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
-  def requestRate(version: Short): Meter = {
-    requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+  def requestRate(version: Short): Meter =
+    requestRateInternal.getAndMaybePut(version, metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tagsWithVersion(version)))
+
+  def tagsWithVersion(version: Short): java.util.Map[String, String] = {
+    val nameAndVersionTags = new util.HashMap[String, String]()

Review Comment:
   I didn't do this because I considered this an infrequent operation, but sure, I have done this now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13286: MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder

2023-02-28 Thread via GitHub


C0urante merged PR #13286:
URL: https://github.com/apache/kafka/pull/13286


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13287:
URL: https://github.com/apache/kafka/pull/13287#discussion_r1120511592


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -427,9 +427,8 @@ private void updateConnectorTasks(String connName) {
 }
 
         List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
 
-        if (!newTaskConfigs.equals(oldTaskConfigs)) {

Review Comment:
   This logic seems significantly simpler than what's currently used in `DistributedHerder::reconfigureConnector` and moved to `AbstractHerder::taskConfigsChanged` in this PR.
   
   Why not keep this logic instead of removing `ClusterConfigState::allTaskConfigs`?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1981,6 +1966,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
 }
 });
 }
+} else {
+log.debug("Skipping reconfiguration of connector {} as generated configs appear unchanged", connName);

Review Comment:
   Why not move this into `AbstractHerder::taskConfigsChanged` as well, so that 
it's picked up in both standalone and distributed mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-28 Thread via GitHub


satishd commented on PR #13304:
URL: https://github.com/apache/kafka/pull/13304#issuecomment-1448587425

   Thanks @junrao for your review. Addressed them with the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-02-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14533:
---

Assignee: Guozhang Wang

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flakey failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-02-28 Thread via GitHub


ijuma commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1120488775


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -497,51 +501,59 @@ object RequestMetrics {
   val ErrorsPerSec = "ErrorsPerSec"
 }
 
-class RequestMetrics(name: String) extends KafkaMetricsGroup {
+class RequestMetrics(name: String) {
 
   import RequestMetrics._
 
-  val tags = Map("request" -> name)
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+  val tags = Map("request" -> name).asJava
   val requestRateInternal = new Pool[Short, Meter]()
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
+  val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
+  val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags)
   // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
+  val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags)
   // time a request is throttled, not part of the request processing time (throttling is done at the client level
   // for clients that support KIP-219 and by muting the channel for the rest)
-  val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
+  val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)
+  val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs, true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags)
-  val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
+  val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs, true, tags)
+  val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags)
   // request size in bytes
-  val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
+  val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags)
   // time for message conversions (only relevant to fetch and produce requests)
   val messageConversionsTimeHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram(MessageConversionsTimeMs, biased = true, tags))
+      Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags))
     else
       None
   // Temporary memory allocated for processing request (only populated for fetch and produce requests)
   // This shows the memory allocated for compression/conversions excluding the actual request size
   val tempMemoryBytesHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram(TemporaryMemoryBytes, biased = true, tags))
+      Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags))
     else
       None
 
   private val errorMeters = mutable.Map[Errors, ErrorMeter]()
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
-  def requestRate(version: Short): Meter = {
-    requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+  def requestRate(version: Short): Meter =
+    requestRateInternal.getAndMaybePut(version, metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tagsWithVersion(version)))
+
+  def tagsWithVersion(version: Short): java.util.Map[String, String] = {
+    val nameAndVersionTags = new util.HashMap[String, String]()

Review Comment:
   Maybe we can presize the map correctly.
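   
   Something like the following, in Java terms (the Scala change is analogous; this assumes `tags` always holds the single "request" -> name entry, so the result has exactly two mappings):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   final class TagsExample {
       static Map<String, String> tagsWithVersion(Map<String, String> tags, short version) {
           // two entries at the default 0.75 load factor -> an initial capacity of 3 never resizes
           Map<String, String> nameAndVersionTags = new HashMap<>(3);
           nameAndVersionTags.putAll(tags);
           nameAndVersionTags.put("version", Short.toString(version));
           return nameAndVersionTags;
       }
   }
   ```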



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120481527


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
 if (exception instanceof UnknownMemberIdException ||
-exception instanceof IllegalGenerationException ||
-exception instanceof RebalanceInProgressException ||
-exception instanceof MemberIdRequiredException)
+exception instanceof IllegalGenerationException ||

Review Comment:
   Hey, thanks for the comments again, and absolutely no apology is needed there! I guess, as we all know, rebalancing is full of subtleties, so it makes sense to be careful about these non-retriable exception cases. I think it's a good idea to keep the original behavior consistent, in case of unexpected breakage. Updating the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

2023-02-28 Thread via GitHub


C0urante commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1448579807

   > I think that fixing the test by extending the timeout might hide the 
problems that users may be facing in production as well.
   
   I don't think that bumping the timeouts in these tests is likely to hide 
issues that may surface in production; rather, I'm only worried that they'll 
make local testing and development more cumbersome.
   
   > Hence, I believe, we need a better mechanism to check for successful 
startup of MirrorMaker. Thoughts?
   
   Agreed 👍 IMO MM2 observability in general can be improved. Right now you're 
basically limited to JMX, other custom metrics reporters, and logs. There's no 
public API for viewing cluster-wide health info (the closest thing is reading 
directly from the status topic, but that's not public API and we shouldn't 
count on users doing that or recommend it).
   
   If there's an issue starting one of the connectors, instead of failing 
startup of the MM2 node, we [log a 
message](https://github.com/apache/kafka/blob/f586fa59d3f938e04bda4e8143ddb1c4310eaf78/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L232-L237)
 and move on. I can't imagine this is very easy for users to work with, though 
maybe I'm underestimating the utility of JMX and logs here.
   
   > In absence of such a mechanism, today, a user might call 
MirrorMaker#start() and assume that it has started correctly while in the 
background, connector start up might have failed (just as it did in this test 
scenario).
   
   This is less concerning because the `MirrorMaker` class isn't part of public 
API; users are not expected to directly interact with any instances of this 
class and we do not support any use cases like that right now.
   
   
   I'm wondering if, instead of a `Future`-based approach, we might add an 
internal API to the `MirrorMaker` class to expose the status of the connectors 
running on it?
   
   A brief sketch:
   
   ```java
   public class MirrorMaker {
       public ConnectorStateInfo connectorStatus(String source, String target, String connector) {
           SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
           checkHerder(sourceAndTarget);
           return herders.get(new SourceAndTarget(source, target)).connectorStatus(connector);
       }
   }
   ```
   
   We could then leverage this API in the `DedicatedMirrorIntegrationTest` 
suite to implement an equivalent of the `waitUntilMirrorMakerIsRunning` method 
from the `MirrorConnectorsIntegrationBaseTest` suite.
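   
   For the integration-test side, a rough sketch of that wait (the helper name and timeout constant below are hypothetical, and it assumes the `connectorStatus` API sketched above):
   
   ```java
   private void awaitConnectorRunning(MirrorMaker mm, String source, String target, String connector)
           throws InterruptedException {
       TestUtils.waitForCondition(() -> {
           try {
               ConnectorStateInfo status = mm.connectorStatus(source, target, connector);
               return status != null && "RUNNING".equals(status.connector().state());
           } catch (Exception e) {
               // The herder for this replication flow may not be ready yet; keep waiting until the timeout.
               return false;
           }
       }, CONNECTOR_START_TIMEOUT_MS, "Connector " + connector + " did not start in time");
   }
   ```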


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-02-28 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1448576594

   I removed the checkstyle statements which allow `connect-runtime` to import 
`tools`. Since this is the only dependency on `tools`, we also have an 
opportunity to disallow _any_ importing of the tools package, similar to the 
kafka server.
   
   Is this a good idea? Does it make sense for the `tools` package to disallow 
everyone from importing it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120460193


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.

Review Comment:
   I think here the comment is not to just state what the code did, since 
readers can just understand that from the code :P instead what we want to 
emphasize is to remind future contributors that they should be careful to not 
change the precedence ordering of this logic unnecessarily.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
 if (exception instanceof UnknownMemberIdException ||
-exception instanceof IllegalGenerationException ||
-exception instanceof RebalanceInProgressException ||
-exception instanceof MemberIdRequiredException)
+exception instanceof IllegalGenerationException ||

Review Comment:
   Ah thanks for the clarifications!
   
   Thinking about this a bit more (sorry for going back and forth..), I'm now a bit more concerned that for some usage patterns where the `poll` call is triggered less frequently, we may not come back to handle these four exceptions while the broker is ticking and waiting for the join-group request to be re-sent. Hence I'm changing my mind and lean a bit more toward honoring the exception types for immediate handling over the timeouts --- again, sorry for going back and forth...
   
   So I think we would define the ordering as the following:
   
   1. For un-retriable exception, always try to handle immediately and not 
honor the timer.
   2. Otherwise, honor the timer.
   
   In that case, we could just go back to the first time you made the change, 
i.e. just add the 
   
   ```
   if (timer.isExpired())
   return false;
   ```
   
   After the `if/else-if` block. Still, it's better to add a comment that the above ordering is deliberately designed this way.
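   
   In other words, roughly (a sketch only; `exception`, `future` and `timer` are the locals already in `joinGroupIfNeeded`, and the surrounding retry loop is elided):
   
   ```java
   // 1. Special coordinator errors: reset state and retry immediately, regardless of the timer.
   if (exception instanceof UnknownMemberIdException ||
       exception instanceof IllegalGenerationException ||
       exception instanceof RebalanceInProgressException ||
       exception instanceof MemberIdRequiredException)
       continue;
   // 2. Any other non-retriable error is fatal.
   else if (!future.isRetriable())
       throw exception;
   // 3. Only retriable errors honor the caller's timer before backing off and retrying.
   if (timer.isExpired())
       return false;
   ```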
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 Thread via GitHub


ijuma commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120470741


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
  * @throws IOException  if {@link DataInput} throws {@link IOException}
  */
 public static long readVarlong(DataInput in) throws IOException {
-long value = 0L;
-int i = 0;
-long b;
-while (((b = in.readByte()) & 0x80) != 0) {
-value |= (b & 0x7f) << i;
-i += 7;
-if (i > 63)
-throw illegalVarlongException(value);
+long raw = readUnsignedVarlong(in);
+return (raw >>> 1) ^ -(raw & 1);
+}
+
+private static long readUnsignedVarlong(DataInput in) throws IOException {
+byte tmp = in.readByte();

Review Comment:
   That's interesting. Can we update the jmh benchmark to have variants where 
(1) the max varint fits within one byte and where (2) the max varint fits 
within two bytes. I think that is the most common by far.
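   
   Something along these lines, perhaps (a fragment meant to slot into the existing JMH benchmark class; the state and method names here are made up):
   
   ```java
   @State(Scope.Benchmark)
   public static class VarintSizes {
       final ByteBuffer oneByte = ByteBuffer.allocate(8);
       final ByteBuffer twoBytes = ByteBuffer.allocate(8);
   
       @Setup
       public void setup() {
           ByteUtils.writeUnsignedVarint(127, oneByte);      // largest unsigned varint that fits in one byte
           ByteUtils.writeUnsignedVarint(16383, twoBytes);   // largest unsigned varint that fits in two bytes
           oneByte.flip();
           twoBytes.flip();
       }
   }
   
   @Benchmark
   public int readUnsignedVarintOneByte(VarintSizes state) {
       state.oneByte.rewind();
       return ByteUtils.readUnsignedVarint(state.oneByte);
   }
   
   @Benchmark
   public int readUnsignedVarintTwoBytes(VarintSizes state) {
       state.twoBytes.rewind();
       return ByteUtils.readUnsignedVarint(state.twoBytes);
   }
   ```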



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-28 Thread via GitHub


guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120456284


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 
1;
 client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
   Ack, that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement

2023-02-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694652#comment-17694652
 ] 

Guozhang Wang commented on KAFKA-14748:
---

I agree for stream-stream joins now, since in the case of "no right-hand-side value found" we would not emit immediately, so it is the same as the case where the "key-extractor returns null". But for table-table FK-joins, today the former case would emit while the latter case would not?
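
To make the two cases concrete (the types and topic names below are illustrative only):

{code:java}
// A sketch of a table-table FK left-join:
class Order { String id; String customerId; }      // customerId is the foreign key
class Customer { String name; }

StreamsBuilder builder = new StreamsBuilder();
KTable<String, Order> orders = builder.table("orders");
KTable<String, Customer> customers = builder.table("customers");

KTable<String, String> enriched = orders.leftJoin(
    customers,
    order -> order.customerId,                      // foreign-key extractor
    (order, customer) -> order.id + "/" + (customer == null ? "null" : customer.name)
);
// Case 1: customerId is set but no matching customer exists -> a result with a null right side is emitted.
// Case 2: customerId itself is null -> the record is treated as invalid and dropped, which is the asymmetry in question.
{code}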

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils

2023-02-28 Thread via GitHub


satishd commented on PR #13309:
URL: https://github.com/apache/kafka/pull/13309#issuecomment-1448524298

   Thanks @junrao for the review. Addressed them with the latest 
commit/comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils

2023-02-28 Thread via GitHub


satishd commented on code in PR #13309:
URL: https://github.com/apache/kafka/pull/13309#discussion_r1120421628


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java:
##
@@ -72,4 +92,99 @@ private static String filenamePrefixFromOffset(long offset) {
 return nf.format(offset);
 }
 
+    /**
+     * Construct a log file name in the given dir with the given base offset.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     */
+    public static File logFile(File dir, long offset) {
+        return logFile(dir, offset, "");
+    }
+
+    /**
+     * Construct a log file name in the given dir with the given base offset and the given suffix.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     * @param suffix The suffix to be appended to the file name (e.g. "", ".deleted", ".cleaned", ".swap", etc.)
+     */
+    public static File logFile(File dir, long offset, String suffix) {
+        return new File(dir, filenamePrefixFromOffset(offset) + LOG_FILE_SUFFIX + suffix);
+    }
+
+    /**
+     * Construct an index file name in the given dir using the given base offset.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     */
+    public static File offsetIndexFile(File dir, long offset) {
+        return offsetIndexFile(dir, offset, "");
+    }
+
+    /**
+     * Construct an index file name in the given dir using the given base offset and the given suffix.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
+     */
+    public static File offsetIndexFile(File dir, long offset, String suffix) {
+        return new File(dir, filenamePrefixFromOffset(offset) + INDEX_FILE_SUFFIX + suffix);
+    }
+
+    /**
+     * Construct a time index file name in the given dir using the given base offset.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     */
+    public static File timeIndexFile(File dir, long offset) {
+        return timeIndexFile(dir, offset, "");
+    }
+
+    /**
+     * Construct a time index file name in the given dir using the given base offset and the given suffix.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
+     */
+    public static File timeIndexFile(File dir, long offset, String suffix) {
+        return new File(dir, filenamePrefixFromOffset(offset) + TIME_INDEX_FILE_SUFFIX + suffix);
+    }
+
+    /**
+     * Construct a transaction index file name in the given dir using the given base offset.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     */
+    public static File transactionIndexFile(File dir, long offset) {
+        return transactionIndexFile(dir, offset, "");
+    }
+
+    /**
+     * Construct a transaction index file name in the given dir using the given base offset and the given suffix.
+     *
+     * @param dir    The directory in which the log will reside
+     * @param offset The base offset of the log file
+     * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.)
+     */
+    public static File transactionIndexFile(File dir, long offset, String suffix) {
+        return new File(dir, filenamePrefixFromOffset(offset) + TXN_INDEX_FILE_SUFFIX + suffix);
+    }
+
+    /**
+     * Returns the offset from the given file. The file name is of the form: {number}.{suffix}. This method extracts
+     * the number from the given file's name.
+     *
+     * @param file file with the offset information as part of its name.
+     * @return offset of the given file
+     */
+    public static Long offsetFromFile(File file) {

Review Comment:
   This is already removed from LocalLog. Are you asking whether to remove this 
method from here and use `offsetFromFileName` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 Thread via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120354148


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) {
  * @param buffer The output to write to
  */
 public static void writeUnsignedVarint(int value, ByteBuffer buffer) {

Review Comment:
   Netty uses the default loop based implementation: 
https://github.com/netty/netty/blob/5d1f99655918c9c034ca090d51b64eced73f742f/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java#L58
 
   
   I wasn't able to find a project which does what we are doing [except for the 
blog 
post](https://steinborn.me/posts/performance/how-fast-can-you-write-a-varint/) 
I mentioned above and over 
[here](https://github.com/richardstartin/varints/blob/main/src/main/java/io/github/richardstartin/varints/SmartNoDataDependencyVarIntState.java).
   
   Note that protobuf uses an unrolled implementation for its C++ code at 
   
https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.h#L913
 Not that it matters much for us since the compilers differ between Java and C++, but I'm adding it here as a data point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-28 Thread via GitHub


C0urante merged PR #13184:
URL: https://github.com/apache/kafka/pull/13184


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 Thread via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120328865


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
  * @throws IOException  if {@link DataInput} throws {@link IOException}
  */
 public static long readVarlong(DataInput in) throws IOException {
-long value = 0L;
-int i = 0;
-long b;
-while (((b = in.readByte()) & 0x80) != 0) {
-value |= (b & 0x7f) << i;
-i += 7;
-if (i > 63)
-throw illegalVarlongException(value);
+long raw = readUnsignedVarlong(in);
+return (raw >>> 1) ^ -(raw & 1);
+}
+
+private static long readUnsignedVarlong(DataInput in) throws IOException {
+byte tmp = in.readByte();

Review Comment:
   No, I have written this by extending Netty's varint32 implementation to work with 64 bits (it's simple loop unrolling, no fancy logic). The heuristics of inlining become less predictable as the size of the function increases, hence we don't see much benefit for the 64-bit implementation here. I don't have a strong opinion on the 64-bit implementation here and would be happy to fall back to the exact implementation used by Protobuf. [1]
   
   
   [1] 
https://github.com/protocolbuffers/protobuf/blob/main/java/core/src/main/java/com/google/protobuf/CodedInputStream.java#L1048



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 Thread via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120323557


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
  * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
  */
 public static int readUnsignedVarint(ByteBuffer buffer) {

Review Comment:
   Sure, will do. 
   
   Also protobuf has a similar unrolled implementation for its C++ code at 
https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.cc#L422
 (its Java variant doesn't use the unrolled implementation since they decided to add a different implementation which favours cases when varints are 1 byte).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-28 Thread via GitHub


yashmayya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119640881


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   Thanks, I've updated the test to add another herder tick which runs a 
successful task reconfiguration request (I skipped the addition of another tick 
because the no further retries bit can be verified by the poll timeout at the 
end of the previous tick).
   
   Regarding the test case for the task reconfiguration REST request to the 
leader - I did consider that initially but while trying to add one, there were 
some complications (timing related issues) arising from the use of the 
`forwardRequestExecutor` at which point I felt like it was more trouble than it 
was worth. However, your comment made me revisit it and I've made some changes 
to drop in a simple mock executor service which runs requests synchronously (on 
the same thread as the caller). Let me know what you think?
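   
   For reference, the "synchronous executor" trick is roughly the following (a sketch, not the exact test helper):
   
   ```java
   // An ExecutorService whose execute() runs the task on the caller's thread, so the
   // forwarded reconfiguration request completes before the herder tick returns.
   ExecutorService synchronousExecutor = new AbstractExecutorService() {
       @Override public void execute(Runnable command) { command.run(); }
       @Override public void shutdown() { }
       @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); }
       @Override public boolean isShutdown() { return true; }
       @Override public boolean isTerminated() { return true; }
       @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return true; }
   };
   ```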



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14732) Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14732:
--
Fix Version/s: 3.5.0

> Use an exponential backoff retry mechanism while reconfiguring connector tasks
> --
>
> Key: KAFKA-14732
> URL: https://issues.apache.org/jira/browse/KAFKA-14732
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Kafka Connect in distributed mode retries infinitely with a fixed retry 
> backoff (250 ms) in case of errors arising during connector task 
> reconfiguration. Tasks can be "reconfigured" during connector startup (to get 
> the initial task configs from the connector), a connector resume or if a 
> connector explicitly requests it via its context. Task reconfiguration 
> essentially entails requesting a connector instance for its task configs and 
> writing them to the Connect cluster's config storage (in case a change in 
> task configs is detected). A fixed retry backoff of 250 ms leads to very 
> aggressive retries - consider a Debezium connector which attempts to initiate 
> a database connection in its [taskConfigs 
> method|https://github.com/debezium/debezium/blob/bf347da71ad9b0819998a3bc9754b3cc96cc1563/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L63].
>  If the connection fails due to something like an invalid login, the Connect 
> worker will essentially spam connection attempts frequently and indefinitely 
> (until the connector config / database side configs are fixed). An 
> exponential backoff retry mechanism seems more well suited for the 
> [DistributedHerder::reconfigureConnectorTasksWithRetry|https://github.com/apache/kafka/blob/a54a34a11c1c867ff62a7234334cad5139547fd7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1873-L1898]
>  method.
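
For illustration, the kind of backoff being proposed has roughly this shape (the constants and method name below are examples only, not the values chosen for DistributedHerder):

{code:java}
static long backoffMs(int attempt) {
    long initialMs = 250L;
    long maxMs = 60_000L;
    // 250, 500, 1000, ... capped at maxMs; the shift is bounded to avoid overflow.
    long exponentialMs = initialMs * (1L << Math.min(attempt, 30));
    return Math.min(exponentialMs, maxMs);
}
{code}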



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-28 Thread via GitHub


C0urante merged PR #13276:
URL: https://github.com/apache/kafka/pull/13276


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1120259483


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   Ah, fair point about the fourth tick!
   
   I don't love using a synchronous executor here since it diverges 
significantly from the non-testing behavior of the herder. But, I can't think 
of a better way to test this without going overboard in complexity, and it does 
give us decent coverage.
   
   So, good enough 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #13316: MINOR: srcJar should depend on processMessages task

2023-02-28 Thread via GitHub


ijuma merged PR #13316:
URL: https://github.com/apache/kafka/pull/13316


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13316: MINOR: srcJar should depend on processMessages task

2023-02-28 Thread via GitHub


ijuma commented on PR #13316:
URL: https://github.com/apache/kafka/pull/13316#issuecomment-1448363891

   Test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 Thread via GitHub


ijuma commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120217103


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) {
  * @param buffer The output to write to
  */
 public static void writeUnsignedVarint(int value, ByteBuffer buffer) {

Review Comment:
   What does netty do for this? Similar to what we're doing here or something 
else?



##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
  * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
  */
 public static int readUnsignedVarint(ByteBuffer buffer) {

Review Comment:
   Shall we have a comment indicating that we borrowed the implementation from 
Netty with a link to the relevant file?



##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
  * @throws IOException  if {@link DataInput} throws {@link IOException}
  */
 public static long readVarlong(DataInput in) throws IOException {
-long value = 0L;
-int i = 0;
-long b;
-while (((b = in.readByte()) & 0x80) != 0) {
-value |= (b & 0x7f) << i;
-i += 7;
-if (i > 63)
-throw illegalVarlongException(value);
+long raw = readUnsignedVarlong(in);
+return (raw >>> 1) ^ -(raw & 1);
+}
+
+private static long readUnsignedVarlong(DataInput in) throws IOException {
+byte tmp = in.readByte();

Review Comment:
   I saw a link to the Netty implementation for varint32, is there one for 
varint64 too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2023-02-28 Thread via GitHub


divijvaidya commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1448325538

   Completing the conversation here, in case someone comes across this old thread. We have a new KIP and a PR for upgrading ZooKeeper to 3.8.1, which will hopefully land in 3.5.
   
   KIP - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-902%3A+Upgrade+Zookeeper+to+3.8.1
 
   
   PR - https://github.com/apache/kafka/pull/13260 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14659:
--
Fix Version/s: 3.3.3

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Source tasks in Kafka connect offer two sets of metrics (documented in 
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records 
> produced/polled (before transformation) by this task belonging to the named 
> source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output 
> from the transformations and written to Kafka for this task belonging to the 
> named source connector in this worker. This is after transformations are 
> applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number 
> of records polled and written for the metrics above, respectively.
> In short, the "poll" metrics capture the number of messages sourced 
> pre-transformation/filtering, and the "write" metrics should capture the 
> number of messages ultimately written to Kafka post-transformation/filtering. 
> However, the implementation of the {{source-record-write-*}}  metrics 
> _includes_ records filtered out by transformations (and also records that 
> result in produce failures with the config {{{}errors.tolerance=all{}}}).
> h3. Details
> In 
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
>  each source record is passed through the transformation chain where it is 
> potentially filtered out, checked to see if it was in fact filtered out, and 
> if so it is accounted for in the internal metrics via 
> {{{}counter.skipRecord(){}}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) { 
> retryWithToleranceOperator.sourceRecord(preTransformRecord);
> final SourceRecord record = 
> transformationChain.apply(preTransformRecord);
> final ProducerRecord producerRecord = 
> convertTransformedRecord(record);
> if (producerRecord == null || retryWithToleranceOperator.failed()) {  
>   
> counter.skipRecord();
> recordDropped(preTransformRecord);
> continue;
> }
> ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
> 
> public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup 
> metricsGroup) {
> assert batchSize > 0;
> assert metricsGroup != null;
> this.batchSize = batchSize;
> counter = batchSize;
> this.metricsGroup = metricsGroup;
> }
> public void skipRecord() {
> if (counter > 0 && --counter == 0) {
> finishedAllWrites();
> }
> }
> 
> private void finishedAllWrites() {
> if (!completed) {
> metricsGroup.recordWrite(batchSize - counter);
> completed = true;
> }
> }
> {code}
> For example: If a batch starts with 100 records, {{batchSize}} and 
> {{counter}} will both be initialized to 100. If all 100 records get filtered 
> out, {{counter}} will be decremented 100 times, and 
> {{{}finishedAllWrites(){}}}will record the value 100 to the underlying 
> {{source-record-write-*}}  metrics rather than 0, the correct value according 
> to the documentation for these metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the 
> {{source-record-write-*}}  metrics, it seems reasonable to fix these metrics 
> such that filtered records do not get counted.
> It may also be useful to add additional metrics to capture the rate and total 
> number of records filtered out by transformations, which would require a KIP.
> I'm not sure what the best way of accounting for produce failures in the case 
> of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own 
> new metrics?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14659:
--
Fix Version/s: 3.4.1

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.5.0, 3.4.1
>
>
> Source tasks in Kafka connect offer two sets of metrics (documented in 
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records 
> produced/polled (before transformation) by this task belonging to the named 
> source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output 
> from the transformations and written to Kafka for this task belonging to the 
> named source connector in this worker. This is after transformations are 
> applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number 
> of records polled and written for the metrics above, respectively.
> In short, the "poll" metrics capture the number of messages sourced 
> pre-transformation/filtering, and the "write" metrics should capture the 
> number of messages ultimately written to Kafka post-transformation/filtering. 
> However, the implementation of the {{source-record-write-*}}  metrics 
> _includes_ records filtered out by transformations (and also records that 
> result in produce failures with the config {{{}errors.tolerance=all{}}}).
> h3. Details
> In 
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
>  each source record is passed through the transformation chain where it is 
> potentially filtered out, checked to see if it was in fact filtered out, and 
> if so it is accounted for in the internal metrics via 
> {{{}counter.skipRecord(){}}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) { 
> retryWithToleranceOperator.sourceRecord(preTransformRecord);
> final SourceRecord record = 
> transformationChain.apply(preTransformRecord);
> final ProducerRecord producerRecord = 
> convertTransformedRecord(record);
> if (producerRecord == null || retryWithToleranceOperator.failed()) {  
>   
> counter.skipRecord();
> recordDropped(preTransformRecord);
> continue;
> }
> ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
> 
> public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup 
> metricsGroup) {
> assert batchSize > 0;
> assert metricsGroup != null;
> this.batchSize = batchSize;
> counter = batchSize;
> this.metricsGroup = metricsGroup;
> }
> public void skipRecord() {
> if (counter > 0 && --counter == 0) {
> finishedAllWrites();
> }
> }
> 
> private void finishedAllWrites() {
> if (!completed) {
> metricsGroup.recordWrite(batchSize - counter);
> completed = true;
> }
> }
> {code}
> For example: If a batch starts with 100 records, {{batchSize}} and 
> {{counter}} will both be initialized to 100. If all 100 records get filtered 
> out, {{counter}} will be decremented 100 times, and 
> {{{}finishedAllWrites(){}}}will record the value 100 to the underlying 
> {{source-record-write-*}}  metrics rather than 0, the correct value according 
> to the documentation for these metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the 
> {{source-record-write-*}}  metrics, it seems reasonable to fix these metrics 
> such that filtered records do not get counted.
> It may also be useful to add additional metrics to capture the rate and total 
> number of records filtered out by transformations, which would require a KIP.
> I'm not sure what the best way of accounting for produce failures in the case 
> of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own 
> new metrics?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

