[jira] [Updated] (KAFKA-14304) ZooKeeper to KRaft Migration
[ https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Akhilesh Chaganti updated KAFKA-14304: -- Fix Version/s: 3.5.0 > ZooKeeper to KRaft Migration > > > Key: KAFKA-14304 > URL: https://issues.apache.org/jira/browse/KAFKA-14304 > Project: Kafka > Issue Type: New Feature >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.5.0, 3.4.1 > > > Top-level JIRA for > [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1121236675 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. 
+ */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); + +public static final String DIR_NAME = "remote-log-index-cache"; + +private static final String TMP_FILE_SUFFIX = ".tmp"; + +private final File cacheDir; +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); +private final Object lock = new Object(); +private final RemoteStorageManager remoteStorageManager; +private final Map entries; +private final ShutdownableThread cleanerThread; + +private volatile boolean closed = false; + +public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this(1024, remoteStorageManager, logDir); +} + +/** + * Creates RemoteIndexCache with the given configs. + * + * @param maxSize maximum number of segment index entries to be cached. + * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. + * @param logDir log directory + */ +public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this.remoteStorageManager = remoteStorageManager; +cacheDir = new File(logDir, DIR_NAME); + +entries = new LinkedHashMap(maxSize, 0.75f, true) { +@Override +protected boolean removeEldestEntry(Map.Entry eldest) { +if (this.size() > maxSize) { +RemoteIndexCache.Entry entry = eldest.getValue(); +// Mark the entries for cleanup, background thread will clean them later. +try { +entry.markForCleanup(); +} catch (IOException e) { +throw new KafkaException(e); +} +expiredIndexes.add(entry); +return true; +} else { +return false; +} +} +}; + +init(); + +// Start cleaner thread that will clean the expired entries. +cleanerThread = createCleanerThread(); +cleanerThread.start(); +} + +pr
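For readers following the review, the eviction pattern in the quoted constructor (an access-ordered `LinkedHashMap` whose `removeEldestEntry` hands evicted entries to a queue for later cleanup) can be sketched in isolation. This is a minimal illustration only: `LruSketch`, its `String` values and its field names are stand-ins invented here, not the PR's `RemoteIndexCache` or `Entry` types, and the sketch leaves out the locking and the cleaner thread that the real class has.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the cache in the PR; not thread-safe.
final class LruSketch {
    // Evicted values wait here until some background worker cleans them up.
    final LinkedBlockingQueue<String> expired = new LinkedBlockingQueue<>();
    final Map<Integer, String> entries;

    LruSketch(final int maxSize) {
        // accessOrder = true: iteration order runs from least to most recently accessed.
        entries = new LinkedHashMap<Integer, String>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                if (size() > maxSize) {
                    // Queue the evicted value instead of cleaning it up inline.
                    expired.add(eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }
}
```

Deferring cleanup to a queue keeps `put` cheap, which seems to be the intent of the "background thread will clean them later" comment in the diff.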
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1121168324 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java: ## @@ -31,6 +32,21 @@ public final class LogFileUtils { */ public static final String DELETED_FILE_SUFFIX = ".deleted"; +/** + * Suffix of an offset index file + */ +public static final String INDEX_FILE_SUFFIX = ".index"; Review Comment: Right, that is the plan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd merged pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils
satishd merged PR #13309: URL: https://github.com/apache/kafka/pull/13309
[GitHub] [kafka] satishd commented on pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils
satishd commented on PR #13309: URL: https://github.com/apache/kafka/pull/13309#issuecomment-1449372613 Test failures do not seem to be related to this change, merging these changes to trunk.
[jira] [Updated] (KAFKA-14768) proposal to reduce the first message's send time cost and max block time for safety
[ https://issues.apache.org/jira/browse/KAFKA-14768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] fujian updated KAFKA-14768: --- Description: Hi, Team: Nice to meet you! In our business, we found two types of issue which need to improve: *(1) Take much time to send the first message* Sometimes, we found the users' functional interaction take a lot of time. At last, we figure out the root cause is that after we complete deploy or restart the servers. The first message's delivery on each application server by kafka client will take much time. So, we try to find one solution to improve it. After analyzing the source code about the first time's sending logic. The time cost is caused by the getting metadata before the sending. The latter's sending won't take the much time due to the cached metadata. The logic is right and necessary. Thus, we still want to improve the experience for the first message's send/user first interaction. *(2) can't reduce the send message's block time to wanted value.* Sometimes our application's thread will block for max.block.ms to send message. When we try to reduce the max.block.ms to reduce the blocking time. It can't meet the getting metadata's time requirement sometimes. The root cause is the configured max.block.ms is shared with "get metadata" operation and "send message" operation. We can refer to follow tables: |*where to block* |*when it is blocked* |*how long it will be blocked?* | |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first request which need to load the metadata from kafka|https://github.com/apache/kafka/pull/13320]) _note: org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata_ ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) __ after the change, we can call it before the service is marked as ready. After the ready. it won't block to get metadata due to cache. 
And then we can be safe to reduce the max.block.ms to a lower value to reduce thread's blocking time. After adopting the solution 3. we solve the above issues. For example, we reduce the first message's send about 4s seconds. The log can refer to followed: _warmup test_topic at phase phase 2: get metadata from mq start_ _warmup test_topic at phase phase 2: get metadata from mq end consume *4669ms*_ And after the change, we reduce the max.block.ms from 10s to 2s without worry can't get metadata. {*}So what's your thought for these two issues and the solution I proposed{*}. I hope to get your feedback and thought for the issues. was: Hi, Team: Nice to meet you! In our business, we found two types of issue which need to improve: *(1) Take much time to send the first message* Sometimes, we found the users' functional interaction take a lot of time. At last, we figure out the root cause is that after we complete deploy or restart the servers. The first message's delivery on each application server by kafka client will take much time. So, we try to find one solution to improve it. After analyzing the source code about the first time's sending logic. The time cost is caused by the getting metadata before the sending. The latter's sending won't take the much time due to the cached metadata. The logic is right and necessary. Thus, we still want to improve the experience for the first message's send/user first interaction. *(2) can't reduce the send message's block time to wanted value.* Sometimes our application's thread will block for max.block.ms to send message. When we try to reduce the max.block.ms to reduce the blocking time. It can't meet the getting metadata's time requirement sometimes. The root cause is the configured max.block.ms is shared with "get metadata" operation and "send message" operation. 
We can refer to follow tables: |*where to block* |*when it is blocked* |*how long it will be blocked?* | |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first request which need to load the metadata from kafka| proposal to reduce the first message's send time cost and max block time for > safety > > > Key: KAFKA-14768 > URL: https://issues.apache.org/jira/browse/KAFKA-14768 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.3.1, 3.3.2 >Reporter: fujian >Assignee: hzh0425 >Priority: Major > Labels: performance > > Hi, Team: > > Nice to meet you! > > In our business, we found two types of issue which need to improve: > > *(1) Take much time to send the first message* > Sometimes, we found the users' functional interaction take a lot of time. At > last, we figure out the root cause is that after we complete depl
[GitHub] [kafka] jiafu1115 opened a new pull request, #13320: KAFKA-14768: provide new method to warmup first record's sending and reduce the max.block.ms safely
jiafu1115 opened a new pull request, #13320: URL: https://github.com/apache/kafka/pull/13320

Hi Team: Refer to https://issues.apache.org/jira/browse/KAFKA-14768

There are two issues with the current producer:
(1) Sending the first record takes a long time.
(2) The send blocking time (max.block.ms) cannot be lowered to the desired value, because the one-time metadata fetch in the doSend path also has to fit within it.

Test Result: Before the change, sending the first message took more than 4s, so max.block.ms had to stay above 4s. After the change:
(1) The time to send the first record is reduced. My example:
warmup test_topic at phase phase 2: get metadata from mq start
warmup test_topic at phase phase 2: get metadata from mq end consume 4669ms
(2) max.block.ms can now be reduced to below 4s.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
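The shared-budget problem described in this PR can be made concrete with a little arithmetic. The sketch below assumes, as KAFKA-14768 describes, that one `max.block.ms` budget covers both the metadata wait and the later buffer-append wait. The class and method names are hypothetical and are not part of the producer API; the 4669 ms figure is the one from the log lines above, and treating a warmed-up metadata lookup as taking roughly 0 ms is an assumption.

```java
// Hypothetical helper illustrating a single blocking budget shared by two steps.
final class BlockBudgetSketch {
    /** Time left for the append step after the metadata wait; never negative. */
    static long remainingForAppendMs(long maxBlockMs, long waitedOnMetadataMs) {
        return Math.max(0L, maxBlockMs - waitedOnMetadataMs);
    }

    /** True when the metadata wait alone fits inside the budget. */
    static boolean metadataFitsBudget(long maxBlockMs, long metadataFetchMs) {
        return metadataFetchMs <= maxBlockMs;
    }
}
```

With a cold fetch of 4669 ms, a 2000 ms budget cannot cover the metadata wait at all, while a 10000 ms budget leaves 5331 ms for the append step. Once the metadata is cached by a warmup call, the whole 2000 ms is available for the append, which is why the lower setting becomes safe.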
[GitHub] [kafka] showuon commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
showuon commented on PR #13319: URL: https://github.com/apache/kafka/pull/13319#issuecomment-1449265831

Thanks for investigating it! This test has been failing quite frequently recently. But I don't completely understand this:

> The root cause seems to be with changing the return behavior of when in mockito. Fixed those without using reset

What does the "return behavior change" in mockito refer to?
[GitHub] [kafka] showuon commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
showuon commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1121043658 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java: ## @@ -31,6 +32,21 @@ public final class LogFileUtils { */ public static final String DELETED_FILE_SUFFIX = ".deleted"; +/** + * Suffix of an offset index file + */ +public static final String INDEX_FILE_SUFFIX = ".index"; Review Comment: Looks like it'll get conflicted with another PR. But I think you'll resolve them later. ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. 
+ */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); + +public static final String DIR_NAME = "remote-log-index-cache"; + +private static final String TMP_FILE_SUFFIX = ".tmp"; + +private final File cacheDir; +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); +private final Object lock = new Object(); +private final RemoteStorageManager remoteStorageManager; +private final Map entries; +private final ShutdownableThread cleanerThread; + +private volatile boolean closed = false; + +public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this(1024, remoteStorageManager, logDir); +} + +/** + * Creates RemoteIndexCache with the given configs. + * + * @param maxSize maximum number of segment index entries to be cached. + * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. + * @param logDir log directory + */ +public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this.remoteStorageManager = remoteStorageManager; +cacheDir = new File(logDir, DIR_NAME); + +entries = new LinkedHashMap(maxSize, 0.75f, true) { +@Override +protected boolean removeEldestEntry(Map.Entry eldest) { +if (this.size() > maxSize) { +RemoteIndexCache.Entry entry = eldest.getValue(); +// Mark the entries for cleanup, background thread will clean them later. +try { +entry.markForCleanup(); +} catch (IOExcep
[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
guozhangwang commented on PR #13319: URL: https://github.com/apache/kafka/pull/13319#issuecomment-1449254753 ping @cadonna @lucasbru for a quick review. Before this PR, local runs failed about 50% of the time; after this PR, they succeeded across about 10 runs.
[GitHub] [kafka] guozhangwang opened a new pull request, #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
guozhangwang opened a new pull request, #13319: URL: https://github.com/apache/kafka/pull/13319 Found a few flaky tests while reviewing another PR. The root cause seems to be with changing the return behavior of `when` in mockito. Fixed those without using `reset` and also bumped a couple debug log lines to info since they could be very helpful in debugging. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-14371) quorum-state file contains empty/unused clusterId field
[ https://issues.apache.org/jira/browse/KAFKA-14371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14371. --- Fix Version/s: 3.5.0 Resolution: Fixed > quorum-state file contains empty/unused clusterId field > --- > > Key: KAFKA-14371 > URL: https://issues.apache.org/jira/browse/KAFKA-14371 > Project: Kafka > Issue Type: Improvement >Reporter: Ron Dagostino >Assignee: Gantigmaa Selenge >Priority: Minor > Fix For: 3.5.0 > > > The KRaft controller's quorum-state file > `$LOG_DIR/__cluster_metadata-0/quorum-state` contains an empty clusterId > value. This value is never non-empty, and it is never used after it is > written and then subsequently read. This is a cosmetic issue; it would be > best if this value did not exist there. The cluster ID already exists in the > `$LOG_DIR/meta.properties` file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon merged pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file
showuon merged PR #13102: URL: https://github.com/apache/kafka/pull/13102
[GitHub] [kafka] showuon commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file
showuon commented on PR #13102: URL: https://github.com/apache/kafka/pull/13102#issuecomment-1449214463

Failed tests are unrelated:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testSingleNodeCluster()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV0, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithUniformSubscription()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testSingleNodeCluster()
Build / JDK 17 and Scala 2.13 / kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testMigrate, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseActiveTaskAndTransitToUpdateStandby()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.testResponseIncludesLeaderEpoch()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseActiveTaskAndTransitToUpdateStandby()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseActiveTaskAndTransitToUpdateStandby()
```
[jira] [Updated] (KAFKA-14768) proposal to reduce the first message's send time cost and max block time for safety
[ https://issues.apache.org/jira/browse/KAFKA-14768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] fujian updated KAFKA-14768: --- Description: Hi, Team: Nice to meet you! In our business, we found two types of issue which need to improve: *(1) Take much time to send the first message* Sometimes, we found the users' functional interaction take a lot of time. At last, we figure out the root cause is that after we complete deploy or restart the servers. The first message's delivery on each application server by kafka client will take much time. So, we try to find one solution to improve it. After analyzing the source code about the first time's sending logic. The time cost is caused by the getting metadata before the sending. The latter's sending won't take the much time due to the cached metadata. The logic is right and necessary. Thus, we still want to improve the experience for the first message's send/user first interaction. *(2) can't reduce the send message's block time to wanted value.* Sometimes our application's thread will block for max.block.ms to send message. When we try to reduce the max.block.ms to reduce the blocking time. It can't meet the getting metadata's time requirement sometimes. The root cause is the configured max.block.ms is shared with "get metadata" operation and "send message" operation. We can refer to follow tables: |*where to block* |*when it is blocked* |*how long it will be blocked?* | |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first request which need to load the metadata from kafka| proposal to reduce the first message's send time cost and max block time for > safety > > > Key: KAFKA-14768 > URL: https://issues.apache.org/jira/browse/KAFKA-14768 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.3.1, 3.3.2 >Reporter: fujian >Assignee: hzh0425 >Priority: Major > Labels: performance > > Hi, Team: > > Nice to meet you! 
> > In our business, we found two types of issue which need to improve: > > *(1) Take much time to send the first message* > Sometimes, we found the users' functional interaction take a lot of time. At > last, we figure out the root cause is that after we complete deploy or > restart the servers. The first message's delivery on each application server > by kafka client will take much time. > So, we try to find one solution to improve it. > > After analyzing the source code about the first time's sending logic. The > time cost is caused by the getting metadata before the sending. The latter's > sending won't take the much time due to the cached metadata. The logic is > right and necessary. Thus, we still want to improve the experience for the > first message's send/user first interaction. > > *(2) can't reduce the send message's block time to wanted value.* > Sometimes our application's thread will block for max.block.ms to send > message. When we try to reduce the max.block.ms to reduce the blocking time. > It can't meet the getting metadata's time requirement sometimes. The root > cause is the configured max.block.ms is shared with "get metadata" operation > and "send message" operation. We can refer to follow tables: > |*where to block* > |*when it is blocked* > |*how long it will be blocked?* > | > |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first > request which need to load the metadata from kafka| |org.apache.kafka.clients.producer.internals.RecordAccumulator#append|at peak > time for business, if the network can’t send message in short > time.| > What's the solution for the above two issues: > I think about current logic and figure out followed possible solution: > (1) send one "warmup" message, thus we can't send any fake message. > (2) provide one extra configure time configure which dedicated for getting > metadata. 
thus it will break the define for the max.block.ms > (3) add one method to call waitOnMetadata with one timeout setting without > using the max.block.ms > > _note: org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata_ > ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long > nowMs, long maxWaitMs) > > __ > after the change, we can call it before the service is marked as ready. After > the ready. it won't block to get metadata due to cache. And then we can be > safe to reduce the max.block.ms to a lower value to reduce thread's blocking > time. > > After adopting the solution 3. we solve the above issues. For example, we > reduce the first message's send about 4s seconds. The log can refer to > followed: > _warmup test_topic at phase phase 2: get metadata from mq start_ > _warmup test_topic
[jira] [Updated] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
[ https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13771:
    Fix Version/s: 3.5.0

> Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
>
> Key: KAFKA-13771
> URL: https://issues.apache.org/jira/browse/KAFKA-13771
> Project: Kafka
> Issue Type: Improvement
> Components: security
> Reporter: RivenSun
> Assignee: RivenSun
> Priority: Major
> Fix For: 3.5.0
>
> Quoting the official documentation
> {quote}
> Tokens can also be cancelled explicitly. If a token is not renewed by the token’s expiration time or if token is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper.
> {quote}
> 1. The first point above means that after the `AdminClient` initiates the EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() method on the KafkaServer side, if the user passes in an expireLifeTimeMs less than 0, KafkaServer deletes the corresponding delegationToken directly.
> 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, which is responsible for regularly cleaning up expired tokens. The execution interval is `delegation.token.expiry.check.interval.ms`, and the default value is one hour.
>
> But a careful analysis of the code logic in DelegationTokenManager.expireToken() shows that *Kafka currently does not let users delete an expired delegationToken that they no longer use or renew. If a user tries to do this, they receive a DelegationTokenExpiredException.*
> In the worst case, an expired delegationToken may still be usable for up to {*}an hour{*}, although the broker can shorten this window by lowering delegation.token.expiry.check.interval.ms.
> The solution is very simple: adjust the order of the `if` branches in DelegationTokenManager.expireToken().
> {code:java}
> if (!allowedToRenew(principal, tokenInfo)) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
> } else if (expireLifeTimeMs < 0) { //expire immediately
>   removeToken(tokenInfo.tokenId)
>   info(s"Token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
>   expireResponseCallback(Errors.NONE, now)
> } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
> } else {
>   //set expiry time stamp
>   ..
> } {code}
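The effect of the proposed branch order in KAFKA-13771 can be checked with a small standalone model. This is only a sketch of the decision logic quoted in the issue: the `Result` enum, the class name and the flattened parameter list are invented here for illustration and do not mirror the real `DelegationTokenManager` signatures.

```java
// Hypothetical model of the reordered checks; not Kafka's actual API.
final class ExpireTokenSketch {
    enum Result { OWNER_MISMATCH, REMOVED, TOKEN_EXPIRED, EXPIRY_UPDATED }

    static Result expireToken(boolean allowedToRenew, long expireLifeTimeMs,
                              long maxTimestamp, long expiryTimestamp, long now) {
        if (!allowedToRenew) {
            return Result.OWNER_MISMATCH;
        } else if (expireLifeTimeMs < 0) {
            // Checked before the expiry test, so an already expired token can still be removed.
            return Result.REMOVED;
        } else if (maxTimestamp < now || expiryTimestamp < now) {
            return Result.TOKEN_EXPIRED;
        } else {
            return Result.EXPIRY_UPDATED;
        }
    }
}
```

With this order, a request carrying a negative `expireLifeTimeMs` for a token whose timestamps are already in the past yields `REMOVED` rather than `TOKEN_EXPIRED`, which is the behavior the issue asks for. The ownership check still runs first, so only a permitted principal can trigger the removal.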
[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449191428 yeah the DefaultStateUpdaterTest has been failing from time to time... not sure why 😭 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12639) AbstractCoordinator ignores backoff timeout when joining the consumer group
[ https://issues.apache.org/jira/browse/KAFKA-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12639. --- Fix Version/s: 3.5.0 Resolution: Fixed > AbstractCoordinator ignores backoff timeout when joining the consumer group > --- > > Key: KAFKA-12639 > URL: https://issues.apache.org/jira/browse/KAFKA-12639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.7.0 >Reporter: Matiss Gutmanis >Assignee: Philip Nee >Priority: Major > Fix For: 3.5.0 > > > We observed heavy logging while trying to join consumer group during partial > unavailability of Kafka cluster (it's part of our testing process). Seems > that {{rebalanceConfig.retryBackoffMs}} used in {{ > org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded}} > is not respected. Debugging revealed that {{Timer}} instance technically is > expired thus using sleep of 0 milliseconds which defeats the purpose of > backoff timeout. > Minimal backoff timeout should be respected. > > {code:java} > 2021-03-30 08:30:24,488 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator > 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group. > 2021-03-30 08:30:24,488 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] Rebalance failed. > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The > coordinator is loading and hence can't process requests. 
> 2021-03-30 08:30:24,488 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] (Re-)joining group > 2021-03-30 08:30:24,489 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator > 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group. > 2021-03-30 08:30:24,489 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] Rebalance failed. > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The > coordinator is loading and hence can't process requests. > 2021-03-30 08:30:24,489 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] (Re-)joining group > 2021-03-30 08:30:24,490 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator > 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group. > 2021-03-30 08:30:24,490 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] Rebalance failed. > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The > coordinator is loading and hence can't process requests. > 2021-03-30 08:30:24,490 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] (Re-)joining group > 2021-03-30 08:30:24,491 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator > 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group. 
> 2021-03-30 08:30:24,491 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] Rebalance failed. > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The > coordinator is loading and hence can't process requests. > 2021-03-30 08:30:24,491 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] (Re-)joining group > 2021-03-30 08:30:24,492 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator > 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group. > 2021-03-30 08:30:24,492 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] Rebalance failed. > org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The > coordinator is loading and hence can't process requests. > 2021-03-30 08:30:24,492 INFO > [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer > clientId=app_clientid, groupId=consumer-group] (Re-)joining group > {code}
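The tight loop in the log above (retries one millisecond apart) comes from sleeping for the remaining time of a timer that has already expired. The following is a minimal sketch of a retry loop that avoids this, assuming a virtual clock so it runs instantly. `JoinBackoffSketch`, `FakeClock`, and `joinWithBackoff` are names invented for this example and are not part of the Kafka client; the merged change is described only by its title, "exit upon expired timer to prevent tight looping", and may differ in detail.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: a retry loop that stops once its timer has expired
// instead of sleeping for 0 ms and retrying immediately.
final class JoinBackoffSketch {
    /** Virtual clock so the example runs instantly; a real client would use wall-clock time. */
    static final class FakeClock {
        long nowMs = 0;

        void sleep(long ms) {
            nowMs += ms;
        }
    }

    /**
     * Calls attempt until it succeeds or the timeout elapses.
     * Returns the number of attempts made.
     */
    static int joinWithBackoff(BooleanSupplier attempt, FakeClock clock,
                               long timeoutMs, long retryBackoffMs) {
        long deadlineMs = clock.nowMs + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts;
            }
            long remainingMs = deadlineMs - clock.nowMs;
            if (remainingMs <= 0) {
                // Timer expired: exit rather than sleep(0) and spin.
                return attempts;
            }
            clock.sleep(Math.min(retryBackoffMs, remainingMs));
        }
    }

    public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        int attempts = joinWithBackoff(() -> false, clock, 250L, 100L);
        System.out.println("attempts=" + attempts + " elapsedMs=" + clock.nowMs);
    }
}
```

The caller can then decide whether to invoke the loop again with a fresh timer, which keeps the number of attempts bounded by the timeout and the backoff.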
[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449186137 Merged to trunk.
[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449185984 The test failures are not relevant (but some of them are related to DefaultStateUpdaterTest.. sigh).
[GitHub] [kafka] guozhangwang merged pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang merged PR #13190: URL: https://github.com/apache/kafka/pull/13190
[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on PR #13190: URL: https://github.com/apache/kafka/pull/13190#issuecomment-1449185108 Hmm. I think these tests are flaky actually ``` Build / JDK 17 and Scala 2.13 / shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest 30s Build / JDK 17 and Scala 2.13 / shouldPauseActiveTaskAndTransitToUpdateStandby() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest 30s Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s Build / JDK 11 and Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest 19s Build / JDK 11 and Scala 2.13 / shouldRemovePausedAndUpdatingTasksOnShutdown() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest 30s Build / JDK 11 and Scala 2.13 / shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest 31s ```
[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694801#comment-17694801 ] Guozhang Wang commented on KAFKA-14533: --- I spent some time looking into this, but I also cannot find any clues linking the state-updater to the list-offset request failures on changelog topics. Nevertheless, I re-enabled the state-updater flag with finer-grained logging in the assignor in https://github.com/apache/kafka/pull/13318, and hopefully Jenkins will give me some more clues (like [~ableegoldman], I cannot reproduce this issue locally either, after about 15 tries). > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-14533 > URL: https://issues.apache.org/jira/browse/KAFKA-14533 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Greg Harris >Assignee: Guozhang Wang >Priority: Major > Labels: flaky-test > > The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent > runs: > ``` > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/ > java.util.concurrent.TimeoutException: > 
shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > ``` > The stacktrace appears to be: > ``` > java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed > out after 600 seconds > at > org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29) > at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > ... > Suppressed: java.lang.InterruptedException: sleep interrupted > at java.lang.Thread.sleep(Native Method) > at > org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > ... 134 more > ``` > The test appears to be timing out waiting for the SmokeTestClient to complete > its asynchronous close, and taking significantly longer to do so (600s > instead of 60s) than a typical local test execution time. 
[GitHub] [kafka] guozhangwang opened a new pull request, #13318: [DO NOT MERGE] Re-enable state-updater in SmokeTestDriverIntegrationTest
guozhangwang opened a new pull request, #13318: URL: https://github.com/apache/kafka/pull/13318 1. Print fine-grained streams exception when list-offset fails. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1121005648 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -1962,7 +1962,9 @@ class KafkaApisTest { ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(epoch), ArgumentMatchers.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))), +ArgumentMatchers.eq(false), Review Comment: I added tests in RequestServerTest but I guess those are more integration tests. I can add some in KafkaApisTest too.
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1120991153 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) -val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] -val transactionalId = addPartitionsToTxnRequest.data.transactionalId -val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala -if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) -else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, -partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { -if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED -else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION -else - authorizedPartitions.add(topicPartition) +val addPartitionsToTxnRequest = + if 
(request.context.apiVersion() < 4) +request.body[AddPartitionsToTxnRequest].normalizeRequest() + else +request.body[AddPartitionsToTxnRequest] +val version = addPartitionsToTxnRequest.version +val responses = new AddPartitionsToTxnResultCollection() +val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction() + +// Newer versions of the request should only come from other brokers. +if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + +// V4 requests introduced batches of transactions. We need all transactions to be handled before sending the +// response so there are a few differences in handling errors and sending responses. +def createResponse(requestThrottleMs: Int): AbstractResponse = { + if (version < 4) { +// There will only be one response in data. Add it to the response data object. +val data = new AddPartitionsToTxnResponseData() +responses.forEach(result => { + data.setResultsByTopicV3AndBelow(result.topicResults()) + data.setThrottleTimeMs(requestThrottleMs) +}) +new AddPartitionsToTxnResponse(data) + } else { +new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses)) } +} - if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { -// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the -// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded -// the authorization check to indicate that they were not added to the transaction. 
-val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) -requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) +val txns = addPartitionsToTxnRequest.data.transactions +def maybeSendResponse(): Unit = { + var canSend = false + responses.synchronized { +if (responses.size() == txns.size()) { + canSend = true +} + } + if (canSend) { +requestHelper.sendResponseMaybeThrottle(request, createResponse) + } +} + +txns.forEach( transaction => { + val transactionalId = transaction.transactionalId + val partitionsToAdd = partitionsByTransaction.get(transactionalId).asScala + + // Versions < 4 come from clients and must be authorized to write for the given transaction and for the given topics. + if (version < 4 && !a
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1120990845 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) -val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] -val transactionalId = addPartitionsToTxnRequest.data.transactionalId -val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala -if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) -else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, -partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { -if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED -else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION -else - authorizedPartitions.add(topicPartition) +val addPartitionsToTxnRequest = + if 
(request.context.apiVersion() < 4) +request.body[AddPartitionsToTxnRequest].normalizeRequest() + else +request.body[AddPartitionsToTxnRequest] +val version = addPartitionsToTxnRequest.version +val responses = new AddPartitionsToTxnResultCollection() +val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction() + +// Newer versions of the request should only come from other brokers. +if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + +// V4 requests introduced batches of transactions. We need all transactions to be handled before sending the +// response so there are a few differences in handling errors and sending responses. +def createResponse(requestThrottleMs: Int): AbstractResponse = { + if (version < 4) { +// There will only be one response in data. Add it to the response data object. +val data = new AddPartitionsToTxnResponseData() +responses.forEach(result => { + data.setResultsByTopicV3AndBelow(result.topicResults()) + data.setThrottleTimeMs(requestThrottleMs) +}) +new AddPartitionsToTxnResponse(data) + } else { +new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses)) } +} - if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { -// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the -// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded -// the authorization check to indicate that they were not added to the transaction. 
-val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) -requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) +val txns = addPartitionsToTxnRequest.data.transactions +def maybeSendResponse(): Unit = { + var canSend = false + responses.synchronized { +if (responses.size() == txns.size()) { + canSend = true +} + } + if (canSend) { +requestHelper.sendResponseMaybeThrottle(request, createResponse) + } +} + +txns.forEach( transaction => { + val transactionalId = transaction.transactionalId + val partitionsToAdd = partitionsByTransaction.get(transactionalId).asScala + + // Versions < 4 come from clients and must be authorized to write for the given transaction and for the given topics. + if (version < 4 && !a
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1120986902 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) -val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] -val transactionalId = addPartitionsToTxnRequest.data.transactionalId -val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala -if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) -else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, -partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { -if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED -else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION -else - authorizedPartitions.add(topicPartition) +val addPartitionsToTxnRequest = + if 
(request.context.apiVersion() < 4) +request.body[AddPartitionsToTxnRequest].normalizeRequest() + else +request.body[AddPartitionsToTxnRequest] +val version = addPartitionsToTxnRequest.version +val responses = new AddPartitionsToTxnResultCollection() +val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction() + +// Newer versions of the request should only come from other brokers. +if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + +// V4 requests introduced batches of transactions. We need all transactions to be handled before sending the +// response so there are a few differences in handling errors and sending responses. +def createResponse(requestThrottleMs: Int): AbstractResponse = { + if (version < 4) { +// There will only be one response in data. Add it to the response data object. +val data = new AddPartitionsToTxnResponseData() +responses.forEach(result => { + data.setResultsByTopicV3AndBelow(result.topicResults()) + data.setThrottleTimeMs(requestThrottleMs) +}) +new AddPartitionsToTxnResponse(data) + } else { +new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses)) } +} - if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { -// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the -// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded -// the authorization check to indicate that they were not added to the transaction. 
-val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) -requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) +val txns = addPartitionsToTxnRequest.data.transactions +def maybeSendResponse(): Unit = { + var canSend = false + responses.synchronized { +if (responses.size() == txns.size()) { + canSend = true +} + } + if (canSend) { +requestHelper.sendResponseMaybeThrottle(request, createResponse) + } +} + +txns.forEach( transaction => { + val transactionalId = transaction.transactionalId + val partitionsToAdd = partitionsByTransaction.get(transactionalId).asScala + + // Versions < 4 come from clients and must be authorized to write for the given transaction and for the given topics. + if (version < 4 && !a
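The `maybeSendResponse` logic in the quoted diff sends a single response only after every transaction in the batch has produced a result. A minimal Java sketch of that pattern follows, under stated assumptions: `BatchedResponseSketch` and `addResult` are hypothetical names, results are plain strings rather than `AddPartitionsToTxnResult` objects, and the sketch adds and checks in one step, whereas the diff shows only the size check. As in the diff, the size check happens under the lock and the send happens outside it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "respond once all batched results have arrived".
// It is not the KafkaApis implementation; it only mirrors the shape of the diff.
final class BatchedResponseSketch {
    private final int expectedResults;
    private final List<String> results = new ArrayList<>();
    private final Consumer<List<String>> send;

    BatchedResponseSketch(int expectedResults, Consumer<List<String>> send) {
        this.expectedResults = expectedResults;
        this.send = send;
    }

    /** Records one per-transaction result; sends the response when the batch is complete. */
    void addResult(String result) {
        List<String> complete = null;
        synchronized (results) {
            results.add(result);
            if (results.size() == expectedResults) {
                // Copy under the lock so the sender sees a stable snapshot.
                complete = new ArrayList<>(results);
            }
        }
        if (complete != null) {
            // Sending outside the lock avoids holding it during I/O.
            send.accept(complete);
        }
    }

    public static void main(String[] args) {
        BatchedResponseSketch batch =
            new BatchedResponseSketch(2, r -> System.out.println("sending " + r));
        batch.addResult("txn-a: NONE");
        batch.addResult("txn-b: NONE");
    }
}
```

Because the size equals the expected count exactly once, the response is sent at most once even when results arrive from different threads.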
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1120976648 ## clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json: ## @@ -22,22 +22,37 @@ // Version 2 adds the support for new error code PRODUCER_FENCED. // // Version 3 enables flexible versions. - "validVersions": "0-3", + // + // Version 4 adds support to batch multiple transactions and a top level error code. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, -{ "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", "versions": "0+", - "about": "The results for each topic.", "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "4+", Review Comment: Sounds fine to me.
[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1449130059 Thanks for this, @pprovenzano . It all looks good, but there is one change that we really do need here: we need to add SCRAM to MetadataVersion. I suspect we can add it to `IBP_3_5_IV0`. This also means having `MetadataVersion#isScramSupported`, etc.
[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1449130005 We need a test like `./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java`, similar to the tests for the other Image classes.
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1120967466 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1328,7 +1328,7 @@ Priority priority() { @Override public void handleResponse(AbstractResponse response) { AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response; -Map errors = addPartitionsToTxnResponse.errors(); +Map errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID); Review Comment: What should we do if the check fails? Just have a better error message thrown?
[GitHub] [kafka] cmccabe commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1120967385 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.clients.admin.ScramMechanism; + +// import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +// import java.util.function.Consumer; +import java.util.stream.Collectors; + + +/** + * Represents the SCRAM credentials in the metadata image. + * + * This class is thread-safe. + */ +public final class ScramImage { +public static final ScramImage EMPTY = new ScramImage(Collections.emptyMap()); + +private final Map> mechanisms; + +public ScramImage(Map> mechanisms) { +this.mechanisms = Collections.unmodifiableMap(mechanisms); +} + +public void write(ImageWriter writer, ImageWriterOptions options) { Review Comment: You need to check the MetadataVersion and invoke `options.handleLoss` if it is too low. See my comment below. 
This feature will only be supported in `IBP_3_5_IV0` and newer. Also, we'll need a test for this (verifying that we throw an exception, when required, if SCRAM is present). That should be in `./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java` (which still needs to be written) :)
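The version gate requested in this review could be sketched roughly as follows. This is a self-contained illustration, not the PR's code: `ScramWriteSketch`, its simplified `Version` enum, and the `lossHandler` callback are hypothetical stand-ins for `ScramImage.write`, `MetadataVersion`, and `options.handleLoss`, whose real signatures are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "check the metadata version before writing SCRAM records".
final class ScramWriteSketch {

    // Simplified stand-in for MetadataVersion; only two versions are modelled.
    enum Version {
        IBP_3_4_IV0, IBP_3_5_IV0;

        // Assumption taken from the review: SCRAM is supported from IBP_3_5_IV0 onwards.
        boolean isScramSupported() {
            return compareTo(IBP_3_5_IV0) >= 0;
        }
    }

    // Returns the records that would be written. If the version is too old and there
    // are credentials to lose, the loss handler is told what is being dropped.
    static List<String> write(List<String> credentials, Version version, Consumer<String> lossHandler) {
        List<String> records = new ArrayList<>();
        if (!version.isScramSupported()) {
            if (!credentials.isEmpty()) {
                lossHandler.accept("the SCRAM credentials");
            }
            return records;
        }
        records.addAll(credentials);
        return records;
    }

    public static void main(String[] args) {
        List<String> credentials = Arrays.asList("alice:SCRAM-SHA-256");
        // A strict handler throws; a lenient one could log instead.
        Consumer<String> strict = what -> {
            throw new IllegalStateException("Metadata would be lost: " + what);
        };
        System.out.println(write(credentials, Version.IBP_3_5_IV0, strict));
        try {
            write(credentials, Version.IBP_3_4_IV0, strict);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Passing the loss handler as a callback keeps the choice between failing and logging with the caller, which is the behavior a `ScramImageTest` would need to exercise in both modes.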
[GitHub] [kafka] cmccabe commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1120962516 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -221,6 +223,21 @@ class BrokerMetadataPublisher( s"quotas in ${deltaName}", t) } + // Apply changes to SCRAM credentials. + Option(delta.scramDelta()).foreach { scramDelta => Review Comment: I think we should split out this code for applying the SCRAM stuff into a separate `MetadataPublisher`, as we did with `DynamicConfigPublisher`. Then the controller can run it as well. But we can do that in a follow-on change. I will file a JIRA -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1120961672 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -99,6 +99,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request) case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request) case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request) +case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request) Review Comment: Yeah. Just to restate what I think we all know here, clients ask brokers to describe SCRAM credentials, rather than controllers. So this doesn't technically need to be implemented, except for testing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc
gharris1727 commented on PR #13315: URL: https://github.com/apache/kafka/pull/13315#issuecomment-1449121574 @ijuma I don't know how my change would impact the execution order of rat vs processTestMessages, do you have any intuition? The commitId change that I originally proposed (without the deduplication of the Grgit call) didn't experience this failure; do you think we could merge the commitId improvement and follow up on the rat implicit dependencies in another PR if the latest commit doesn't fix the build?
[GitHub] [kafka] chia7712 commented on pull request #13288: MINOR: fix rerun-tests for unit test
chia7712 commented on PR #13288: URL: https://github.com/apache/kafka/pull/13288#issuecomment-1449114559 > The referenced https://github.com/apache/kafka/pull/11926 seems not relevant, is there a typo in the PR number? #11926 added the new property to avoid recompiling the tests. See https://github.com/apache/kafka/commit/322a065b9055649c713baf43f154052d45cd1588#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5R41 Separately, it seems to me that the existing `cleanTest` task can also avoid recompilation (related to my previous comment https://github.com/apache/kafka/pull/11926#discussion_r1114653978), so there are two ways to fix the "test without recompilation" command: 1) add `rerun-tests` for unit tests; 2) replace `rerun-tests` with `cleanTest`.
[GitHub] [kafka] ijuma commented on pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc
ijuma commented on PR #13315: URL: https://github.com/apache/kafka/pull/13315#issuecomment-1449090030 Looks like the build failed.
[GitHub] [kafka] ijuma commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
ijuma commented on code in PR #13304: URL: https://github.com/apache/kafka/pull/13304#discussion_r1120933708 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.message.FetchResponseData; + +import java.util.Objects; +import java.util.Optional; + +/** + * Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}. 
+ */ +public class LogReadInfo { + +public final FetchDataInfo fetchedData; +public final Optional divergingEpoch; +public final long highWatermark; +public final long logStartOffset; +public final long logEndOffset; +public final long lastStableOffset; + +public LogReadInfo(FetchDataInfo fetchedData, + Optional divergingEpoch, + long highWatermark, + long logStartOffset, + long logEndOffset, + long lastStableOffset) { +this.fetchedData = fetchedData; +this.divergingEpoch = divergingEpoch; +this.highWatermark = highWatermark; +this.logStartOffset = logStartOffset; +this.logEndOffset = logEndOffset; +this.lastStableOffset = lastStableOffset; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; + +LogReadInfo that = (LogReadInfo) o; + +return highWatermark == that.highWatermark && +logStartOffset == that.logStartOffset && +logEndOffset == that.logEndOffset && +lastStableOffset == that.lastStableOffset && +Objects.equals(fetchedData, that.fetchedData) && +Objects.equals(divergingEpoch, that.divergingEpoch); +} + +@Override +public int hashCode() { +int result = fetchedData != null ? fetchedData.hashCode() : 0; +result = 31 * result + (divergingEpoch != null ? divergingEpoch.hashCode() : 0); +result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32)); +result = 31 * result + (int) (logStartOffset ^ (logStartOffset >>> 32)); +result = 31 * result + (int) (logEndOffset ^ (logEndOffset >>> 32)); +result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32)); Review Comment: We should use `Long.hashCode` in many of these lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
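For context on the `Long.hashCode` suggestion: the static method `Long.hashCode(long)` (available since Java 8) is documented to return `(int)(value ^ (value >>> 32))`, so switching to it should change readability rather than the resulting hash. A minimal sketch, using a hypothetical `OffsetsSketch` class with only two fields rather than the PR's `LogReadInfo`:

```java
// Hypothetical stand-in for a value class holding long offsets; not the PR's LogReadInfo.
final class OffsetsSketch {
    final long highWatermark;
    final long logStartOffset;

    OffsetsSketch(long highWatermark, long logStartOffset) {
        this.highWatermark = highWatermark;
        this.logStartOffset = logStartOffset;
    }

    // The hand-rolled folding used in the diff under review.
    static int manualHash(long value) {
        return (int) (value ^ (value >>> 32));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        OffsetsSketch that = (OffsetsSketch) o;
        return highWatermark == that.highWatermark && logStartOffset == that.logStartOffset;
    }

    // Same 31-based combination as in the diff, with Long.hashCode doing the folding.
    @Override
    public int hashCode() {
        int result = Long.hashCode(highWatermark);
        result = 31 * result + Long.hashCode(logStartOffset);
        return result;
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, -1L, 42L, Long.MAX_VALUE, Long.MIN_VALUE};
        for (long sample : samples) {
            if (manualHash(sample) != Long.hashCode(sample)) {
                throw new AssertionError("mismatch for " + sample);
            }
        }
        System.out.println("manual folding matches Long.hashCode for all samples");
    }
}
```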
[GitHub] [kafka] ijuma commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
ijuma commented on code in PR #13304: URL: https://github.com/apache/kafka/pull/13304#discussion_r112095 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.message.FetchResponseData; + +import java.util.Optional; + +/** + * Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}. + */ +public class LogReadInfo { Review Comment: Note that `case class` is used for many reasons. Do we actually need `equals`/`hashCode` (e.g. are these classes used as map keys or compared against each other)? If not, then it's maintenance overhead without benefit.
[GitHub] [kafka] guozhangwang commented on pull request #13288: MINOR: fix rerun-tests for unit test
guozhangwang commented on PR #13288: URL: https://github.com/apache/kafka/pull/13288#issuecomment-1449082224 The referenced https://github.com/apache/kafka/pull/11926 seems not relevant, is there a typo in the PR number?
[GitHub] [kafka] junrao commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
junrao commented on code in PR #13304: URL: https://github.com/apache/kafka/pull/13304#discussion_r1120915339 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.message.FetchResponseData; + +import java.util.Objects; +import java.util.Optional; + +/** + * Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}. + */ +public class LogReadInfo { + +public final FetchDataInfo fetchedData; +public final Optional divergingEpoch; +public final long highWatermark; +public final long logStartOffset; +public final long logEndOffset; +public final long lastStableOffset; + +public LogReadInfo(FetchDataInfo fetchedData, Review Comment: Should we define `equals()` and `hashCode()` for FetchDataInfo too? ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +/** + * Container class which represents a snapshot of the significant offsets for a partition. This allows fetching + * of these offsets atomically without the possibility of a leader change affecting their consistency relative + * to each other. See {@link UnifiedLog#fetchOffsetSnapshot()}. + */ +public class LogOffsetSnapshot { Review Comment: Should we define equals() and hashCode()? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
OneCricketeer commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1120908532 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: As one example, using the JDBC sink connector (or S3, or any that write to external location). 
These in themselves are fine, but let's say someone knows the environment variable provider is installed on the worker, so they add a series of InsertField transforms to attempt to pull out as many well-known environment variable names as possible (such as `AWS_SECRET_ACCESS_KEY`, as mentioned). This isn't stopped by a protected REST API. My point is that without some configuration property for the provider to define an allowlist pattern, possibly set up in the `configure` method, this would expose credentials meant to be viewed only by administrators of the workers.
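The allowlist idea raised here could look roughly like the following. It is a self-contained sketch, not the PR's `EnvVarConfigProvider`: the class name `AllowlistedEnv` and its regex constructor parameter are hypothetical, and how such a pattern would actually be passed through `configure` is left open in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch: expose only environment variables whose names match an allowlist regex.
final class AllowlistedEnv {
    private final Map<String, String> visible = new HashMap<>();

    AllowlistedEnv(Map<String, String> env, String allowlistRegex) {
        Pattern allowed = Pattern.compile(allowlistRegex);
        for (Map.Entry<String, String> entry : env.entrySet()) {
            // matches() requires the whole variable name to match the pattern.
            if (allowed.matcher(entry.getKey()).matches()) {
                visible.put(entry.getKey(), entry.getValue());
            }
        }
    }

    // Returns null for variables that are unset or filtered out by the allowlist.
    String get(String name) {
        return visible.get(name);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("CONNECT_DB_PASSWORD", "example-password");
        env.put("AWS_SECRET_ACCESS_KEY", "example-secret");

        AllowlistedEnv provider = new AllowlistedEnv(env, "CONNECT_.*");
        System.out.println(provider.get("CONNECT_DB_PASSWORD"));
        System.out.println(provider.get("AWS_SECRET_ACCESS_KEY"));
    }
}
```

Filtering once at construction time means a connector configuration can never reach a variable outside the pattern, regardless of which transforms it adds.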
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
gharris1727 commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120897608 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: > I would be really interested in knowing which case can't be covered here. If you can dig up the Converter counterexample, would you mind sharing it? Here's a snippet from the IsolatedConverter, which is a few PRs in the future from now. These methods take 3 or 4 arguments, and there's no builtin java functional type that covers them (I guess they would be TriFunction and QuadFunction?) ``` public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) throws Exception { return isolate(() -> delegate.fromConnectData(topic, headers, schema, value)); } public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) throws Exception { return isolate(() -> delegate.toConnectData(topic, headers, value)); } public byte[] fromConnectData(String topic, Schema schema, Object value) throws Exception { return isolate(() -> delegate.fromConnectData(topic, schema, value)); } public SchemaAndValue toConnectData(String topic, byte[] value) throws Exception { return isolate(delegate::toConnectData, topic, value); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...
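To illustrate why the `Callable` overload covers methods of any arity: a lambda can capture however many arguments the delegate needs, so no `TriFunction` or `QuadFunction` type is required. The sketch below is self-contained and makes two assumptions that are not confirmed by the diff: the class `IsolationSketch` and the method `describe` are hypothetical, and the swap is modelled by setting the thread context class loader directly instead of using the PR's `LoaderSwap` and `plugins.withClassLoader`.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Objects;
import java.util.concurrent.Callable;

// Hypothetical sketch of running a call with a plugin's class loader installed.
final class IsolationSketch {
    private final ClassLoader pluginLoader;

    IsolationSketch(ClassLoader pluginLoader) {
        this.pluginLoader = Objects.requireNonNull(pluginLoader, "pluginLoader must be non-null");
    }

    // Installs the plugin loader as the context class loader, runs the call,
    // and restores the previous loader even if the call throws.
    <V> V isolate(Callable<V> callable) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        current.setContextClassLoader(pluginLoader);
        try {
            return callable.call();
        } finally {
            current.setContextClassLoader(saved);
        }
    }

    // Stand-in for a three-argument plugin method such as a converter call.
    static String describe(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    public static void main(String[] args) throws Exception {
        IsolationSketch sketch = new IsolationSketch(new URLClassLoader(new URL[0]));
        // The lambda captures all three arguments; isolate only sees a Callable.
        String result = sketch.isolate(() -> describe("orders", 3, 42L));
        System.out.println(result);
    }
}
```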
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120889645 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: If it comes down to not being able to completely satisfy all use cases with just the `isolate(Callable)` (and possibly `isolateV(ThrowingRunnable)`) methods, then I think it could be alright to add more variants. But the additional stack frames, and even ugly class names, don't seem likely to be super important to users or plugin developers. In both cases (though probably more often the latter), aren't they more likely to look for the first stack frame that originates with their plugin class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
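To make the trade-off in this thread concrete, here is a minimal sketch of the Callable-only style, in which a capturing lambda stands in for the missing TriFunction/QuadFunction types. It is not the PR's code: `ContextLoaderSwap`, `CallableIsolationSketch`, `describe` and `loaderSeenInside` are hypothetical names, and the sketch assumes that the swap simply replaces the thread context class loader and restores it on close, which may differ from what `Plugins.withClassLoader` actually does.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for Connect's LoaderSwap (an assumption, not the real class):
// it swaps the thread context class loader and restores the previous one on close().
final class ContextLoaderSwap implements AutoCloseable {
    private final ClassLoader previous;

    ContextLoaderSwap(ClassLoader replacement) {
        Thread current = Thread.currentThread();
        this.previous = current.getContextClassLoader();
        current.setContextClassLoader(replacement);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(previous);
    }
}

// Hypothetical wrapper that exposes a single Callable-based isolate method.
final class CallableIsolationSketch {
    private final ClassLoader pluginLoader;

    CallableIsolationSketch(ClassLoader pluginLoader) {
        this.pluginLoader = pluginLoader;
    }

    // Runs the callable with the plugin loader installed, restoring the old loader afterwards.
    <V> V isolate(Callable<V> callable) throws Exception {
        try (ContextLoaderSwap swap = new ContextLoaderSwap(pluginLoader)) {
            return callable.call();
        }
    }

    // A three-argument call needs no TriFunction: the lambda captures its arguments.
    String describe(String topic, int partition, long offset) throws Exception {
        return isolate(() -> topic + "-" + partition + "@" + offset);
    }

    // Reports which loader is the context loader while the callable runs.
    ClassLoader loaderSeenInside() throws Exception {
        return isolate(() -> Thread.currentThread().getContextClassLoader());
    }
}
```

The cost of this style is the one raised in the review: each capturing lambda contributes a synthetic `lambda$...` frame to stack traces, which the method-reference variants avoid.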
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120887378 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: I would be really interested in knowing which case can't be covered here. If you can dig up the `Converter` counterexample, would you mind sharing it?
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120886025 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} + +public interface ThrowingRunnable { +void run() throws Exception; +} + +@Override +public int hashCode() { +try { +return isolate(delegate::hashCode); +} catch (Throwable e) { +throw new RuntimeException("unable to evaluate plugin hashCode", e); +} +} + +@Override +public boolean equals(Object obj) { +if (obj == null || this.getClass() != obj.getClass()) { +return false; +} Review Comment: Given that equality across `Connector` instances is likely not a developer priority, I think you're right regarding reference equality for delegates. Good point 👍
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
gharris1727 commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120880473 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} + +public interface ThrowingRunnable { +void run() throws Exception; +} + +@Override +public int hashCode() { +try { +return isolate(delegate::hashCode); +} catch (Throwable e) { +throw new RuntimeException("unable to evaluate plugin hashCode", e); +} +} + +@Override +public boolean equals(Object obj) { +if (obj == null || this.getClass() != obj.getClass()) { +return false; +} Review Comment: I think you're right. I could imagine a particularly silly `Connector::equals` method which always returns `true` and causes poor behavior in a hash map or other data structure which relies on the equals method. I also wonder if it is a good idea to call the delegate equals method, or whether we should use reference equality (this.delegate == other.delegate) and skip calling the equals method entirely. I don't believe that we're using these methods anywhere in the runtime, and I don't think that it's common for developers to override the equals/hashCode, so we get to choose what the most useful implementation of these methods is.
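The reference-equality option floated here could look roughly like the sketch below. It is an illustration only, not what the PR ended up merging: `IdentityWrapper` and `AlwaysEqual` are hypothetical names, and pairing `==` with `System.identityHashCode` is one assumed way to keep `equals` and `hashCode` consistent without ever calling into plugin code.

```java
import java.util.Objects;

// Hypothetical wrapper (not the PR's IsolatedPlugin): equality is based on the
// identity of the delegate, so the delegate's own equals/hashCode are never called.
final class IdentityWrapper<P> {
    private final P delegate;

    IdentityWrapper(P delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate must be non-null");
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        // Reference comparison: two wrappers are equal only if they wrap the same instance.
        return this.delegate == ((IdentityWrapper<?>) obj).delegate;
    }

    @Override
    public int hashCode() {
        // Identity hash stays consistent with the reference comparison in equals().
        return System.identityHashCode(delegate);
    }
}

// A deliberately "silly" delegate whose equals always returns true, as imagined in the comment.
final class AlwaysEqual {
    @Override
    public boolean equals(Object obj) {
        return true;
    }

    @Override
    public int hashCode() {
        return 1;
    }
}
```

With this shape, a misbehaving `equals` on the delegate cannot collapse distinct wrappers in a hash-based collection, and no class loader swap is needed to evaluate either method.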
[GitHub] [kafka] ijuma commented on a diff in pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc
ijuma commented on code in PR #13315: URL: https://github.com/apache/kafka/pull/13315#discussion_r1120878251 ## build.gradle: ## @@ -159,16 +159,9 @@ def determineCommitId() { def takeFromHash = 16 if (project.hasProperty('commitId')) { commitId.take(takeFromHash) - } else if (file("$rootDir/.git/HEAD").exists()) { -def headRef = file("$rootDir/.git/HEAD").text -if (headRef.contains('ref: ')) { - headRef = headRef.replaceAll('ref: ', '').trim() - if (file("$rootDir/.git/$headRef").exists()) { -file("$rootDir/.git/$headRef").text.trim().take(takeFromHash) - } -} else { - headRef.trim().take(takeFromHash) -} + } else if (file("$rootDir/.git").exists()) { +def repo = Grgit.open(currentDir: project.getRootDir()) Review Comment: The `rat` configuration does something similar; maybe we should have a variable for the git repo that can be used by both tasks.
[GitHub] [kafka] cmccabe closed pull request #13176: MINOR: some ZK migration code cleanups.
cmccabe closed pull request #13176: MINOR: some ZK migration code cleanups. URL: https://github.com/apache/kafka/pull/13176
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
gharris1727 commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120867064 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: To take it to the extreme, we would inline and eliminate each of the `isolate` calls entirely and put the try-with-resources swap in the methods themselves. That would eliminate all lambdas and only add one stack frame, but be super repetitive in the high 10s of methods that have to be isolated.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
gharris1727 commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120864255 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} +} + +protected void isolateV(Consumer consumer, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t); +} +} + +protected void isolateV(BiConsumer consumer, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +consumer.accept(t, u); +} +} + +protected R isolate(Function function, T t) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t); +} +} + +protected R 
isolate(BiFunction function, T t, U u) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return function.apply(t, u); +} +} Review Comment: I think this is a question of readability, but it's important to consider the readability for operators of these connectors. With the current design that uses mostly method references, exceptions have the following stacktrace: ``` org.apache.kafka.connect.errors.ConnectException: at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.lambda$testSinkConnector$0(IsolatedPluginTest.java:43) ... at org.apache.kafka.connect.connector.Connector.initialize(Connector.java:57) at org.apache.kafka.connect.runtime.isolation.IsolatedPlugin.isolateV(IsolatedPlugin.java:66) at org.apache.kafka.connect.runtime.isolation.IsolatedConnector.initialize(IsolatedConnector.java:39) at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.testSinkConnector(IsolatedPluginTest.java:46) ... ``` There are two stack frames associated with the isolation infrastructure that did not exist before, and neither of them is an anonymous lambda (the `IsolatedPluginTest.lambda$testSinkConnector$0` is an anonymous lambda in my test, not part of the IsolatedPlugin). Compare that to the style of making everything into a callable: ``` org.apache.kafka.connect.errors.ConnectException: at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.lambda$testSinkConnector$0(IsolatedPluginTest.java:43) .
[GitHub] [kafka] C0urante commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes
C0urante commented on code in PR #13182: URL: https://github.com/apache/kafka/pull/13182#discussion_r1120776026 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -419,7 +423,14 @@ private Collection> getServiceLoaderPluginDesc(Class klass, Collection> result = new ArrayList<>(); try { ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); -for (T pluginImpl : serviceLoader) { +for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { +T pluginImpl; +try { +pluginImpl = iterator.next(); +} catch (ServiceConfigurationError t) { +log.error("Unable to instantiate plugin{}", reflectiveErrorDescription(t.getCause()), t); Review Comment: Would it be more informative to use the type of plugin class being instantiated instead of just "plugin"? ```suggestion log.error("Unable to instantiate {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ## @@ -111,20 +115,62 @@ public enum TestPlugin { /** * A plugin which shares a jar file with {@link TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE} */ -MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", "test.plugins.ThingTwo"); +MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", "test.plugins.ThingTwo"), +/** + * A plugin which is incorrectly packaged, and is missing a superclass definition. + */ +FAIL_TO_INITIALIZE_MISSING_SUPERCLASS("fail-to-initialize", "test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER), +/** + * A plugin which is packaged with other incorrectly packaged plugins, but itself has no issues loading. + */ +FAIL_TO_INITIALIZE_CO_LOCATED("fail-to-initialize", "test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER), +/** + * A connector which is incorrectly packaged, and throws during static initialization. 
+ */ + FAIL_TO_INITIALIZE_STATIC_INITIALIZER_THROWS_CONNECTOR("fail-to-initialize", "test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER), +/** + * A plugin which is incorrectly packaged, which throws an exception from the {@link Versioned#version()} method. + */ + FAIL_TO_INITIALIZE_VERSION_METHOD_THROWS_CONNECTOR("fail-to-initialize", "test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER), Review Comment: Nit: d'you think `FAIL_TO_INITIALIZE` is a bit inaccurate here, since we do manage to load and use the plugin despite its version method throwing an exception? ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -439,6 +457,22 @@ public static String versionFor(Class pluginKlass) throws Refle versionFor(pluginKlass.getDeclaredConstructor().newInstance()) : UNDEFINED_VERSION; } +private static String reflectiveErrorDescription(Throwable t) { +if (t instanceof NoSuchMethodException) { +return ": Plugin class must have a default constructor, and cannot be a non-static inner class"; +} else if (t instanceof SecurityException) { +return ": Security settings must allow reflective instantiation of plugin classes"; +} else if (t instanceof IllegalAccessException) { +return ": Plugin class default constructor must be public"; +} else if (t instanceof ExceptionInInitializerError) { +return ": Plugin class should not throw exception during static initialization"; +} else if (t instanceof InvocationTargetException) { +return ": Constructor must complete without throwing an exception"; +} else { +return ""; Review Comment: I like this method for the most part; it helps provide a nice summary of what's wrong with the plugin class and how to fix things. One thought I had is that it may be better in some if not all cases to describe the problem rather than prescribe a solution. 
For example, "Constructor must complete without throwing an exception" isn't really groundbreaking information; it's probably fine to just say "Failed to invoke constructor" and let users figure out the rest from the remainder of the stack trace. I think this applies to the branches for the `ExceptionInInitializerError` and `InvocationTargetException` classes, and the others could be left as-is, but feel free to tweak the wording if they could be improved as well.
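The per-provider failure isolation under review, combined with the "describe the problem" wording suggested above, can be sketched as follows. This is a simplified illustration rather than the DelegatingClassLoader code: `TolerantServiceScan`, `loadAll`, `describe` and `simulate` are hypothetical names, errors are collected into a list instead of being logged, the message for `ExceptionInInitializerError` is an assumed wording, and `simulate` only imitates a `ServiceLoader` iterator whose `next()` throws `ServiceConfigurationError` for a broken provider.

```java
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.function.Supplier;

final class TolerantServiceScan {

    // Drains an iterator such as ServiceLoader.iterator(). A provider that fails to
    // instantiate is recorded in `errors` and skipped; scanning then continues.
    static <T> List<T> loadAll(Iterator<T> providers, List<String> errors) {
        List<T> loaded = new ArrayList<>();
        for (Iterator<T> iterator = providers; iterator.hasNext(); ) {
            T impl;
            try {
                impl = iterator.next();
            } catch (ServiceConfigurationError e) {
                errors.add(describe(e.getCause()));
                continue;
            }
            loaded.add(impl);
        }
        return loaded;
    }

    // Describes what went wrong instead of prescribing a fix; the wording is illustrative.
    static String describe(Throwable cause) {
        if (cause instanceof InvocationTargetException) {
            return ": Failed to invoke constructor";
        } else if (cause instanceof ExceptionInInitializerError) {
            return ": Failed to statically initialize plugin class";
        }
        return "";
    }

    // Hypothetical test helper: an iterator whose next() may throw, like ServiceLoader's.
    static <T> Iterator<T> simulate(List<Supplier<T>> suppliers) {
        Iterator<Supplier<T>> inner = suppliers.iterator();
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return inner.hasNext();
            }

            @Override
            public T next() {
                return inner.next().get();
            }
        };
    }
}
```

As in the diff, only `next()` is guarded, so a failure is attributed to the single provider being instantiated and the remaining providers in the same jar are still discovered.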
[jira] [Updated] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version
[ https://issues.apache.org/jira/browse/KAFKA-14769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14769: - Affects Version/s: (was: 3.4.0) > NPE in ControllerMetricsManager when upgrading from old KRaft version > - > > Key: KAFKA-14769 > URL: https://issues.apache.org/jira/browse/KAFKA-14769 > Project: Kafka > Issue Type: Bug >Reporter: David Arthur >Assignee: Jose Armando Garcia Sancio >Priority: Critical > > In older KRaft versions, we could see a ConfigRecord for a topic config > appear before the TopicRecord in a batch. > When upgrading from an older KRaft version (e.g., 3.1), the latest code in > the KRaft controller hits an NPE when it encounters a ConfigRecord for a > topic config before the TopicRecord. This was introduced relatively recently > by KAFKA-14457 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version
[ https://issues.apache.org/jira/browse/KAFKA-14769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14769: - Fix Version/s: (was: 3.5.0) (was: 3.4.1) > NPE in ControllerMetricsManager when upgrading from old KRaft version > - > > Key: KAFKA-14769 > URL: https://issues.apache.org/jira/browse/KAFKA-14769 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0 >Reporter: David Arthur >Assignee: Jose Armando Garcia Sancio >Priority: Critical > > In older KRaft versions, we could see a ConfigRecord for a topic config > appear before the TopicRecord in a batch. > When upgrading from an older KRaft version (e.g., 3.1), the latest code in > the KRaft controller hits an NPE when it encounters a ConfigRecord for a > topic config before the TopicRecord. This was introduced relatively recently > by KAFKA-14457 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version
[ https://issues.apache.org/jira/browse/KAFKA-14769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-14769. -- Resolution: Invalid > NPE in ControllerMetricsManager when upgrading from old KRaft version > - > > Key: KAFKA-14769 > URL: https://issues.apache.org/jira/browse/KAFKA-14769 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0 >Reporter: David Arthur >Assignee: Jose Armando Garcia Sancio >Priority: Critical > Fix For: 3.5.0, 3.4.1 > > > In older KRaft versions, we could see a ConfigRecord for a topic config > appear before the TopicRecord in a batch. > When upgrading from an older KRaft version (e.g., 3.1), the latest code in > the KRaft controller hits an NPE when it encounters a ConfigRecord for a > topic config before the TopicRecord. This was introduced relatively recently > by KAFKA-14457 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14769) NPE in ControllerMetricsManager when upgrading from old KRaft version
David Arthur created KAFKA-14769: Summary: NPE in ControllerMetricsManager when upgrading from old KRaft version Key: KAFKA-14769 URL: https://issues.apache.org/jira/browse/KAFKA-14769 Project: Kafka Issue Type: Bug Affects Versions: 3.4.0 Reporter: David Arthur Assignee: Jose Armando Garcia Sancio Fix For: 3.5.0, 3.4.1 In older KRaft versions, we could see a ConfigRecord for a topic config appear before the TopicRecord in a batch. When upgrading from an older KRaft version (e.g., 3.1), the latest code in the KRaft controller hits an NPE when it encounters a ConfigRecord for a topic config before the TopicRecord. This was introduced relatively recently by KAFKA-14457 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
C0urante commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1120685793 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public abstract class IsolatedPlugin { + +private final Plugins plugins; +private final Class pluginClass; +protected final P delegate; +private final ClassLoader classLoader; +private final PluginType type; + +IsolatedPlugin(Plugins plugins, P delegate, PluginType type) { +this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null"); +this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null"); +this.pluginClass = delegate.getClass(); +ClassLoader classLoader = pluginClass.getClassLoader(); +this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class"); +this.type = Objects.requireNonNull(type, "plugin type must be non-null"); +} + +public PluginType type() { +return type; +} + +@SuppressWarnings("unchecked") +public Class pluginClass() { +return (Class) pluginClass; +} + +protected V isolate(Callable callable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +return callable.call(); +} +} + +protected void isolateV(ThrowingRunnable runnable) throws Exception { +try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { +runnable.run(); +} Review Comment: We can reduce duplication here by piggybacking off of the non-void `isolate` method: ```suggestion isolate(() -> { runnable.run(); return null; }); ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ## @@ -419,8 +420,14 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State metricGroup.close(); metricGroup.addImmutableValueMetric(registry.connectorType, connectorType()); -metricGroup.addImmutableValueMetric(registry.connectorClass, 
connector.getClass().getName()); -metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version()); +metricGroup.addImmutableValueMetric(registry.connectorClass, connector.pluginClass().getName()); +String version; +try { +version = connector.version(); +} catch (Exception e) { +version = DelegatingClassLoader.UNDEFINED_VERSION; +} Review Comment: This technically changes behavior, right? We go from failing the connector on startup to just assigning it an undefined version. I think this is probably fine (the benefits outweigh the costs, and I have a hard time imagining any use case that would favor failing here), just want to make sure I'm gauging the impact here correctly. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eith
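The suggestion to route `isolateV` through `isolate` can be sketched in isolation as follows. This is an illustration under assumptions: `IsolateSketch` is a hypothetical class, `ThrowingRunnable` is a stand-in for the PR's interface of the same name, and swapping the thread context class loader directly is only a guess at what `LoaderSwap` does.

```java
import java.util.concurrent.Callable;

public class IsolateSketch {

    /** Hypothetical stand-in for the ThrowingRunnable used in the PR. */
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    private final ClassLoader classLoader;

    public IsolateSketch(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    // Runs the callable with the plugin class loader as the thread context
    // class loader, then restores the previous loader (assumed behavior).
    public <V> V isolate(Callable<V> callable) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        current.setContextClassLoader(classLoader);
        try {
            return callable.call();
        } finally {
            current.setContextClassLoader(saved);
        }
    }

    // The void variant delegates to isolate, as the review suggests,
    // so the swap logic lives in one place only.
    public void isolateV(ThrowingRunnable runnable) throws Exception {
        isolate(() -> {
            runnable.run();
            return null;
        });
    }
}
```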
[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
C0urante commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1120750168 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: Sorry, by "deploy code", do you mean a custom connector class? 
Because if that's the case, then credentials leakage is not going to be prevented by modifying this config provider since the connector can just invoke `System::getenv` and do whatever it wants with that info. If we're discussing connector configurations instead, then it's already possible, and basically necessary, to lock down your Connect worker's REST API to prevent users from submitting arbitrary connector configurations to your worker in anything but local testing environments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
OneCricketeer commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1114750313 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -41,6 +46,14 @@ public EnvVarConfigProvider(Map envVarsAsArgument) { @Override public void configure(Map configs) { +if (configs.keySet().contains(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) { Review Comment: `containsKey` would be more clear ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: > controlled on an opt-in basis by the cluster administrator via worker config files Right, so say, I, as an administrator, enable an environment variable provider, and have secrets defined on the workers, that I assume can only be read by authorized admins. Then allow anyone to deploy code that can then read said variables without any additional controls. That causes credential leakage, regardless of other capabilities for arbitrary code execution -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
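To make the `containsKey` remark and the pattern idea concrete, here is a small standalone sketch. It is not the PR's code: the class `EnvVarFilterSketch`, the key name `"pattern"`, and the way the pattern is applied to variable names are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class EnvVarFilterSketch {

    // Hypothetical config key; the real constant's value is not shown in the thread.
    public static final String PATTERN_CONFIG = "pattern";

    private final Map<String, String> envVarMap;
    // Default assumed here: expose every variable when no pattern is configured.
    private Pattern envVarPattern = Pattern.compile(".*");

    public EnvVarFilterSketch(Map<String, String> envVars) {
        this.envVarMap = envVars;
    }

    public void configure(Map<String, ?> configs) {
        // containsKey states the intent directly, unlike keySet().contains(...).
        if (configs.containsKey(PATTERN_CONFIG)) {
            envVarPattern = Pattern.compile(String.valueOf(configs.get(PATTERN_CONFIG)));
        }
    }

    // Returns only the variables whose full name matches the configured pattern.
    public Map<String, String> visibleEnvVars() {
        return envVarMap.entrySet().stream()
            .filter(entry -> envVarPattern.matcher(entry.getKey()).matches())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

A restrictive pattern limits what a connector configuration can resolve through the provider. As the other comments in this thread note, it does not stop plugin code from reading the environment directly.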
[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on code in PR #13190: URL: https://github.com/apache/kafka/pull/13190#discussion_r1120665269 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -501,13 +501,18 @@ boolean joinGroupIfNeeded(final Timer timer) { } if (exception instanceof UnknownMemberIdException || -exception instanceof IllegalGenerationException || -exception instanceof RebalanceInProgressException || -exception instanceof MemberIdRequiredException) +exception instanceof IllegalGenerationException || +exception instanceof RebalanceInProgressException || +exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) Review Comment: the previous logic was reverted with some autocorrection to the indentation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on code in PR #13190: URL: https://github.com/apache/kafka/pull/13190#discussion_r1120663942 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -1484,7 +1484,8 @@ public void testRebalanceWithMetadataChange() { Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1; client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); -coordinator.poll(time.timer(0)); +assertFalse(client.hasInFlightRequests()); +coordinator.poll(time.timer(1)); Review Comment: note: we need to add a timeout here to give the retry a second chance, because in the new code, the timer is checked and causes the method to exit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
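The note about `time.timer(0)` can be illustrated with a toy retry loop. This is a sketch under assumptions, not the `AbstractCoordinator` logic: `Deadline` is a hypothetical helper rather than Kafka's `Timer`, and `attemptUntil` is an invented name.

```java
import java.util.function.BooleanSupplier;

public class TimerLoopSketch {

    /** Minimal hypothetical deadline tracker; not Kafka's Timer class. */
    public static final class Deadline {
        private final long deadlineNanos;

        public Deadline(long timeoutMs) {
            this.deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        }

        public boolean isExpired() {
            return System.nanoTime() - deadlineNanos >= 0;
        }
    }

    // Retries the attempt until it succeeds or the deadline has expired,
    // and returns how many attempts were made. Checking the deadline after
    // each failure is what keeps the loop from spinning indefinitely.
    public static int attemptUntil(BooleanSupplier attempt, Deadline deadline) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts;
            }
            if (deadline.isExpired()) {
                return attempts;
            }
        }
    }
}
```

With a zero timeout the loop gives up after the first failed attempt, which is why a test that relies on a retry needs a non-zero timer.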
[GitHub] [kafka] lbownik commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
lbownik commented on code in PR #13067: URL: https://github.com/apache/kafka/pull/13067#discussion_r1120661170 ## server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import org.apache.kafka.common.utils.Sanitizer; + +public class KafkaMetricsGroup { +private final Class klass; + +public KafkaMetricsGroup(Class klass) { +this.klass = klass; +} + +/** + * Creates a new MetricName object for gauges, meters, etc. created for this + * metrics group. + * @param name Descriptive name of the metric. + * @param tags Additional attributes which mBean will have. + * @return Sanitized metric name object. 
+ */ +public MetricName metricName(String name, Map tags) { +String pkg; +if (klass.getPackage() == null) { +pkg = ""; +} else { +pkg = klass.getPackage().getName(); +} +String simpleName = klass.getSimpleName().replaceAll("\\$$", ""); +return explicitMetricName(pkg, simpleName, name, tags); +} + +public static MetricName explicitMetricName(String group, String typeName, +String name, Map tags) { +StringBuilder nameBuilder = new StringBuilder(); Review Comment: maybe initialize the builder with some initial capacity (default is 16 - a little low). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
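A short sketch of the capacity suggestion, assuming a JMX-style name layout. The class `MetricNameSketch`, the method `mBeanName`, the capacity of 100, and the exact name format are assumptions for illustration; the thread does not show how the PR builds the string.

```java
import java.util.Map;

public class MetricNameSketch {

    // Builds a name such as "group:type=Type,name=Name,tag=value".
    // The layout is assumed, not copied from the PR.
    public static String mBeanName(String group, String typeName, String name, Map<String, String> tags) {
        // StringBuilder's default capacity is 16 characters; presizing avoids
        // several internal array copies for names of typical length.
        StringBuilder nameBuilder = new StringBuilder(100);
        nameBuilder.append(group).append(":type=").append(typeName);
        if (!name.isEmpty()) {
            nameBuilder.append(",name=").append(name);
        }
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            nameBuilder.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        return nameBuilder.toString();
    }
}
```

The capacity is only a hint, so a longer name still works; the builder grows as needed.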
[GitHub] [kafka] C0urante commented on a diff in pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.
C0urante commented on code in PR #13287: URL: https://github.com/apache/kafka/pull/13287#discussion_r1120636637 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -427,9 +427,8 @@ private void updateConnectorTasks(String connName) { } List> newTaskConfigs = recomputeTaskConfigs(connName); -List> oldTaskConfigs = configState.allTaskConfigs(connName); -if (!newTaskConfigs.equals(oldTaskConfigs)) { Review Comment: I think the most convincing argument here is point 1; any duplication of work that isn't inevitable can be addressed to the same degree regardless of which class contains the logic, and I don't think the risk of improving the naive search is super high. Anyways, sounds good to me 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14767) Gradle build fails with missing commitId after git gc
[ https://issues.apache.org/jira/browse/KAFKA-14767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694207#comment-17694207 ] Greg Harris edited comment on KAFKA-14767 at 2/28/23 7:01 PM: -- This happens occasionally when git performs an automatic `gc`, and often causes build failures when I merge `trunk` into my development branches. My build usually won't complete until I finish the merge commit, which I may have to go back and amend if i introduced any build breakages. I know of one workaround: Take the branch you're working on and move it to re-create the non-gc'd refs file: {noformat} branch="$(git branch --show-current)" git checkout -b placeholder git branch -f $branch trunk git branch -f $branch placeholder git checkout $branch git branch -D placeholder{noformat} was (Author: gharris1727): This happens occasionally when git performs an automatic `gc`, and often causes build failures when I merge `trunk` into my development branches. My build usually won't complete until I finish the merge commit, which I may have to go back and amend if i introduced any build breakages. I know of one workaround: Take the branch you're working on and move it to re-create the non-gc'd refs file: {noformat} brach="$(git branch --show-current)" git checkout -b placeholder git branch -f $branch trunk git branch -f $branch placeholder git checkout $branch git branch -D placeholder{noformat} > Gradle build fails with missing commitId after git gc > - > > Key: KAFKA-14767 > URL: https://issues.apache.org/jira/browse/KAFKA-14767 > Project: Kafka > Issue Type: Bug > Components: build >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > Reproduction steps: > 1. `git gc` > 2. `./gradlew jar` > Expected behavior: build completes successfully (or shows other build errors) > Actual behavior: > {noformat} > Task failed with an exception. 
> --- > * What went wrong: > A problem was found with the configuration of task > ':storage:createVersionFile' (type 'DefaultTask'). > - Property 'commitId' doesn't have a configured value. > > Reason: This property isn't marked as optional and no value has been > configured. > > Possible solutions: > 1. Assign a value to 'commitId'. > 2. Mark property 'commitId' as optional. > > Please refer to > https://docs.gradle.org/7.6/userguide/validation_problems.html#value_not_set > for more details about this problem.{noformat} > This appears to be due to the fact that the build.gradle determineCommitId() > function is unable to read the git commit hash for the current HEAD. This > appears to happen after a `git gc` takes place, which causes the > `.git/refs/heads/*` files to be moved to `.git/packed-refs`. > The determineCommitId() should be patched to also try reading from the > packed-refs to determine the commit hash. -- This message was sent by Atlassian Jira (v8.20.10#820010)
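The proposed fix, falling back to `.git/packed-refs` when the loose ref file is missing, could look roughly like this. The real `determineCommitId()` lives in `build.gradle` (Groovy) and is not shown in the ticket, so this Java version is only a sketch of the lookup order; `CommitIdSketch` is a hypothetical class, and worktrees and symbolic ref chains are ignored.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class CommitIdSketch {

    public static Optional<String> determineCommitId(Path gitDir) throws IOException {
        Path headFile = gitDir.resolve("HEAD");
        if (!Files.isRegularFile(headFile)) {
            return Optional.empty();
        }
        String head = new String(Files.readAllBytes(headFile), StandardCharsets.UTF_8).trim();
        if (!head.startsWith("ref: ")) {
            // Detached HEAD: the file contains the commit hash itself.
            return Optional.of(head);
        }
        String ref = head.substring("ref: ".length());

        // 1. Loose ref, e.g. .git/refs/heads/<branch>
        Path looseRef = gitDir.resolve(ref);
        if (Files.isRegularFile(looseRef)) {
            return Optional.of(new String(Files.readAllBytes(looseRef), StandardCharsets.UTF_8).trim());
        }

        // 2. Packed ref: lines of the form "<hash> <refname>"
        Path packedRefs = gitDir.resolve("packed-refs");
        if (Files.isRegularFile(packedRefs)) {
            for (String line : Files.readAllLines(packedRefs, StandardCharsets.UTF_8)) {
                // Skip the header comment and peeled-tag lines.
                if (line.startsWith("#") || line.startsWith("^")) {
                    continue;
                }
                int space = line.indexOf(' ');
                if (space > 0 && line.substring(space + 1).trim().equals(ref)) {
                    return Optional.of(line.substring(0, space));
                }
            }
        }
        return Optional.empty();
    }
}
```

Checking the loose ref first matters because git writes updated refs as loose files, and a loose ref takes precedence over a stale entry in `packed-refs`.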
[jira] [Comment Edited] (KAFKA-14767) Gradle build fails with missing commitId after git gc
[ https://issues.apache.org/jira/browse/KAFKA-14767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694207#comment-17694207 ] Greg Harris edited comment on KAFKA-14767 at 2/28/23 7:01 PM: -- This happens occasionally when git performs an automatic `gc`, and often causes build failures when I merge `trunk` into my development branches. My build usually won't complete until I finish the merge commit, which I may have to go back and amend if i introduced any build breakages. I know of one workaround: Take the branch you're working on and move it to re-create the non-gc'd refs file: {noformat} brach="$(git branch --show-current)" git checkout -b placeholder git branch -f $branch trunk git branch -f $branch placeholder git checkout $branch git branch -D placeholder{noformat} was (Author: gharris1727): This happens occasionally when git performs an automatic `gc`, and often causes build failures when I merge `trunk` into my development branches. My build usually won't complete until I finish the merge commit, which I may have to go back and amend if i introduced any build breakages. I know of one workaround: Take the branch you're working on and move it to re-create the non-gc'd refs file: {noformat} brach= git checkout -b placeholder git branch -f $branch trunk git branch -f $branch placeholder git checkout $branch git branch -D placeholder{noformat} > Gradle build fails with missing commitId after git gc > - > > Key: KAFKA-14767 > URL: https://issues.apache.org/jira/browse/KAFKA-14767 > Project: Kafka > Issue Type: Bug > Components: build >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > Reproduction steps: > 1. `git gc` > 2. `./gradlew jar` > Expected behavior: build completes successfully (or shows other build errors) > Actual behavior: > {noformat} > Task failed with an exception. 
> --- > * What went wrong: > A problem was found with the configuration of task > ':storage:createVersionFile' (type 'DefaultTask'). > - Property 'commitId' doesn't have a configured value. > > Reason: This property isn't marked as optional and no value has been > configured. > > Possible solutions: > 1. Assign a value to 'commitId'. > 2. Mark property 'commitId' as optional. > > Please refer to > https://docs.gradle.org/7.6/userguide/validation_problems.html#value_not_set > for more details about this problem.{noformat} > This appears to be due to the fact that the build.gradle determineCommitId() > function is unable to read the git commit hash for the current HEAD. This > appears to happen after a `git gc` takes place, which causes the > `.git/refs/heads/*` files to be moved to `.git/packed-refs`. > The determineCommitId() should be patched to also try reading from the > packed-refs to determine the commit hash. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.
gharris1727 commented on code in PR #13287: URL: https://github.com/apache/kafka/pull/13287#discussion_r1120610035 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -427,9 +427,8 @@ private void updateConnectorTasks(String connName) { } List> newTaskConfigs = recomputeTaskConfigs(connName); -List> oldTaskConfigs = configState.allTaskConfigs(connName); -if (!newTaskConfigs.equals(oldTaskConfigs)) { Review Comment: This logic is simpler, but only because `ClusterConfigState::allTaskConfigs` and `List::equals` are abstracting some of the complexity. There are multiple reasons I decided to keep the DistributedHerder implementation instead of the Standalone herder: 1. The DistributedHerder implementation allows us to provide more detailed debug logs about how the task configs are different, in a way that `List::equals` hides. Since distributed mode is more common, I figured adding those logs to standalone was better than taking them away from distributed mode. 2. The `taskConfig` and `allTaskConfigs` are duplicating one another to some degree (they're both retrieving task configs, they're both applying transformations) but we cannot remove `taskConfig` as it is used in lots of other areas of the herder. 3. The `allTaskConfigs` is currently performing a naive search over all known task configs, and while we could fix that, it requires a larger change with higher regression risk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
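Point 1 above, that an element-by-element comparison can explain why task configs differ while `List::equals` only says that they do, can be sketched as follows. The class `TaskConfigDiffSketch`, the method `taskConfigsChanged`, and the message wording are hypothetical; the actual `AbstractHerder` logic is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TaskConfigDiffSketch {

    // Returns true if the task configs differ, and records a human-readable
    // reason for each difference so that it can be written to a debug log.
    public static boolean taskConfigsChanged(List<Map<String, String>> oldConfigs,
                                             List<Map<String, String>> newConfigs,
                                             List<String> reasons) {
        if (oldConfigs.size() != newConfigs.size()) {
            reasons.add("task count changed from " + oldConfigs.size() + " to " + newConfigs.size());
            return true;
        }
        boolean changed = false;
        for (int index = 0; index < newConfigs.size(); index++) {
            if (!newConfigs.get(index).equals(oldConfigs.get(index))) {
                reasons.add("config for task " + index + " changed");
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Map<String, String>> oldConfigs = new ArrayList<>();
        oldConfigs.add(Map.of("topic", "a"));
        List<Map<String, String>> newConfigs = new ArrayList<>();
        newConfigs.add(Map.of("topic", "b"));
        List<String> reasons = new ArrayList<>();
        System.out.println(taskConfigsChanged(oldConfigs, newConfigs, reasons) + " " + reasons);
    }
}
```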
[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
C0urante commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1120563796 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.config.provider.EnvVarConfigProviderConfig.ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; +private Pattern envVarPattern; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +if (configs.keySet().contains(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) { +envVarPattern = Pattern.compile( + String.valueOf(configs.get(ENV_VAR_CONFIG_PROVIDER_PATTERN_CONFIG)) +); +} else { +envVarPattern = Pattern.compile(".*"); +log.info("No pattern for environment variables provided. Using default pattern '(.*)'."); +} +} + +@Override +public void close() throws IOException { +} + +/** + * @param s unused + * @return returns environment variables as configuration + */ +@Override +public ConfigData get(String s) { +return get(s, null); +} + +/** + * @param pathpath, not used for environment variables + * @param keys the keys whose values will be retrieved. + * @return the configuration data. 
+ */ +@Override +public ConfigData get(String path, Set keys) { + +if (path != null && !path.isEmpty()) { +log.error("Path is not supported for EnvVarConfigProvider, invalid value '{}'", path); +throw new ConfigException("Path is not supported for EnvVarConfigProvider, invalid value '" + path + "'"); +} + +if (envVarMap == null) { +return new ConfigData(new HashMap<>()); +} + +Map filteredEnvVarMap = envVarMap; + +filteredEnvVarMap = envVarMap.entrySet().stream() Review Comment: Nit: can be simplified ```suggestion Map filteredEnvVarMap = envVarMap.entrySet().stream() ``` ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderConfig.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.AbstractConfig; Review Comment: Unused import (causing Checkstyle failures during build) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For que
[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
C0urante commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1120561014 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: IMO this isn't really necessary; in Connect, no config providers are enabled by default, and they are controlled on an opt-in basis by the cluster administrator via worker config files. 
You cannot cause a new config provider to be used in a connector config that is not already enabled in the worker config. Additionally, if we're discussing malicious connector classes (instead of malicious connector configs), arbitrary code execution is already possible, so this kind of feature wouldn't prevent people from reading environment variables. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
ivanyu commented on code in PR #13067: URL: https://github.com/apache/kafka/pull/13067#discussion_r1120551247 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -497,51 +501,59 @@ object RequestMetrics { val ErrorsPerSec = "ErrorsPerSec" } -class RequestMetrics(name: String) extends KafkaMetricsGroup { +class RequestMetrics(name: String) { import RequestMetrics._ - val tags = Map("request" -> name) + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + + val tags = Map("request" -> name).asJava val requestRateInternal = new Pool[Short, Meter]() // time a request spent in a request queue - val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags) + val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags) // time a request takes to be processed at the local broker - val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags) + val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags) // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests) - val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags) + val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags) // time a request is throttled, not part of the request processing time (throttling is done at the client level // for clients that support KIP-219 and by muting the channel for the rest) - val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags) + val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags) // time a response spent in a response queue - val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags) + val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs, true, tags) // time to send the response to the requester - val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags) - val totalTimeHist = newHistogram(TotalTimeMs, biased = true, 
tags) + val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs, true, tags) + val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags) // request size in bytes - val requestBytesHist = newHistogram(RequestBytes, biased = true, tags) + val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags) // time for message conversions (only relevant to fetch and produce requests) val messageConversionsTimeHist = if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) - Some(newHistogram(MessageConversionsTimeMs, biased = true, tags)) + Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags)) else None // Temporary memory allocated for processing request (only populated for fetch and produce requests) // This shows the memory allocated for compression/conversions excluding the actual request size val tempMemoryBytesHist = if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) - Some(newHistogram(TemporaryMemoryBytes, biased = true, tags)) + Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags)) else None private val errorMeters = mutable.Map[Errors, ErrorMeter]() Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error))) - def requestRate(version: Short): Meter = { -requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) + def requestRate(version: Short): Meter = +requestRateInternal.getAndMaybePut(version, metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tagsWithVersion(version))) + + def tagsWithVersion(version: Short): java.util.Map[String, String] = { +val nameAndVersionTags = new util.HashMap[String, String]() Review Comment: I didn't do this because considered this an infrequent operation, but sure, have done this now. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13286: MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder
C0urante merged PR #13286: URL: https://github.com/apache/kafka/pull/13286
[GitHub] [kafka] C0urante commented on a diff in pull request #13287: MINOR: Refactor task change logic to AbstractHerder, reuse for standalone mode.
C0urante commented on code in PR #13287:
URL: https://github.com/apache/kafka/pull/13287#discussion_r1120511592

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ##
@@ -427,9 +427,8 @@ private void updateConnectorTasks(String connName) {
         }

         List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
-        if (!newTaskConfigs.equals(oldTaskConfigs)) {

Review Comment:
This logic seems significantly simpler than what's currently used in `DistributedHerder::reconfigureConnector` and moved to `AbstractHerder::taskConfigsChanged` in this PR. Why not keep this logic instead of removing `ClusterConfigState::allTaskConfigs`?

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -1981,6 +1966,8 @@ private void reconfigureConnector(final String connName, final Callback cb
                 }
             });
         }
+    } else {
+        log.debug("Skipping reconfiguration of connector {} as generated configs appear unchanged", connName);

Review Comment:
Why not move this into `AbstractHerder::taskConfigsChanged` as well, so that it's picked up in both standalone and distributed mode?
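The simpler check mentioned in the first review comment amounts to plain list equality on the generated task configs. Below is a minimal sketch of that idea combined with the second suggestion (emitting the "skipping" message from the shared helper). `TaskConfigDiffSketch` and this `taskConfigsChanged` signature are hypothetical and are not the method added in the PR; `System.out` stands in for the herder's logger.

```java
import java.util.List;
import java.util.Map;

final class TaskConfigDiffSketch {
    // Hypothetical stand-alone helper: returns true when the newly generated
    // task configs differ from the ones currently stored for the connector.
    static boolean taskConfigsChanged(String connName,
                                      List<Map<String, String>> oldTaskConfigs,
                                      List<Map<String, String>> newTaskConfigs) {
        // List.equals compares size, order, and each map's entries.
        boolean changed = !newTaskConfigs.equals(oldTaskConfigs);
        if (!changed) {
            // Logging here means both standalone and distributed callers get the message.
            System.out.println("Skipping reconfiguration of connector " + connName
                    + " as generated configs appear unchanged");
        }
        return changed;
    }
}
```

Note that list equality is order-sensitive, so a connector that returns the same task configs in a different order would count as changed under this sketch.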
[GitHub] [kafka] satishd commented on pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
satishd commented on PR #13304: URL: https://github.com/apache/kafka/pull/13304#issuecomment-1448587425 Thanks @junrao for your review. Addressed them with the latest commit.
[jira] [Assigned] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-14533:
---
    Assignee: Guozhang Wang

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
>          Key: KAFKA-14533
>          URL: https://issues.apache.org/jira/browse/KAFKA-14533
>      Project: Kafka
>   Issue Type: Test
>   Components: streams, unit tests
>     Reporter: Greg Harris
>     Assignee: Guozhang Wang
>     Priority: Major
>       Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent runs:
> ```
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>     java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>     java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>     java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>     java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>     java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>     java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     at org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>     at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>     ...
>     Suppressed: java.lang.InterruptedException: sleep interrupted
>         at java.lang.Thread.sleep(Native Method)
>         at org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>         at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>         at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>         at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>         ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete
> its asynchronous close, and taking significantly longer to do so (600s
> instead of 60s) than a typical local test execution time.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
ijuma commented on code in PR #13067: URL: https://github.com/apache/kafka/pull/13067#discussion_r1120488775 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -497,51 +501,59 @@ object RequestMetrics { val ErrorsPerSec = "ErrorsPerSec" } -class RequestMetrics(name: String) extends KafkaMetricsGroup { +class RequestMetrics(name: String) { import RequestMetrics._ - val tags = Map("request" -> name) + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + + val tags = Map("request" -> name).asJava val requestRateInternal = new Pool[Short, Meter]() // time a request spent in a request queue - val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags) + val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, true, tags) // time a request takes to be processed at the local broker - val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags) + val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags) // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests) - val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags) + val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags) // time a request is throttled, not part of the request processing time (throttling is done at the client level // for clients that support KIP-219 and by muting the channel for the rest) - val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags) + val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags) // time a response spent in a response queue - val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags) + val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs, true, tags) // time to send the response to the requester - val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags) - val totalTimeHist = newHistogram(TotalTimeMs, biased = true, 
tags) + val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs, true, tags) + val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags) // request size in bytes - val requestBytesHist = newHistogram(RequestBytes, biased = true, tags) + val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags) // time for message conversions (only relevant to fetch and produce requests) val messageConversionsTimeHist = if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) - Some(newHistogram(MessageConversionsTimeMs, biased = true, tags)) + Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags)) else None // Temporary memory allocated for processing request (only populated for fetch and produce requests) // This shows the memory allocated for compression/conversions excluding the actual request size val tempMemoryBytesHist = if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) - Some(newHistogram(TemporaryMemoryBytes, biased = true, tags)) + Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags)) else None private val errorMeters = mutable.Map[Errors, ErrorMeter]() Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error))) - def requestRate(version: Short): Meter = { -requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) + def requestRate(version: Short): Meter = +requestRateInternal.getAndMaybePut(version, metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tagsWithVersion(version))) + + def tagsWithVersion(version: Short): java.util.Map[String, String] = { +val nameAndVersionTags = new util.HashMap[String, String]() Review Comment: Maybe we can presize the map correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
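Presizing here means choosing an initial capacity so that the base tags plus the extra `version` entry fit without a rehash. A minimal sketch of one way to do that follows; `TagsSketch` is a hypothetical class, the method takes the base tags as a parameter instead of reading a field, and the capacity formula assumes `HashMap`'s default load factor of 0.75. It is not the code that was committed to the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class TagsSketch {
    // Hypothetical helper: copies the base tags and adds a "version" tag,
    // sizing the map up front so that no resize happens while filling it.
    static Map<String, String> tagsWithVersion(Map<String, String> baseTags, short version) {
        int expectedEntries = baseTags.size() + 1;
        // Assumption: default load factor 0.75, so capacity must be at least entries / 0.75.
        int initialCapacity = (int) Math.ceil(expectedEntries / 0.75);
        Map<String, String> tags = new HashMap<>(initialCapacity);
        tags.putAll(baseTags);
        tags.put("version", Short.toString(version));
        return tags;
    }
}
```

For a single `request` tag this yields a requested capacity of 3, which `HashMap` rounds up to 4 buckets, instead of the default 16.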
[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120481527

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }

+                // 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||

Review Comment:
Hey, thanks for the comments again, and absolutely no apology is needed! As we all know, rebalancing is full of subtleties, so it makes sense to be careful about these non-retriable exception cases. I think it's a good idea to keep the original behavior consistent, in case of unexpected breakage. Updating the PR.
[GitHub] [kafka] C0urante commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start
C0urante commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1448579807

> I think that fixing the test by extending the timeout might hide the problems that users may be facing in production as well.

I don't think that bumping the timeouts in these tests is likely to hide issues that may surface in production; rather, I'm only worried that they'll make local testing and development more cumbersome.

> Hence, I believe, we need a better mechanism to check for successful startup of MirrorMaker. Thoughts?

Agreed 👍 IMO MM2 observability in general can be improved. Right now you're basically limited to JMX, other custom metrics reporters, and logs. There's no public API for viewing cluster-wide health info (the closest thing is reading directly from the status topic, but that's not public API and we shouldn't count on users doing that or recommend it). If there's an issue starting one of the connectors, instead of failing startup of the MM2 node, we [log a message](https://github.com/apache/kafka/blob/f586fa59d3f938e04bda4e8143ddb1c4310eaf78/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L232-L237) and move on. I can't imagine this is very easy for users to work with, though maybe I'm underestimating the utility of JMX and logs here.

> In absence of such a mechanism, today, a user might call MirrorMaker#start() and assume that it has started correctly while in the background, connector start up might have failed (just as it did in this test scenario).

This is less concerning because the `MirrorMaker` class isn't part of public API; users are not expected to directly interact with any instances of this class and we do not support any use cases like that right now.

I'm wondering if, instead of a `Future`-based approach, we might add an internal API to the `MirrorMaker` class to expose the status of the connectors running on it? A brief sketch:

```java
public class MirrorMaker {
    public ConnectorStateInfo connectorStatus(String source, String target, String connector) {
        SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
        checkHerder(sourceAndTarget);
        return herders.get(sourceAndTarget).connectorStatus(connector);
    }
}
```

We could then leverage this API in the `DedicatedMirrorIntegrationTest` suite to implement an equivalent of the `waitUntilMirrorMakerIsRunning` method from the `MirrorConnectorsIntegrationBaseTest` suite.
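A test could build on such a status accessor by polling it until the connector reports the expected state. The sketch below is a generic, self-contained illustration of that polling loop only. `WaitForStateSketch` and `waitForState` are hypothetical names, the status is modelled as a plain `Supplier<String>` instead of a `ConnectorStateInfo`, and none of this is taken from the Kafka test utilities.

```java
import java.util.function.Supplier;

final class WaitForStateSketch {
    // Hypothetical helper: polls the supplied state until it equals the expected
    // value or the timeout elapses. Returns true only if the state was reached.
    static boolean waitForState(Supplier<String> stateSupplier, String expected,
                                long timeoutMs, long pollIntervalMs) {
        long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (expected.equals(stateSupplier.get())) {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report that the state was not reached.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In a test, the supplier would wrap whatever status lookup is available, for example a call that returns the connector's state as a string, and the test would fail if `waitForState` returns false.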
[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1448576594 I removed the checkstyle statements which allow `connect-runtime` to import `tools`. Since this is the only dependency on `tools`, we also have an opportunity to disallow _any_ importing of the tools package, similar to the kafka server. Is this a good idea? Does it make sense for the `tools` package to disallow everyone from importing it?
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120460193

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }

+                // 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.

Review Comment:
I think the comment here should not just state what the code does, since readers can understand that from the code :P Instead, what we want to emphasize is a reminder to future contributors that they should be careful not to change the precedence ordering of this logic unnecessarily.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##
@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
                     requestRejoin(shortReason, fullReason);
                 }

+                // 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||

Review Comment:
Ah, thanks for the clarifications! Thinking about this a bit more (sorry for going back and forth..), I'm now a bit more concerned that for some usage patterns where `poll` is triggered less frequently, we may not come back to handle these four exceptions while at the same time the broker is ticking and waiting for the join-group request to be re-sent. Hence I'm changing my mind to lean a bit more towards honoring the exception types for immediate handling over the timeouts --- again, sorry for going back and forth...

So I think we would define the ordering as the following:

1. For un-retriable exceptions, always try to handle them immediately and do not honor the timer.
2. Otherwise, honor the timer.

In that case, we could just go back to the first version of your change, i.e. just add the

```
if (timer.isExpired())
    return false;
```

after the `if/else-if` block. Still, it's better to add a comment that the above ordering is deliberately designed as such.
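The precedence described in this comment can be written down as a small decision function. This is a minimal sketch under stated assumptions: `JoinRetryOrderingSketch`, `Action`, and `decide` are hypothetical names, the three booleans stand in for the exception checks and `timer.isExpired()`, and the rethrow branch for other non-retriable errors is an assumption about the unseen `else-if` arm rather than something shown in the quoted diff.

```java
final class JoinRetryOrderingSketch {
    enum Action { REJOIN_IMMEDIATELY, RETHROW, RETURN_FALSE, BACKOFF_THEN_RETRY }

    // Hypothetical decision function mirroring the ordering discussed in the review.
    static Action decide(boolean isSpecialRejoinError, boolean isRetriable, boolean timerExpired) {
        if (isSpecialRejoinError) {
            // 1. The four special exceptions are handled immediately, ignoring the timer.
            return Action.REJOIN_IMMEDIATELY;
        } else if (!isRetriable) {
            // Assumption: any other non-retriable error is propagated to the caller.
            return Action.RETHROW;
        }
        // 2. Only after the exception checks is the timer honored.
        if (timerExpired) {
            return Action.RETURN_FALSE;
        }
        return Action.BACKOFF_THEN_RETRY;
    }
}
```

Placing the timer check after the `if/else-if` block is what keeps an expired timer from delaying the immediate rejoin in the first branch.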
[GitHub] [kafka] ijuma commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
ijuma commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120470741

## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ##
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    private static long readUnsignedVarlong(DataInput in) throws IOException {
+        byte tmp = in.readByte();

Review Comment:
That's interesting. Can we update the jmh benchmark to have variants where (1) the max varint fits within one byte and (2) the max varint fits within two bytes? I think those are by far the most common cases.
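For readers unfamiliar with the encoding, the decode path in the diff has two steps: read a base-128 unsigned value, then undo the zigzag mapping. The sketch below uses the loop form that the diff removes for the first step and the `(raw >>> 1) ^ -(raw & 1)` expression the diff adds for the second. `VarlongSketch` and `decode` are hypothetical names, the exception type is a stand-in for `illegalVarlongException`, and the PR's optimized `readUnsignedVarlong` is not reproduced because the quoted diff shows only its first line.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class VarlongSketch {
    // Loop-based unsigned decode: 7 payload bits per byte, high bit set means "more bytes follow".
    static long readUnsignedVarlong(DataInput in) throws IOException {
        long value = 0L;
        int shift = 0;
        long b;
        while (((b = in.readByte()) & 0x80) != 0) {
            value |= (b & 0x7f) << shift;
            shift += 7;
            if (shift > 63) {
                throw new IllegalArgumentException("Varlong is too long");
            }
        }
        return value | (b << shift);
    }

    // Zigzag decode: even raw values map to non-negative numbers, odd ones to negative numbers.
    static long readVarlong(DataInput in) throws IOException {
        long raw = readUnsignedVarlong(in);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Convenience wrapper for trying the decoder on a byte array.
    static long decode(byte[] bytes) {
        try {
            return readVarlong(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Under zigzag encoding, a one-byte varlong covers the values -64 through 63 and a two-byte varlong covers -8192 through 8191, which is why small record-level deltas usually fall into the one- and two-byte cases the review asks to benchmark.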
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1120456284

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ##
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1;
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry

Review Comment:
Ack, that makes sense.
[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement
[ https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694652#comment-17694652 ]

Guozhang Wang commented on KAFKA-14748:
---
I agree for Stream-Stream joins now, since for the case "no right hand side value found" we would not emit immediately so it is the case with "key-extractor returns null". But for table-table FK-joins, today the former case would emit while the latter case would not?

> Relax non-null FK left-join requirement
> ---
>
>          Key: KAFKA-14748
>          URL: https://issues.apache.org/jira/browse/KAFKA-14748
>      Project: Kafka
>   Issue Type: Improvement
>   Components: streams
>     Reporter: Matthias J. Sax
>     Priority: Major
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If
> it returns `null`, it's treated as invalid. For left-joins, it might make
> sense to still accept a `null`, and add the left-hand record with an empty
> right-hand-side to the result.
[GitHub] [kafka] satishd commented on pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils
satishd commented on PR #13309: URL: https://github.com/apache/kafka/pull/13309#issuecomment-1448524298 Thanks @junrao for the review. Addressed them with the latest commit/comments.
[GitHub] [kafka] satishd commented on a diff in pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils
satishd commented on code in PR #13309: URL: https://github.com/apache/kafka/pull/13309#discussion_r1120421628 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java: ## @@ -72,4 +92,99 @@ private static String filenamePrefixFromOffset(long offset) { return nf.format(offset); } +/** + * Construct a log file name in the given dir with the given base offset. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ +public static File logFile(File dir, long offset) { +return logFile(dir, offset, ""); +} + +/** + * Construct a log file name in the given dir with the given base offset and the given suffix. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + * @param suffix The suffix to be appended to the file name (e.g. "", ".deleted", ".cleaned", ".swap", etc.) + */ +public static File logFile(File dir, long offset, String suffix) { +return new File(dir, filenamePrefixFromOffset(offset) + LOG_FILE_SUFFIX + suffix); +} + +/** + * Construct an index file name in the given dir using the given base offset. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ +public static File offsetIndexFile(File dir, long offset) { +return offsetIndexFile(dir, offset, ""); +} + +/** + * Construct an index file name in the given dir using the given base offset and the given suffix. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) + */ +public static File offsetIndexFile(File dir, long offset, String suffix) { +return new File(dir, filenamePrefixFromOffset(offset) + INDEX_FILE_SUFFIX + suffix); +} + +/** + * Construct a time index file name in the given dir using the given base offset. 
+ * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ +public static File timeIndexFile(File dir, long offset) { +return timeIndexFile(dir, offset, ""); +} + +/** + * Construct a time index file name in the given dir using the given base offset and the given suffix. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) + */ +public static File timeIndexFile(File dir, long offset, String suffix) { +return new File(dir, filenamePrefixFromOffset(offset) + TIME_INDEX_FILE_SUFFIX + suffix); +} + +/** + * Construct a transaction index file name in the given dir using the given base offset. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ +public static File transactionIndexFile(File dir, long offset) { +return transactionIndexFile(dir, offset, ""); +} + +/** + * Construct a transaction index file name in the given dir using the given base offset and the given suffix. + * + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) + */ +public static File transactionIndexFile(File dir, long offset, String suffix) { +return new File(dir, filenamePrefixFromOffset(offset) + TXN_INDEX_FILE_SUFFIX + suffix); +} + +/** + * Returns the offset from the given file. The file name is of the form: {number}.{suffix}. This method extracts + * the number from the given file's name. + * + * @param file file with the offset information as part of its name. + * @return offset of the given file + */ +public static Long offsetFromFile(File file) { Review Comment: This is already removed from LocalLog. 
Are you asking whether to remove this method from here and use `offsetFromFileName` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
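For readers following the naming scheme discussed in this review, here is a minimal, self-contained sketch of how an offset maps to a file name and back. It is not the code from the PR: the class name `LogFileNameSketch` is hypothetical, the 20-digit zero padding and the `.log` suffix value are assumptions about what `filenamePrefixFromOffset` and `LOG_FILE_SUFFIX` produce, and `String.format` stands in for the `NumberFormat` call shown in the diff.

```java
import java.io.File;
import java.util.Locale;

// Hypothetical sketch, not the LogFileUtils class under review.
final class LogFileNameSketch {
    // Assumed value of LOG_FILE_SUFFIX.
    static final String LOG_FILE_SUFFIX = ".log";

    // Assumption: the prefix is the base offset zero-padded to 20 digits.
    static String filenamePrefixFromOffset(long offset) {
        return String.format(Locale.ROOT, "%020d", offset);
    }

    // Mirrors the shape of logFile(dir, offset, suffix) quoted in the diff.
    static File logFile(File dir, long offset, String suffix) {
        return new File(dir, filenamePrefixFromOffset(offset) + LOG_FILE_SUFFIX + suffix);
    }

    // File names have the form {number}.{suffix}; take everything before the first dot.
    static long offsetFromFileName(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    static long offsetFromFile(File file) {
        return offsetFromFileName(file.getName());
    }
}
```

Because parsing stops at the first dot, a trailing suffix such as ".deleted" does not affect the recovered offset.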
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
divijvaidya commented on code in PR #13312: URL: https://github.com/apache/kafka/pull/13312#discussion_r1120354148 ## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ## @@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) { * @param buffer The output to write to */ public static void writeUnsignedVarint(int value, ByteBuffer buffer) { Review Comment: Netty uses the default loop-based implementation: https://github.com/netty/netty/blob/5d1f99655918c9c034ca090d51b64eced73f742f/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java#L58 I wasn't able to find a project that does what we are doing, [except for the blog post](https://steinborn.me/posts/performance/how-fast-can-you-write-a-varint/) I mentioned above and the implementation [here](https://github.com/richardstartin/varints/blob/main/src/main/java/io/github/richardstartin/varints/SmartNoDataDependencyVarIntState.java). Note that protobuf uses an unrolled implementation for its C++ code at https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.h#L913 Not that it matters much for us, since the Java and C++ compilers differ, but I'm adding it here as a data point.
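To make the "loop-based" versus "unrolled" distinction in this comment concrete, the sketch below writes an unsigned 32-bit varint both ways. It is an illustration only, not the code from PR #13312 or from Netty; the class and method names are hypothetical, and no claim is made here about which variant is faster.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of two equivalent unsigned varint encoders.
final class VarintWriteSketch {
    // Loop-based: emit 7 bits at a time, setting the high bit while more bits remain.
    static void writeUnsignedVarintLoop(int value, ByteBuffer buffer) {
        while ((value & 0xffffff80) != 0) {
            buffer.put((byte) ((value & 0x7f) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }

    // Unrolled: branch on the encoded length (1 to 5 bytes) and write the bytes directly.
    static void writeUnsignedVarintUnrolled(int value, ByteBuffer buffer) {
        if ((value & (0xFFFFFFFF << 7)) == 0) {
            buffer.put((byte) value);
        } else if ((value & (0xFFFFFFFF << 14)) == 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 7));
        } else if ((value & (0xFFFFFFFF << 21)) == 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 14));
        } else if ((value & (0xFFFFFFFF << 28)) == 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 21));
        } else {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 21) & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 28));
        }
    }
}
```

Both methods produce identical bytes; for example, the value 300 is encoded as the two bytes 0xAC 0x02.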
[GitHub] [kafka] C0urante merged pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation
C0urante merged PR #13184: URL: https://github.com/apache/kafka/pull/13184
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
divijvaidya commented on code in PR #13312: URL: https://github.com/apache/kafka/pull/13312#discussion_r1120328865 ## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ## @@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException { * @throws IOException if {@link DataInput} throws {@link IOException} */ public static long readVarlong(DataInput in) throws IOException { -long value = 0L; -int i = 0; -long b; -while (((b = in.readByte()) & 0x80) != 0) { -value |= (b & 0x7f) << i; -i += 7; -if (i > 63) -throw illegalVarlongException(value); +long raw = readUnsignedVarlong(in); +return (raw >>> 1) ^ -(raw & 1); +} + +private static long readUnsignedVarlong(DataInput in) throws IOException { +byte tmp = in.readByte(); Review Comment: No, I wrote this by extending Netty's varint32 implementation to work with 64 bits (it's simple loop unrolling, no fancy logic). The inlining heuristics become less predictable as the size of the function increases, hence we don't see much benefit for the 64-bit implementation here. I don't have a strong opinion on the 64-bit implementation and would be happy to fall back to the exact implementation used by Protobuf. [1] [1] https://github.com/protocolbuffers/protobuf/blob/main/java/core/src/main/java/com/google/protobuf/CodedInputStream.java#L1048
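The diff above splits `readVarlong` into an unsigned read followed by the ZigZag step `(raw >>> 1) ^ -(raw & 1)`. The sketch below shows the same two stages using the loop-based read that the diff removes. It is illustrative only: it reads from a `ByteBuffer` rather than a `DataInput` to stay free of checked exceptions, and the class name, the `zigZagEncode` helper and the exception message are assumptions, not part of the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of loop-based varlong decoding with ZigZag.
final class VarlongReadSketch {
    // Accumulate 7 payload bits per byte until a byte without the continuation bit is seen.
    static long readUnsignedVarlong(ByteBuffer buffer) {
        long value = 0L;
        int shift = 0;
        long b;
        while (((b = buffer.get()) & 0x80) != 0) {
            value |= (b & 0x7f) << shift;
            shift += 7;
            if (shift > 63) {
                throw new IllegalArgumentException("Varlong is too long");
            }
        }
        return value | (b << shift);
    }

    // ZigZag maps 0, 1, 2, 3, ... back to 0, -1, 1, -2, ...
    static long zigZagDecode(long raw) {
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Inverse mapping, included only to show the round trip.
    static long zigZagEncode(long value) {
        return (value << 1) ^ (value >> 63);
    }

    static long readVarlong(ByteBuffer buffer) {
        return zigZagDecode(readUnsignedVarlong(buffer));
    }
}
```

For instance, the bytes 0xAC 0x02 decode to the unsigned value 300, which ZigZag maps to the signed value 150.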
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
divijvaidya commented on code in PR #13312: URL: https://github.com/apache/kafka/pull/13312#discussion_r1120323557 ## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ## @@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) { * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read */ public static int readUnsignedVarint(ByteBuffer buffer) { Review Comment: Sure, will do. Also, protobuf has a similar unrolled implementation for its C++ code at https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.cc#L422 (its Java variant doesn't use the unrolled implementation since they decided to add a different implementation which favours cases where varints are 1 byte).
[GitHub] [kafka] yashmayya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks
yashmayya commented on code in PR #13276: URL: https://github.com/apache/kafka/pull/13276#discussion_r1119640881 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA PowerMock.verifyAll(); } +@Test +public void testTaskReconfigurationRetries() { +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT); + +// end of initial tick +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +member.wakeup(); +PowerMock.expectLastCall(); + +// second tick +member.ensureActive(); +PowerMock.expectLastCall(); + +EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes(); +EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); + +SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); +EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)) +.andThrow(new ConnectException("Failed to generate task configs")).anyTimes(); Review Comment: Thanks, I've updated the test to add another herder tick which runs a successful task reconfiguration request (I skipped the addition of another tick because the no further retries bit can be verified by the poll timeout at the end of the previous tick). Regarding the test case for the task reconfiguration REST request to the leader - I did consider that initially but while trying to add one, there were some complications (timing related issues) arising from the use of the `forwardRequestExecutor` at which point I felt like it was more trouble than it was worth. 
However, your comment made me revisit it and I've made some changes to drop in a simple mock executor service which runs requests synchronously (on the same thread as the caller). Let me know what you think.
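As an illustration of the idea of an executor that runs requests synchronously on the caller's thread, here is a minimal sketch built on the JDK's `AbstractExecutorService`. It is not the mock used in the PR's test; the class name is hypothetical and the shutdown handling is deliberately simplistic.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: every task runs inline on the thread that submits it.
final class SameThreadExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown = false;

    @Override
    public void execute(Runnable command) {
        command.run(); // no hand-off to another thread
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList(); // nothing is ever queued
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }
}
```

Since `submit` in `AbstractExecutorService` delegates to `execute`, a returned `Future` is already complete when `submit` returns, which removes timing dependencies from a test at the cost of diverging from the asynchronous production behaviour.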
[jira] [Updated] (KAFKA-14732) Use an exponential backoff retry mechanism while reconfiguring connector tasks
[ https://issues.apache.org/jira/browse/KAFKA-14732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14732: -- Fix Version/s: 3.5.0 > Use an exponential backoff retry mechanism while reconfiguring connector tasks > -- > > Key: KAFKA-14732 > URL: https://issues.apache.org/jira/browse/KAFKA-14732 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0 > > > Kafka Connect in distributed mode retries infinitely with a fixed retry > backoff (250 ms) in case of errors arising during connector task > reconfiguration. Tasks can be "reconfigured" during connector startup (to get > the initial task configs from the connector), on a connector resume, or if a > connector explicitly requests it via its context. Task reconfiguration > essentially entails asking a connector instance for its task configs and > writing them to the Connect cluster's config storage (in case a change in > task configs is detected). A fixed retry backoff of 250 ms leads to very > aggressive retries - consider a Debezium connector which attempts to initiate > a database connection in its [taskConfigs > method|https://github.com/debezium/debezium/blob/bf347da71ad9b0819998a3bc9754b3cc96cc1563/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L63]. > If the connection fails due to something like an invalid login, the Connect > worker will essentially spam connection attempts frequently and indefinitely > (until the connector config / database side configs are fixed). An > exponential backoff retry mechanism seems better suited for the > [DistributedHerder::reconfigureConnectorTasksWithRetry|https://github.com/apache/kafka/blob/a54a34a11c1c867ff62a7234334cad5139547fd7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1873-L1898] > method. 
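To illustrate the difference between the fixed 250 ms backoff described in this issue and an exponential one, here is a minimal sketch of a capped, doubling delay. It is not the change merged for KAFKA-14732: the class and method names are hypothetical, the 60-second cap used in the usage note is an assumed value, and jitter is omitted so the output stays deterministic.

```java
// Hypothetical sketch of a capped exponential backoff schedule.
final class ExponentialBackoffSketch {
    /**
     * Returns the delay before retry number {@code attempt} (0-based):
     * initialMs doubled once per previous attempt, never exceeding maxMs.
     */
    static long backoffMs(long initialMs, long maxMs, int attempt) {
        if (initialMs <= 0 || maxMs < initialMs || attempt < 0) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            // Guard against overflow: once doubling would pass the cap, pin to the cap.
            delay = delay > maxMs / 2 ? maxMs : delay * 2;
        }
        return Math.min(delay, maxMs);
    }
}
```

With an initial delay of 250 ms and an assumed cap of 60000 ms, successive retries would wait 250, 500, 1000, 2000 ms and so on until the cap is reached, instead of retrying every 250 ms indefinitely.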
[GitHub] [kafka] C0urante merged pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks
C0urante merged PR #13276: URL: https://github.com/apache/kafka/pull/13276
[GitHub] [kafka] C0urante commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks
C0urante commented on code in PR #13276: URL: https://github.com/apache/kafka/pull/13276#discussion_r1120259483 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA PowerMock.verifyAll(); } +@Test +public void testTaskReconfigurationRetries() { +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT); + +// end of initial tick +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +member.wakeup(); +PowerMock.expectLastCall(); + +// second tick +member.ensureActive(); +PowerMock.expectLastCall(); + +EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes(); +EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); + +SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); +EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)) +.andThrow(new ConnectException("Failed to generate task configs")).anyTimes(); Review Comment: Ah, fair point about the fourth tick! I don't love using a synchronous executor here since it diverges significantly from the non-testing behavior of the herder. But, I can't think of a better way to test this without going overboard in complexity, and it does give us decent coverage. So, good enough 👍
[GitHub] [kafka] ijuma merged pull request #13316: MINOR: srcJar should depend on processMessages task
ijuma merged PR #13316: URL: https://github.com/apache/kafka/pull/13316
[GitHub] [kafka] ijuma commented on pull request #13316: MINOR: srcJar should depend on processMessages task
ijuma commented on PR #13316: URL: https://github.com/apache/kafka/pull/13316#issuecomment-1448363891 Test failures are unrelated.
[GitHub] [kafka] ijuma commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
ijuma commented on code in PR #13312: URL: https://github.com/apache/kafka/pull/13312#discussion_r1120217103 ## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ## @@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) { * @param buffer The output to write to */ public static void writeUnsignedVarint(int value, ByteBuffer buffer) { Review Comment: What does netty do for this? Similar to what we're doing here or something else? ## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ## @@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) { * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read */ public static int readUnsignedVarint(ByteBuffer buffer) { Review Comment: Shall we have a comment indicating that we borrowed the implementation from Netty with a link to the relevant file? ## clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java: ## @@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException { * @throws IOException if {@link DataInput} throws {@link IOException} */ public static long readVarlong(DataInput in) throws IOException { -long value = 0L; -int i = 0; -long b; -while (((b = in.readByte()) & 0x80) != 0) { -value |= (b & 0x7f) << i; -i += 7; -if (i > 63) -throw illegalVarlongException(value); +long raw = readUnsignedVarlong(in); +return (raw >>> 1) ^ -(raw & 1); +} + +private static long readUnsignedVarlong(DataInput in) throws IOException { +byte tmp = in.readByte(); Review Comment: I saw a link to the Netty implementation for varint32, is there one for varint64 too?
[GitHub] [kafka] divijvaidya commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
divijvaidya commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1448325538 Completing the conversation here, in case someone comes around reading this old thread. We have a new KIP and a PR for Zk migration to 3.8.1 which would hopefully land in 3.5. KIP - https://cwiki.apache.org/confluence/display/KAFKA/KIP-902%3A+Upgrade+Zookeeper+to+3.8.1 PR - https://github.com/apache/kafka/pull/13260
[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records
[ https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14659: -- Fix Version/s: 3.3.3 > source-record-write-[rate|total] metrics include filtered records > - > > Key: KAFKA-14659 > URL: https://issues.apache.org/jira/browse/KAFKA-14659 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Beard >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > Source tasks in Kafka Connect offer two sets of metrics (documented in > [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]): > ||Metric||Description|| > |source-record-poll-rate|The average per-second number of records > produced/polled (before transformation) by this task belonging to the named > source connector in this worker.| > |source-record-write-rate|The average per-second number of records output > from the transformations and written to Kafka for this task belonging to the > named source connector in this worker. This is after transformations are > applied and excludes any records filtered out by the transformations.| > There are also corresponding "-total" metrics that capture the total number > of records polled and written for the metrics above, respectively. > In short, the "poll" metrics capture the number of messages sourced > pre-transformation/filtering, and the "write" metrics should capture the > number of messages ultimately written to Kafka post-transformation/filtering. > However, the implementation of the {{source-record-write-*}} metrics > _includes_ records filtered out by transformations (and also records that > result in produce failures with the config {{errors.tolerance=all}}). > h3. 
Details > In > [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397], > each source record is passed through the transformation chain where it is > potentially filtered out, checked to see if it was in fact filtered out, and > if so it is accounted for in the internal metrics via > {{counter.skipRecord()}}. > {code:java} > for (final SourceRecord preTransformRecord : toSend) { > retryWithToleranceOperator.sourceRecord(preTransformRecord); > final SourceRecord record = > transformationChain.apply(preTransformRecord); > final ProducerRecord producerRecord = > convertTransformedRecord(record); > if (producerRecord == null || retryWithToleranceOperator.failed()) { > > counter.skipRecord(); > recordDropped(preTransformRecord); > continue; > } > ... > {code} > {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows: > {code:java} > > public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup > metricsGroup) { > assert batchSize > 0; > assert metricsGroup != null; > this.batchSize = batchSize; > counter = batchSize; > this.metricsGroup = metricsGroup; > } > public void skipRecord() { > if (counter > 0 && --counter == 0) { > finishedAllWrites(); > } > } > > private void finishedAllWrites() { > if (!completed) { > metricsGroup.recordWrite(batchSize - counter); > completed = true; > } > } > {code} > For example: If a batch starts with 100 records, {{batchSize}} and > {{counter}} will both be initialized to 100. If all 100 records get filtered > out, {{counter}} will be decremented 100 times, and > {{finishedAllWrites()}} will record the value 100 to the underlying > {{source-record-write-*}} metrics rather than 0, the correct value according > to the documentation for these metrics. > h3. 
Solutions > Assuming the documentation correctly captures the intent of the > {{source-record-write-*}} metrics, it seems reasonable to fix these metrics > such that filtered records do not get counted. > It may also be useful to add additional metrics to capture the rate and total > number of records filtered out by transformations, which would require a KIP. > I'm not sure what the best way of accounting for produce failures in the case > of {{errors.tolerance=all}} is yet. Maybe these failures deserve their own > new metrics?
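The counting problem described in this issue can be reproduced with a small stand-alone sketch. The class below is a simplified stand-in for `SourceRecordWriteCounter`, not the Kafka class and not the fix that was merged: the `LongConsumer` replaces `SourceTaskMetricsGroup.recordWrite`, the `completeRecord()` method and the `excludeSkipped` flag are assumptions added for illustration, and subtracting a separate skipped count is only one possible way to stop counting filtered records.

```java
import java.util.function.LongConsumer;

// Hypothetical, simplified stand-in for the write counter discussed above.
final class WriteCounterSketch {
    private final int batchSize;
    private final LongConsumer recordWrite; // stands in for metricsGroup.recordWrite(...)
    private final boolean excludeSkipped;   // false: formula quoted in the issue; true: illustrative fix
    private int counter;                    // records not yet written or skipped
    private int skipped;                    // records dropped before reaching Kafka
    private boolean completed;

    WriteCounterSketch(int batchSize, LongConsumer recordWrite, boolean excludeSkipped) {
        this.batchSize = batchSize;
        this.counter = batchSize;
        this.recordWrite = recordWrite;
        this.excludeSkipped = excludeSkipped;
    }

    // Called when a record was filtered out or failed and was tolerated.
    void skipRecord() {
        skipped++;
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }

    // Assumed counterpart for a record that was actually written.
    void completeRecord() {
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }

    private void finishedAllWrites() {
        if (!completed) {
            // batchSize - counter equals batchSize here, so skipped records are included
            // unless they are subtracted explicitly.
            recordWrite.accept(excludeSkipped ? batchSize - skipped : batchSize - counter);
            completed = true;
        }
    }
}
```

With a batch of 100 records that are all skipped, the formula from the issue reports 100 writes, while the variant that subtracts the skipped count reports 0, matching the documented meaning of the metric.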
[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records
[ https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14659: -- Fix Version/s: 3.4.1 > source-record-write-[rate|total] metrics include filtered records > - > > Key: KAFKA-14659 > URL: https://issues.apache.org/jira/browse/KAFKA-14659 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Beard >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.5.0, 3.4.1