Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]
satishd commented on PR #15817: URL: https://github.com/apache/kafka/pull/15817#issuecomment-2095255793 Thanks @abhijeetk88 for reviewing the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590570923

## core/src/main/java/kafka/server/TierStateMachine.java: ##
@@ -40,19 +90,176 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        // Restore producer snapshot
+        File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+        Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic mann
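The quoted hunk breaks off where the producer snapshot is written to a `.tmp` path and then copied to the snapshot file atomically. The rest of that method is not shown in this message, so the following is only a minimal sketch of the general temp-file-then-rename pattern, using nothing beyond `java.nio.file`. The class `AtomicFileWriter` and the method `writeAtomically` are hypothetical names for illustration and are not taken from the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of the PR: writes a stream to a ".tmp" sibling
// and then moves it into place, so readers never observe a half-written file.
final class AtomicFileWriter {
    private AtomicFileWriter() { }

    static void writeAtomically(InputStream source, Path target) throws IOException {
        // Same naming idea as the hunk above: "<target>.tmp" next to the target.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
        try {
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Assumed fallback for file systems without atomic rename;
            // this path is a plain replace and is no longer atomic.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

Whether an atomic move replaces an already existing target is implementation specific according to the `Files.move` documentation, so a caller that may overwrite an existing snapshot should account for that.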
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590567527

## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ##
@@ -1858,16 +1858,6 @@ class KafkaConfigTest {
     }
   }
-  @Test
-  def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {

Review Comment:
Can we add a positive test for multiple log dirs as well, like the one created below for a single dir? Maybe refactor the test below to cover both scenarios.

## core/src/main/java/kafka/server/TierStateMachine.java: ##
@@ -40,19 +90,176 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMe
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1590547523

## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ##
@@ -1073,4 +1151,13 @@ class ZkMigrationIntegrationTest {
     kraftCluster.close()
     zkCluster.stop()
   }
+
+  def maybeRetry(shouldRetry: Boolean, maxWaitMs: Long)(block: => Unit): Unit = {

Review Comment:
It seems we never set `shouldRetry` to `false` in this test. Maybe this method is unnecessary?

## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {

Review Comment:
As this flag guards against both 1) no controller and 2) a KRaft controller, the `NoController` part of the name `enableEntityConfigNoController` is a bit unsuitable. How about `enableEntityConfigCheck`?

## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ##
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
     admin.alterUserScramCredentials(alterations)
   }
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
       val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
       assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
       assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
     }
   }
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(1) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-      assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
-      assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean): Unit = {
+    maybeRetry(shouldRetry, 1) {
+      val defaultBrokerProps = zkClient.getEntityConfigs(ConfigType.BROKER, "")
+      assertEquals("8640", defaultBrokerProps.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker0Props = zkClient.getEntityConfigs(ConfigType.BROKER, "0")
+      assertEquals("4320", broker0Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
+
+      val broker1Props = zkClient.getEntityConfigs(ConfigType.BROKER, "1")
+      assertEquals("4320", broker1Props.getProperty(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG))
     }
   }
+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {

Review Comment:
Why remove the retry? It seems that makes `testDualWriteQuotaAndScram` unstable.

## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ##
@@ -467,13 +474,48 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersionOpt: Option[Int] = if (!enableEntityConfigNoController) {
+      val controllerRegistration = getControllerRegistration match {
+        case Some(registration) => registration
+        case None =>
+          // This case is mainly here to make tests less flaky (by virtue of retries).
+          // In practice, we always expect a /controller ZNode to exist
+          throw new ControllerMovedException(s"Cannot set entity configs when there is no controller.")
+      }
+
+      // If there is a KRaft controller defined, don't even attempt this write. The broker will soon get a UMR
+      // from the new KRaft controller that lets it know about the new control
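The body of `maybeRetry` is not included in the quoted hunks, so its real behavior is unknown here. As a rough Java sketch of what a helper with that signature could do, assuming it either runs the block once or keeps re-running it until it stops throwing `AssertionError` or the wait budget is used up: the class name `Retry` and the 10 ms pause are assumptions for illustration only.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical Java analogue of the Scala helper under review; the real
// implementation in the PR is not shown in this message.
final class Retry {
    private Retry() { }

    static void maybeRetry(boolean shouldRetry, long maxWaitMs, Runnable block) {
        if (!shouldRetry) {
            // No retry requested: the first failure propagates immediately.
            block.run();
            return;
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
        while (true) {
            try {
                block.run();
                return;
            } catch (AssertionError e) {
                // Give up once the wait budget is exhausted.
                if (System.nanoTime() - deadline >= 0) {
                    throw e;
                }
                try {
                    Thread.sleep(10); // assumed pause between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

If every caller passes `true`, as the first review comment suggests, the `shouldRetry` branch is dead code and a plain retry helper would be enough.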
Re: [PR] KAFKA-15845: Detect leaked Kafka clients and servers with LeakTestingExtension [kafka]
github-actions[bot] commented on PR #14783: URL: https://github.com/apache/kafka/pull/14783#issuecomment-2095133166 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15524, KAFKA-15917: Wait for zombie sink tasks' consumers to commit offsets before trying to modify their offsets in integration tests [kafka]
github-actions[bot] commented on PR #15302: URL: https://github.com/apache/kafka/pull/15302#issuecomment-2095133015 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15722: KRaft support in RackAwareAutoTopicCreationTest [kafka]
github-actions[bot] commented on PR #15318: URL: https://github.com/apache/kafka/pull/15318#issuecomment-2095133001 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Updated] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
[ https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sanghyeok An updated KAFKA-16670:
-
Description:
*Related Code*
 * Consumer gets its assignment successfully:
 ** [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
 * Consumer gets stuck forever because of a concurrency issue:
 ** [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]

*Unexpected behaviour*
 * The broker is sufficiently slow.
 * A KafkaConsumer is created and immediately subscribes to a topic.

If both conditions are met, the {{Consumer}} can potentially never receive {{TopicPartition}} assignments and may become stuck indefinitely.

In the case of a new {{broker}} and a new {{consumer}}, when the consumer is created, the {{consumer background thread}} starts sending a request to the broker. (I believe this is a {{GroupCoordinator Heartbeat request}}.) During this time, if the {{broker}} has not yet loaded metadata from {{__consumer_offsets}}, it will begin to schedule metadata loading. Once the broker has completely loaded the metadata, the {{consumer background thread}} recognizes this broker as a valid group coordinator.

However, the {{consumer}} may send a {{subscribe request}} to the {{broker}} before the {{broker}} has replied to the {{GroupCoordinator Heartbeat Request}}. In that scenario, the {{consumer}} appears to be stuck.

You can check this scenario in {{src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator}}. If there is no sleep to wait for the {{GroupCoordinator Heartbeat Request}}, the {{consumer}} is always stuck. If there is a short sleep, the {{consumer}} always receives its assignment.

README : [https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md]

In my case, the consumer gets its assignment with `docker-compose`, which means the broker is not slow enough. However, the consumer cannot get an assignment with `testcontainers` unless there is a short wait, which means the broker is slow enough to trigger the concurrency issue. `testcontainers` is Docker in Docker, so `testcontainers` is slower than `docker-compose`.

was:
*Related Code*
 * Consumer get assignment Successfully :
 ** [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
 * Consumer get be stuck Forever because of concurrent issue:
 ** https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79

*Unexpected behaviour* [|https://github.com/chickenchickenlove/new-consumer-error#unexpected-behaviour]
 * Broker is sufficiently slow.
 * When a KafkaConsumer is created and immediately subscribes to a topic

If both conditions are met, {{Consumer}} can potentially never receive {{TopicPartition}} assignments and become stuck indefinitely.
In case of new broker and new consumer, when consumer are created, consumer background thread send a request to broker. (I guess groupCoordinator Heartbeat request). In that time, if broker does not load metadata from {{{}__consumer_offset{}}}, broker will start to schedule load metadata. After broker load metadata completely, consumer background thread think 'this broker is valid group coordinator'. However, consumer can send {{subscribe}} request to broker before {{broker}} reply about {{{}groupCoordinator HeartBeat Request{}}}. In that case, consumer seems to be stuck. If both conditions are met, the {{Consumer}} can potentially never receive {{TopicPartition}} assignments and may become indefinitely stuck. In the case of a new {{broker}} and new {{{}consumer{}}}, when the consumer is created, {{consumer background thread}} start to send a request to the broker. (I believe this is a {{{}GroupCoordinator Heartbeat request{}}}) During this time, if the {{broker}} has not yet loaded met
[jira] [Created] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
sanghyeok An created KAFKA-16670:

Summary: KIP-848 : Consumer will not receive assignment forever because of concurrent issue.
Key: KAFKA-16670
URL: https://issues.apache.org/jira/browse/KAFKA-16670
Project: Kafka
Issue Type: Bug
Reporter: sanghyeok An

*Related Code*
 * Consumer gets its assignment successfully:
 ** [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
 * Consumer gets stuck forever because of a concurrency issue:
 ** https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79

*Unexpected behaviour* [|https://github.com/chickenchickenlove/new-consumer-error#unexpected-behaviour]
 * The broker is sufficiently slow.
 * A KafkaConsumer is created and immediately subscribes to a topic.

If both conditions are met, the {{Consumer}} can potentially never receive {{TopicPartition}} assignments and may become stuck indefinitely.

In the case of a new {{broker}} and a new {{consumer}}, when the consumer is created, the {{consumer background thread}} starts sending a request to the broker. (I believe this is a {{GroupCoordinator Heartbeat request}}.) During this time, if the {{broker}} has not yet loaded metadata from {{__consumer_offsets}}, it will begin to schedule metadata loading. Once the broker has completely loaded the metadata, the {{consumer background thread}} recognizes this broker as a valid group coordinator.

However, the {{consumer}} may send a {{subscribe request}} to the {{broker}} before the {{broker}} has replied to the {{GroupCoordinator Heartbeat Request}}. In that scenario, the {{consumer}} appears to be stuck.

You can check this scenario in {{src/test/java/com/example/MyTest#should_fail_because_consumer_try_to_poll_before_background_thread_get_valid_coordinator}}. If there is no sleep to wait for the {{GroupCoordinator Heartbeat Request}}, the {{consumer}} is always stuck. If there is a short sleep, the {{consumer}} always receives its assignment.

README : https://github.com/chickenchickenlove/new-consumer-error/blob/main/README.md

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843599#comment-17843599 ]

PoAn Yang commented on KAFKA-16223:
---
Yes, I will try to send a PR ASAP. Thanks.

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
> Issue Type: Sub-task
> Components: connect
> Reporter: Hector Geraldino
> Assignee: Hector Geraldino
> Priority: Minor
> Fix For: 3.8.0
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843596#comment-17843596 ]

Chia-Ping Tsai commented on KAFKA-16223:
---
[~yangpoan] Could you please take over the remaining migration?

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
> Issue Type: Sub-task
> Components: connect
> Reporter: Hector Geraldino
> Assignee: Hector Geraldino
> Priority: Minor
> Fix For: 3.8.0
Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]
chia7712 merged PR #15841: URL: https://github.com/apache/kafka/pull/15841
[jira] [Resolved] (KAFKA-16659) KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER
[ https://issues.apache.org/jira/browse/KAFKA-16659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16659.

Fix Version/s: 3.8.0
Resolution: Fixed

> KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER
> ---
>
> Key: KAFKA-16659
> URL: https://issues.apache.org/jira/browse/KAFKA-16659
> Project: Kafka
> Issue Type: Bug
> Reporter: Chia-Ping Tsai
> Assignee: PoAn Yang
> Priority: Minor
> Fix For: 3.8.0
>
> See the following test:
> {code:scala}
>   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
>   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
>   @Timeout(15)
>   def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
>     val topicPartition = new TopicPartition(topic, 15)
>     val consumer = createConsumer()
>     consumer.assign(List(topicPartition).asJava)
>     CompletableFuture.runAsync { () =>
>       TimeUnit.SECONDS.sleep(1)
>       consumer.wakeup()
>     }
>     assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
>   }
> {code}
Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]
chia7712 merged PR #15853: URL: https://github.com/apache/kafka/pull/15853
Re: [PR] KAFKA-16659: KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER [kafka]
chia7712 commented on PR #15853: URL: https://github.com/apache/kafka/pull/15853#issuecomment-2095025582

```
./gradlew cleanTest \
  :connect:runtime:test --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId \
  :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers \
  :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
  :connect:mirror:test --tests MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault \
  :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeWithWildcardAcls --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests UserClientIdQuotaTest.testThrottledProducerConsumer --tests UserClientIdQuotaTest.testQuotaOverrideDelete --tests ReplicationQuotasTest.shouldThrottleOldSegments
```

These failures are unrelated to this PR, and the tests pass locally.
Re: [PR] KAFKA-16393 read/write sequence of buffers correctly [kafka]
chia7712 commented on PR #15571: URL: https://github.com/apache/kafka/pull/15571#issuecomment-2095022237

I have triggered QA again.
Re: [PR] Kafka-16668: improve cluster test [kafka]
chia7712 commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590457683

## core/src/test/java/kafka/test/annotation/ClusterTest.java: ##
@@ -44,4 +44,5 @@
     String listener() default "";
     MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
     ClusterConfigProperty[] serverProperties() default {};
+    Tags[] tags() default {};

Review Comment:
Another idea: maybe we don't need a new annotation. We could just add a new field, `String[] labels() default {}`, to let developers add custom strings to the display name. That is simpler. WDYT?

## core/src/test/java/kafka/test/annotation/ClusterTest.java: ##
@@ -44,4 +44,5 @@
     String listener() default "";
     MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0;
     ClusterConfigProperty[] serverProperties() default {};
+    Tags[] tags() default {};

Review Comment:
Could you please add a comment for it?

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -143,23 +144,27 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
         Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
         Map serverProperties = new HashMap<>();
+        Map tags = new HashMap<>();
         for (ClusterConfigProperty property : defaults.serverProperties()) {
             serverProperties.put(property.key(), property.value());
         }
         for (ClusterConfigProperty property : annot.serverProperties()) {
             serverProperties.put(property.key(), property.value());
         }
+        for (Tags tag: annot.tags()) {
+            tags.put(tag.key(), tag.value());
+        }
         ClusterConfig config = ClusterConfig.builder()
             .setType(type)
             .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
             .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
             .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
             .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : annot.autoStart() == AutoStart.YES)
-            .setName(annot.name().trim().isEmpty() ? null : annot.name())
             .setListenerName(annot.listener().trim().isEmpty() ? null : annot.listener())
             .setServerProperties(serverProperties)
             .setSecurityProtocol(annot.securityProtocol())
             .setMetadataVersion(annot.metadataVersion())
+            .setTags(tags)

Review Comment:
Maybe a lambda is simpler. For example: `setTags(Arrays.stream(annot.tags()).collect(Collectors.toMap(Tags::key, Tags::value)))`

## core/src/test/java/kafka/test/ClusterConfig.java: ##
@@ -83,6 +81,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
         this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
         this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
         this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
+        this.tags = Objects.requireNonNull(extendTags(tags));

Review Comment:
We should keep the original tags and return the "modified" tags in `nameTags`. Also, please add a method (`tags`) that returns the original tags.
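The stream-based alternative suggested in the review above can be sketched in isolation. This is a minimal illustration, not the PR's code: the `Tag` and `Tagged` annotations and the `TagCollectorSketch` class are hypothetical stand-ins for `Tags` and `@ClusterTest`; only `Arrays.stream` and `Collectors.toMap` are real JDK calls.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical key/value annotation standing in for the PR's `Tags`.
@Retention(RetentionPolicy.RUNTIME)
@interface Tag {
    String key();
    String value();
}

// Hypothetical container annotation standing in for `@ClusterTest`.
@Retention(RetentionPolicy.RUNTIME)
@interface Tagged {
    Tag[] tags() default {};
}

class TagCollectorSketch {
    @Tagged(tags = {@Tag(key = "quorum", value = "kraft"), @Tag(key = "disks", value = "2")})
    static void annotatedMethod() { }

    // Collects the annotation array into a map in a single expression.
    static Map<String, String> tagsOf(Tagged annot) {
        return Arrays.stream(annot.tags()).collect(Collectors.toMap(Tag::key, Tag::value));
    }

    // Reads the annotation reflectively and converts its tags to a map.
    static Map<String, String> demo() {
        try {
            Tagged annot = TagCollectorSketch.class
                    .getDeclaredMethod("annotatedMethod")
                    .getAnnotation(Tagged.class);
            return tagsOf(annot);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

One behavioral difference from the explicit `put` loop is worth keeping in mind: the two-argument `Collectors.toMap` throws `IllegalStateException` when two elements share a key, whereas `Map.put` silently keeps the last value.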
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chia7712 commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1590455114

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
     cleanerManager.removeMetrics()
   }

+  /**
+   * Activate metrics
+   */
+  def activateMetrics():Unit = {
+    metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,

Review Comment:
Agreed. @chiacyu, could you please avoid producing duplicate code?
Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]
chia7712 commented on code in PR #15862: URL: https://github.com/apache/kafka/pull/15862#discussion_r1590454147

## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java: ##
@@ -33,16 +31,22 @@ public class ClusterTestExtensionsUnitTest {
     void testProcessClusterTemplate() {
         ClusterTestExtensions ext = new ClusterTestExtensions();
         ExtensionContext context = mock(ExtensionContext.class);
-        Consumer testInvocations = mock(Consumer.class);
         ClusterTemplate annot = mock(ClusterTemplate.class);
-        when(annot.value()).thenReturn("").thenReturn(" ");
+        when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("test_empty_config");

Review Comment:
That is a good approach. Also, we can do the refactor later in https://issues.apache.org/jira/browse/KAFKA-16654.
Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]
soarez commented on code in PR #15862: URL: https://github.com/apache/kafka/pull/15862#discussion_r1590420247 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -119,23 +116,34 @@ public Stream provideTestTemplateInvocationContex return generatedContexts.stream(); } -void processClusterTemplate(ExtensionContext context, ClusterTemplate annot, - Consumer testInvocations) { +List processClusterTemplate(ExtensionContext context, ClusterTemplate annot) { // If specified, call cluster config generated method (must be static) List generatedClusterConfigs = new ArrayList<>(); +List testTemplateInvocationContexts = new ArrayList<>(); if (annot.value().trim().isEmpty()) { throw new IllegalStateException("ClusterTemplate value can't be empty string."); } generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add); -String baseDisplayName = context.getRequiredTestMethod().getName(); -generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations)); +if (context.getRequiredTestMethod() != null) { Review Comment: We shouldn't need to check for null here. Can you remove the condition? 
https://github.com/junit-team/junit5/blob/releases/5.10.x/junit-jupiter-api/src/main/java/org/junit/jupiter/api/extension/ExtensionContext.java#L236 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -119,23 +116,34 @@ public Stream provideTestTemplateInvocationContex return generatedContexts.stream(); } -void processClusterTemplate(ExtensionContext context, ClusterTemplate annot, - Consumer testInvocations) { +List processClusterTemplate(ExtensionContext context, ClusterTemplate annot) { // If specified, call cluster config generated method (must be static) List generatedClusterConfigs = new ArrayList<>(); +List testTemplateInvocationContexts = new ArrayList<>(); if (annot.value().trim().isEmpty()) { throw new IllegalStateException("ClusterTemplate value can't be empty string."); } generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add); -String baseDisplayName = context.getRequiredTestMethod().getName(); -generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations)); +if (context.getRequiredTestMethod() != null) { +String baseDisplayName = context.getRequiredTestMethod().getName(); +generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testTemplateInvocationContexts::add)); +} + +if (testTemplateInvocationContexts.isEmpty()) { +throw new IllegalStateException("ClusterConfig generator method should provide at least one config."); +} + +return testTemplateInvocationContexts; } private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) { Object testInstance = context.getTestInstance().orElse(null); -Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class); -ReflectionUtils.invokeMethod(method, testInstance, generator); +Class testClass = 
context.getRequiredTestClass(); +if (context.getRequiredTestClass() != null) { Review Comment: Same here. https://github.com/junit-team/junit5/blob/releases/5.10.x/junit-jupiter-api/src/main/java/org/junit/jupiter/api/extension/ExtensionContext.java#L137 Can you remove the `if`? ## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java: ## @@ -33,16 +31,22 @@ public class ClusterTestExtensionsUnitTest { void testProcessClusterTemplate() { ClusterTestExtensions ext = new ClusterTestExtensions(); ExtensionContext context = mock(ExtensionContext.class); -Consumer testInvocations = mock(Consumer.class); ClusterTemplate annot = mock(ClusterTemplate.class); -when(annot.value()).thenReturn("").thenReturn(" "); +when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("test_empty_config"); Review Comment: I'm wondering if these tests would be simpler to write with actual methods. We could define a static inner class: ```java static class StubTest { @ClusterTemplate("cfgFoo") void testFoo() {} static void cfgFoo(ClusterGenerator gen) { /* ... */ } @ClusterTemplate("") void testBar() {} } ``` and a utility method: ```java
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
soarez commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1590415617

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       cleaners += cleaner
       cleaner.start()
     }
+    activateMetrics();

Review Comment:
Since there is no change to the existing definition of metrics, it seems this will cause the metrics to be initialized twice.

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig,
     cleanerManager.removeMetrics()
   }

+  /**
+   * Activate metrics
+   */
+  def activateMetrics():Unit = {
+    metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
+      () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
+
+    metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
+      val stats = cleaners.map(_.lastStats)
+      val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / math.max(stats.iterator.map(_.bytesRead).sum, 1)
+      (100 * recopyRate).toInt
+    })
+
+    metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+
+    metricsGroup.newGauge(MaxCompactionDelayMetricsName,
+      () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
+
+    metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)

Review Comment:
If we're moving the metrics here, please keep the existing comments.

## core/src/main/scala/kafka/log/LogCleaner.scala: ##
@@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       cleaners += cleaner
       cleaner.start()
     }
+    activateMetrics();

Review Comment:
+1 Please include a test
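The recopy-percent gauge in the diff above boils down to a small piece of arithmetic, which a plain Java sketch may make easier to check by hand. The `Stats` class and `RecopyPercentSketch` are hypothetical stand-ins for the per-thread cleaner stats; only the formula (total bytes written over total bytes read, with the denominator clamped to at least 1) is taken from the quoted Scala.

```java
import java.util.List;

class RecopyPercentSketch {
    // Hypothetical stand-in for the per-thread stats that the gauge reads.
    static final class Stats {
        final long bytesRead;
        final long bytesWritten;

        Stats(long bytesRead, long bytesWritten) {
            this.bytesRead = bytesRead;
            this.bytesWritten = bytesWritten;
        }
    }

    // Percentage of read bytes that were written back, summed over all cleaner threads.
    static int recopyPercent(List<Stats> stats) {
        long read = stats.stream().mapToLong(s -> s.bytesRead).sum();
        long written = stats.stream().mapToLong(s -> s.bytesWritten).sum();
        // Clamp the denominator so an idle cleaner yields 0 instead of a division by zero.
        double recopyRate = (double) written / Math.max(read, 1);
        return (int) (100 * recopyRate);
    }
}
```

For two threads that read 1000 and 3000 bytes and wrote 250 and 750 bytes, the rate is 1000 / 4000 = 0.25, so the gauge value is 25.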
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
mumrah commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1590396620

## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ##
@@ -467,13 +474,42 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerZkVersion = if (!enableEntityConfigNoController) {

Review Comment:
We still need to support `kafka-configs.sh --zookeeper` until 4.0. Previously, it was possible to set configs with the tools even if the cluster wasn't running. This flag lets us configure the whole KafkaZkClient in ConfigCommand to allow this behavior. Otherwise, it's disabled.
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
Phuc-Hong-Tran commented on PR #14327: URL: https://github.com/apache/kafka/pull/14327#issuecomment-2094904127

Fyi @JimmyWang6, I have taken over this ticket.
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1590370017 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,374 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterGenerator; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static kafka.test.annotation.Type.CO_KRAFT; +import static kafka.test.annotation.Type.KRAFT; +import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String 
missingGroup = "missing.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +@ExtendWith(value = ClusterTestExtensions.class) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -pu
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
JimmyWang6 commented on PR #14327: URL: https://github.com/apache/kafka/pull/14327#issuecomment-2094886153

> hi @JimmyWang6, are you still working on this?

@Phuc-Hong-Tran I apologize for the delay in finishing this issue. I will resume work on it shortly, and I am sorry for any inconvenience this may have caused.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on code in PR #12988: URL: https://github.com/apache/kafka/pull/12988#discussion_r1590353376

## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ##
@@ -530,6 +530,14 @@ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false));
     }

+    @Test
+    public void shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() {

Review Comment:
Yes, I will extend this test to cover all the consumers. Can you help me with how to capture the log to verify that the WARN is printed? I think it's a good-to-have test case. Is there an existing test that already does this?
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-2094870332

Below are the configs for each client type, each marked with whether it is:
1. a custom default for KS that the user can still edit, or
2. a fixed value controlled by KS.

### Producer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner

EoS Enabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX
3. [Editable] [CustomDefault] transaction.timeout.ms = 1
4. [Fixed] partitioner.class = StreamsPartitioner
5. [Fixed] enable.idempotence = true
6. [Fixed] transactional.id = -
7. [Validate] max.in.flight.requests.per.connection <= 5
```

### Main Consumer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id =

EoS Enabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id =
```

### Global Consumer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None

EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
```

### Restore Consumer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None

EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
```

There are a few more that are set ad hoc in the code and that I haven't included. Covering them seemed like a broader change to the Streams configs.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-2094869760

Hi @mjsax, apologies for my long absence on this PR. I have gone ahead and implemented the changes. The tests are still pending; I am currently working on them. The implementation is detailed below.

There are two pieces that KS controls:
1. Custom Default - Configs that have custom default values for KS compared to the actual defaults. These values can also be overwritten by the user.
2. Controlled Configs - Configs that are controlled by KS and cannot be overwritten by the user (we want to warn the user that this value is being overwritten if set by the user).

### Previous Implementation
1. We used to have the following data structures:
```java
String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS // Controlled KS Consumer Configs
String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIG // Controlled KS Consumer Configs when EoS enabled
String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS // Controlled KS Producer Config when EoS enabled
Map PRODUCER_DEFAULT_OVERRIDES // Producer Custom Default + Controlled Config Values
Map PRODUCER_EOS_OVERRIDES // Producer Custom Default + Controlled Config Values with EoS
Map CONSUMER_DEFAULT_OVERRIDES // Consumer Custom Default + Controlled Config Values
Map CONSUMER_EOS_OVERRIDES // Consumer Custom Default + Controlled Config Values with EoS
```
2. The steps to return the required config broadly were:
   1. **Get client configs**: Gather client configurations with prefixes either `consumer.` or `producer.` and put them in `clientProvidedProps`.
   2. **Clean `clientProvidedProps`**: Use the method `checkIfUnexpectedUserSpecifiedConsumerConfig` to tidy up `clientProvidedProps`.
   3. **Create `props`**: Generate `props` using either `<>_DEFAULT_OVERRIDES` or `<>_EOS_OVERRIDES`.
   4. **Overwrite `props`**: Overwrite the entries in `props` with the cleaned `clientProvidedProps`.
   5. **Fetch additional configs (only for consumer props)**: If it's consumer props, fetch configurations set using `main.consumer.`, `global.consumer.`, or `restore.consumer.` and add them to the `props` map.
3. After the initial setup, we make some tweaks based on whether it's for a consumer or producer, and then we hand back the props.

### Current Implementation
1. Do away with the old data structures and define the following new ones:
```java
Map KS_DEFAULT_PRODUCER_CONFIGS // KS Custom Defaults for Producer
Map KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED // KS Custom Defaults for Producer with EoS
Map KS_CONTROLLED_PRODUCER_CONFIGS // KS Controlled Configs for Producer
Map KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Producer with EoS
Map KS_DEFAULT_CONSUMER_CONFIGS // KS Custom Defaults for Consumer
Map KS_CONTROLLED_CONSUMER_CONFIGS // KS Controlled Configs for Consumer
Map KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Consumer with EoS
```
2. The steps to return the required config now are:
   1. **Get client configs**: Obtain client configurations with prefixes either `consumer.` or `producer.` and place them in `clientProvidedProps`.
   2. **Create `props`**: Generate `props` using either `KS_DEFAULT_<>_CONFIGS` or `KS_DEFAULT_<>_CONFIGS_EOS_ENABLED`.
   3. **Overwrite `props`**: Overwrite the entries in `props` with the cleaned `clientProvidedProps`.
   4. **Fetch additional configs (only for consumer props)**: If it's consumer props, fetch configurations set using `main.consumer.`, `global.consumer.`, or `restore.consumer.` and add them to the `props` map.
   5. **Run validation check over `props`**: Perform a validation check on `props`. This check will use `KS_CONTROLLED_<>_CONFIGS` or `KS_CONTROLLED_<>_CONFIGS_EOS_ENABLED` maps to see if the values are already set in `props`. If they are, log a warning and overwrite them. If not, add them to `props`.
3. After the initial setup, we make some tweaks based on whether it's for a consumer or producer, and then we hand back the props.

Below, I'll share the configurations organized into custom defaults and controlled configs for both consumers and producers.
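The layering described in the comment above (KS defaults first, then user-provided values, then controlled values applied last with a warning) can be sketched as follows. This is a minimal sketch under stated assumptions, not the code in the PR: `ConfigResolutionSketch`, `resolve`, and its parameters are hypothetical names, prefix handling is left out, and warnings are collected in a list rather than sent to a logger so that the example stays self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConfigResolutionSketch {
    // Hypothetical helper: merges the three layers and records one warning per overridden key.
    static Map<String, Object> resolve(Map<String, Object> ksDefaults,
                                       Map<String, Object> ksControlled,
                                       Map<String, Object> clientProvided,
                                       List<String> warnings) {
        // Start from the KS custom defaults.
        Map<String, Object> props = new HashMap<>(ksDefaults);
        // User-provided values win over custom defaults.
        props.putAll(clientProvided);
        // Controlled values always win; warn when they replace something already set.
        for (Map.Entry<String, Object> controlled : ksControlled.entrySet()) {
            if (props.containsKey(controlled.getKey())) {
                warnings.add("Overriding " + controlled.getKey() + "="
                        + props.get(controlled.getKey()) + " with " + controlled.getValue());
            }
            props.put(controlled.getKey(), controlled.getValue());
        }
        return props;
    }
}
```

With a default of `linger.ms = 100`, a controlled `enable.auto.commit = false`, and a user who sets `linger.ms = 5` and `enable.auto.commit = true`, the result keeps the user's `linger.ms`, forces `enable.auto.commit` back to `false`, and records a single warning.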
Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (2/3) [kafka]
hgeraldino commented on PR #15841: URL: https://github.com/apache/kafka/pull/15841#issuecomment-2094865173

Thanks @chia7712 for your comments. This PR migrates another 13 test cases to Mockito. I expect the remaining 12 cases to be covered in one last pull request.
[jira] [Assigned] (KAFKA-16669) Remove extra collection copy when genrating DescribeAclsResource
[ https://issues.apache.org/jira/browse/KAFKA-16669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-16669:
--
Assignee: Chia Chuan Yu (was: Chia-Ping Tsai)

> Remove extra collection copy when genrating DescribeAclsResource
> 
>
> Key: KAFKA-16669
> URL: https://issues.apache.org/jira/browse/KAFKA-16669
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia Chuan Yu
> Priority: Trivial
>
> There are three collection copies happening when generating DescribeAclsResource:
> 1. Iterable -> HashSet
>    (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/core/src/main/scala/kafka/server/AclApis.scala#L72)
> 2. HashSet -> Map
>    (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L141)
> 3. Map -> List
>    (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L146)
>
> We can make two small optimizations:
> 1. Remove the first collection copy. This optimization needs two steps:
>    a) change the `aclsResources` input type from `Collection` to `Iterable`;
>    b) de-duplicate in the second collection copy (HashSet -> Map), using `Set` to replace the `List`.
> 2. Set the array size.
>    https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L148
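The two optimizations proposed in the issue can be illustrated with plain JDK collections. This is a rough sketch, not the code in `AclApis` or `DescribeAclsResponse`: `AclGroupingSketch`, `Binding`, and the string-typed resources and entries are hypothetical simplifications. It shows the idea of accepting an `Iterable` directly, de-duplicating while grouping by using a `Set` per key, and sizing the output list up front.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AclGroupingSketch {
    // Hypothetical, simplified binding: a resource name plus one ACL entry.
    static final class Binding {
        final String resource;
        final String entry;

        Binding(String resource, String entry) {
            this.resource = resource;
            this.entry = entry;
        }
    }

    // Groups entries by resource in one pass over the Iterable.
    // The per-key Set removes duplicates, so no up-front HashSet copy is needed.
    static Map<String, Set<String>> groupByResource(Iterable<Binding> bindings) {
        Map<String, Set<String>> grouped = new HashMap<>();
        for (Binding b : bindings) {
            grouped.computeIfAbsent(b.resource, k -> new HashSet<>()).add(b.entry);
        }
        return grouped;
    }

    // Builds the output list with its capacity known in advance, avoiding resizes.
    static List<String> toResourceNames(Map<String, Set<String>> grouped) {
        List<String> resources = new ArrayList<>(grouped.size());
        resources.addAll(grouped.keySet());
        return resources;
    }
}
```

Whether the saved copy matters in practice depends on how many ACL bindings a typical response carries, which the issue does not quantify.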
[jira] [Commented] (KAFKA-16669) Remove extra collection copy when genrating DescribeAclsResource
[ https://issues.apache.org/jira/browse/KAFKA-16669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843570#comment-17843570 ] Chia Chuan Yu commented on KAFKA-16669: --- Hi, [~chia7712] Can I have this one? Thanks! > Remove extra collection copy when genrating DescribeAclsResource > > > Key: KAFKA-16669 > URL: https://issues.apache.org/jira/browse/KAFKA-16669 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Trivial > > There are three collection copy happening in generating DescribeAclsResource > 1. Iterable -> HashSet > (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/core/src/main/scala/kafka/server/AclApis.scala#L72) > 2. HashSet -> Map > (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L141) > 3. Map -> List > (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L146) > We can do two small optimization: > 1. remove the first collection copy. This optimization needs two steps: a) > change `aclsResources` input type from `Collection` to `Iterable`. b) > de-duplicate in second collection copy: HashSet -> Map. We use `Set` > to replace the `List` > 2. set the array size. > https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L148 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes [kafka]
chia7712 commented on code in PR #15865: URL: https://github.com/apache/kafka/pull/15865#discussion_r1590318726 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java: ## @@ -160,7 +155,7 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier kafkaLog) { -this(time, converter); +this(time, converter, null, "connect-distributed-"); Review Comment: This constructor is used by tests. It sets `topicAdminSupplier` to null, so we have to handle a null `topicAdminSupplier` just for testing, which seems a bit awkward to me. Could we require those test cases to pass a `topicAdminSupplier` instead of `null`? Tests that expect `topicAdminSupplier` never to be called can pass a fake `topicAdminSupplier` to the constructor. https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java#L137
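One way to read the suggestion above is a fake supplier that fails loudly if a test ever reaches it, so production code no longer needs a null check. The following is a minimal sketch under that assumption; `FailingSupplierSketch` and `mustNotBeCalled` are hypothetical names and are not part of Kafka or of the PR.

```java
import java.util.function.Supplier;

// Hypothetical test helper: not part of Kafka.
final class FailingSupplierSketch {
    private FailingSupplierSketch() { }

    // Returns a supplier that throws as soon as it is invoked, so a test that
    // unexpectedly touches the dependency fails with a clear message.
    static <T> Supplier<T> mustNotBeCalled(String what) {
        return () -> {
            throw new IllegalStateException(what + " should not be used in this test");
        };
    }
}
```

A test would pass `FailingSupplierSketch.mustNotBeCalled("topicAdminSupplier")` where it currently passes `null`; whether the real constructors accept such a supplier unchanged is an assumption.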
Re: [PR] KAFKA-14785: Connect offset read REST API [kafka]
yashmayya commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1590316538 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ## @@ -141,24 +145,13 @@ private static String noClientId() { protected KafkaBasedLog offsetLog; // Visible for testing final HashMap data = new HashMap<>(); +private final Map>> connectorPartitions = new HashMap<>(); +private Converter keyConverter; private final Supplier topicAdminSupplier; private final Supplier clientIdBase; private SharedTopicAdmin ownTopicAdmin; protected boolean exactlyOnce; -/** - * Create an {@link OffsetBackingStore} backed by a Kafka topic. This constructor will cause the - * store to instantiate and close its own {@link TopicAdmin} during {@link #configure(WorkerConfig)} - * and {@link #stop()}, respectively. - * - * @deprecated use {@link #KafkaOffsetBackingStore(Supplier, Supplier)} instead - */ -@Deprecated -public KafkaOffsetBackingStore() { Review Comment: @chia7712 I've raised this PR to remove the other two deprecated constructors - https://github.com/apache/kafka/pull/15865 (and also added some context in the PR on why deprecation was introduced rather than removal even though these classes are not in the public API). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16669) Remove extra collection copy when genrating DescribeAclsResource
Chia-Ping Tsai created KAFKA-16669: -- Summary: Remove extra collection copy when genrating DescribeAclsResource Key: KAFKA-16669 URL: https://issues.apache.org/jira/browse/KAFKA-16669 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai There are three collection copies happening when generating DescribeAclsResource: 1. Iterable -> HashSet (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/core/src/main/scala/kafka/server/AclApis.scala#L72) 2. HashSet -> Map (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L141) 3. Map -> List (https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L146) We can make two small optimizations: 1. Remove the first collection copy. This optimization needs two steps: a) change the `aclsResources` input type from `Collection` to `Iterable`; b) de-duplicate in the second collection copy (HashSet -> Map). We use `Set` to replace the `List` 2. Set the array size. https://github.com/apache/kafka/blob/25118cec145b1a70a7b1709ca4a7ac367f066c6c/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java#L148
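The two optimizations described in the issue can be illustrated with a small, self-contained sketch. It does not use the real Kafka classes: `Binding` and `Resource` are hypothetical stand-ins for the ACL binding and response resource types, and the grouping key is simplified to a string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AclGroupingSketch {
    /** Hypothetical stand-in for an ACL binding: a resource name plus one ACL entry. */
    static final class Binding {
        final String resource;
        final String entry;
        Binding(String resource, String entry) { this.resource = resource; this.entry = entry; }
    }

    /** Hypothetical stand-in for one resource in the response. */
    static final class Resource {
        final String name;
        final List<String> entries;
        Resource(String name, List<String> entries) { this.name = name; this.entries = entries; }
    }

    static List<Resource> group(Iterable<Binding> bindings) {
        // Optimization 1: iterate the Iterable directly instead of copying it
        // into a HashSet first; the Set value de-duplicates entries per resource.
        Map<String, Set<String>> byResource = new LinkedHashMap<>();
        for (Binding b : bindings) {
            byResource.computeIfAbsent(b.resource, k -> new LinkedHashSet<>()).add(b.entry);
        }
        // Optimization 2: pre-size the result list so it never has to grow.
        List<Resource> result = new ArrayList<>(byResource.size());
        for (Map.Entry<String, Set<String>> e : byResource.entrySet()) {
            result.add(new Resource(e.getKey(), new ArrayList<>(e.getValue())));
        }
        return result;
    }
}
```

The sketch assumes duplicates can be detected by value equality of the entry; the real types would need suitable `equals`/`hashCode` for the same approach to work.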
Re: [PR] MINOR: Remove deprecated constructors from Connect's Kafka*BackingStore classes [kafka]
yashmayya commented on PR #15865: URL: https://github.com/apache/kafka/pull/15865#issuecomment-2094813303 Context - https://github.com/apache/kafka/pull/13434#discussion_r1590212654
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
gaurav-narula commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1590315633 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig, cleaners += cleaner cleaner.start() } +activateMetrics(); Review Comment: Would be useful to add the test in the JIRA for posterity.
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
gaurav-narula commented on code in PR #15863: URL: https://github.com/apache/kafka/pull/15863#discussion_r1590313031 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig, cleanerManager.removeMetrics() Review Comment: I reckon the metrics in `LogCleanerManager` would remain removed? Perhaps they shouldn't be removed while reconfiguring? ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -159,6 +159,7 @@ class LogCleaner(initialConfig: CleanerConfig, cleaners += cleaner cleaner.start() } +activateMetrics(); Review Comment: Would be useful to add the test in the original JIRA for posterity. ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -182,6 +183,27 @@ class LogCleaner(initialConfig: CleanerConfig, cleanerManager.removeMetrics() } + /** + * Activate metrics + */ + def activateMetrics():Unit = { +metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName, Review Comment: Perhaps we can remove the declarations in the class field above around line 130?
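The problem under review (gauges removed on shutdown and never registered again) can be modelled in a few lines. This is only a sketch: it assumes reconfiguration is a shutdown followed by a startup, and it replaces the real metrics group with a plain map. `CleanerMetricsSketch`, `GAUGE`, and `hasMetric` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class CleanerMetricsSketch {
    // Hypothetical metric name used only for this illustration.
    static final String GAUGE = "max-buffer-utilization-percent";

    // Hypothetical stand-in for a metrics registry.
    private final Map<String, Supplier<Integer>> registry = new HashMap<>();

    void startup() {
        // ... cleaner threads would be started here ...
        // Registering gauges on every startup means they come back after a reconfiguration.
        activateMetrics();
    }

    void shutdown() {
        // ... cleaner threads would be stopped here ...
        registry.remove(GAUGE);
    }

    // Assumption: reconfiguration is modelled as shutdown followed by startup.
    void reconfigure() {
        shutdown();
        startup();
    }

    private void activateMetrics() {
        registry.put(GAUGE, () -> 0);
    }

    boolean hasMetric(String name) {
        return registry.containsKey(name);
    }
}
```

If the gauge were registered only in a field initializer, `hasMetric(GAUGE)` would be false after `reconfigure()`, which is the symptom named in the PR title.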
Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]
gaurav-narula commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1590274476 ## core/src/main/scala/kafka/server/FetchSession.scala: ## @@ -787,9 +803,37 @@ class FetchSessionCache(private val maxEntries: Int, } } } +object FetchSessionCache { + private[server] val metricsGroup = new KafkaMetricsGroup(classOf[FetchSessionCache]) + private val counter = new AtomicLong(0) +} + +class FetchSessionCache(private val cacheShards: Seq[FetchSessionCacheShard]) { + // Set up metrics. + FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => cacheShards.map(_.size).sum) + FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum) + + def getCacheShard(sessionId: Int): FetchSessionCacheShard = { +val shard = sessionId / cacheShards.head.sessionIdRange +cacheShards(shard) + } + + // Returns the shard in round-robin + def getNextCacheShard: FetchSessionCacheShard = { +val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt Review Comment: I used `AtomicLong` to practically rule out an overflow, but then found `Utils.toPositive`, which is used by `RoundRobinPartitioner` :) Updated to use an `AtomicInteger` and also added some tests to ensure round-robin allocation.
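The approach described in the comment (an `AtomicInteger` counter whose value is forced non-negative before the modulo) can be sketched as follows. `RoundRobinShardSketch` is a hypothetical class, and `toPositive` here is a local helper that clears the sign bit, written to mirror what `Utils.toPositive` is understood to do rather than calling the Kafka utility.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinShardSketch<T> {
    private final List<T> shards;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinShardSketch(List<T> shards) {
        if (shards.isEmpty()) {
            throw new IllegalArgumentException("at least one shard is required");
        }
        this.shards = shards;
    }

    // Local helper: clears the sign bit so the result is never negative,
    // even after the counter overflows past Integer.MAX_VALUE.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Returns the next shard in round-robin order; safe to call from multiple threads.
    T next() {
        int index = toPositive(counter.getAndIncrement()) % shards.size();
        return shards.get(index);
    }
}
```

When the counter wraps around, the index stays in range, although the rotation may skip or repeat a shard once at the wrap point unless the shard count is a power of two.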
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590273427 ## core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java: ## @@ -42,4 +42,5 @@ boolean autoStart() default true; // Set default server properties for all @ClusterTest(s) ClusterConfigProperty[] serverProperties() default {}; +ClusterConfigDisplayTag[] displayTags() default {}; Review Comment: Do you mean putting the tags in `ClusterTestDefaults`?
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on PR #15861: URL: https://github.com/apache/kafka/pull/15861#issuecomment-2094744565 Updated, and `./gradlew clean core:test --tests ClusterTestExtensionsTest --tests ClusterConfigTest` passed.
[PR] Wall-Clock based Windowing / Suppression [kafka]
eichingertim opened a new pull request, #15864: URL: https://github.com/apache/kafka/pull/15864 This change adds the option to suppress events during windowing based on WALL_CLOCK_TIME instead of STREAM_CLOCK_TIME. **Problem Statement**: We needed this functionality in our use case because the number of events varies widely. Sometimes we get so few events that a result is not emitted at all until a new event comes in to advance STREAM_CLOCK_TIME. **Solution**: We implemented a new option to use WALL_CLOCK_TIME instead of STREAM_CLOCK_TIME for suppression during windowing. It is based on a scheduled operation that punctuates the clock at each interval. **Testing**: We extended `streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java` with a new scenario in which suppression is based on wall-clock time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
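The difference between the two clocks can be shown with a toy model that does not use the Kafka Streams API and is not the PR's implementation. In this sketch a buffered window is released when a scheduler calls `punctuate` with the current wall-clock time, so no further event is needed to close it. `WallClockSuppressSketch` and its methods are hypothetical, and the model assumes non-negative event timestamps that are comparable with wall-clock time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WallClockSuppressSketch {
    private final long windowSizeMs;
    // Window start -> number of events buffered for that window, sorted by start.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    WallClockSuppressSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Buffers an event into its tumbling window; nothing is emitted here.
    void onEvent(long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        openWindows.merge(windowStart, 1, Integer::sum);
    }

    // Called by a scheduler at a fixed wall-clock interval (hypothetical hook).
    // Emits every window whose end is at or before the given wall-clock time.
    void punctuate(long wallClockMs) {
        Iterator<Map.Entry<Long, Integer>> it = openWindows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> window = it.next();
            if (window.getKey() + windowSizeMs > wallClockMs) {
                break; // windows are sorted, so all later ones are still open
            }
            emitted.add(window.getKey() + ":" + window.getValue());
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With stream time, the window starting at 0 would stay buffered until another event with a timestamp of at least 10 arrived; here a `punctuate(10)` call is enough.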
Re: [PR] Kafka-16668: improve cluster test [kafka]
chia7712 commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590270528 ## core/src/test/java/kafka/test/annotation/ClusterConfigDisplayTag.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Target({ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ClusterConfigDisplayTag { Review Comment: see https://github.com/apache/kafka/pull/15861#discussion_r1590270405 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16668: improve cluster test [kafka]
chia7712 commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590270405 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -152,6 +154,9 @@ public MetadataVersion metadataVersion() { public Map> perBrokerOverrideProperties() { return perBrokerOverrideProperties; } +public Map displayTags() { Review Comment: > If they are the same concept, let me rename it and combine them together It seems to me that one "tags" method is enough. That method returns the "modified" tags (raw + others)
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590267358 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -152,6 +154,9 @@ public MetadataVersion metadataVersion() { public Map> perBrokerOverrideProperties() { return perBrokerOverrideProperties; } +public Map displayTags() { Review Comment: I thought that `nameTags` was for other purposes, so I separated them and used a different name for the tags that I am adding. If they are the same concept, let me rename it and combine them.
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590267137 ## core/src/test/java/kafka/test/annotation/ClusterConfigDisplayTag.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Target({ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ClusterConfigDisplayTag { Review Comment: There is already another `nameTags`, and I was not sure whether it means something else. To avoid mixing them up, I used DisplayName.
Re: [PR] Kafka-16668: improve cluster test [kafka]
johnnychhsu commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590266910 ## core/src/test/java/kafka/test/annotation/ClusterTest.java: ## @@ -44,4 +44,5 @@ String listener() default ""; MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0; ClusterConfigProperty[] serverProperties() default {}; +ClusterConfigDisplayTag[] displayTags() default {}; Review Comment: I was wondering whether it is being used by anyone now, but I just checked and found that it is only used in tests. Let me remove it.
Re: [PR] [WIP] Kafka-16668: improve cluster test [kafka]
chia7712 commented on code in PR #15861: URL: https://github.com/apache/kafka/pull/15861#discussion_r1590262208 ## core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java: ## @@ -42,4 +42,5 @@ boolean autoStart() default true; // Set default server properties for all @ClusterTest(s) ClusterConfigProperty[] serverProperties() default {}; +ClusterConfigDisplayTag[] displayTags() default {}; Review Comment: Could we add this to default annotation if we do have such use case? ## core/src/test/java/kafka/test/annotation/ClusterConfigDisplayTag.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Target({ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ClusterConfigDisplayTag { Review Comment: Maybe call it `Tag`? 
`ClusterConfigDisplayTag` is a bit verbose to me ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -152,6 +154,9 @@ public MetadataVersion metadataVersion() { public Map> perBrokerOverrideProperties() { return perBrokerOverrideProperties; } +public Map displayTags() { Review Comment: Please update `nameTags` ## core/src/test/java/kafka/test/annotation/ClusterTest.java: ## @@ -44,4 +44,5 @@ String listener() default ""; MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0; ClusterConfigProperty[] serverProperties() default {}; +ClusterConfigDisplayTag[] displayTags() default {}; Review Comment: We can remove the `name` field now
[PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]
TaiJuWu opened a new pull request, #15862: URL: https://github.com/apache/kafka/pull/15862 Refactor `processClusterTemplate` and add a unit test for the empty-config case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] [WIP] Kafka-16668: improve cluster test [kafka]
johnnychhsu opened a new pull request, #15861: URL: https://github.com/apache/kafka/pull/15861 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)