Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1556834149

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java: ##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+    final Trie trie = new Trie();
+
+    MultiFieldPaths(Set<SingleFieldPath> paths) {
+        paths.forEach(trie::insert);
+    }
+
+    public static MultiFieldPaths of(List<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+            .map(f -> new SingleFieldPath(f, syntaxVersion))
+            .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Find values at the field paths
+     *
+     * @param struct data value
+     * @return map of field paths and field/values
+     */
+    public Map<SingleFieldPath, Map.Entry<Field, Object>> fieldAndValuesFrom(Struct struct) {
+        if (trie.isEmpty()) return Collections.emptyMap();
+        return findFieldAndValues(struct, trie.root, new HashMap<>());
+    }
+
+    private Map<SingleFieldPath, Map.Entry<Field, Object>> findFieldAndValues(
+        Struct originalValue,
+        TrieNode trieAt,
+        Map<SingleFieldPath, Map.Entry<Field, Object>> fieldAndValueMap
+    ) {
+        for (Map.Entry<String, TrieNode> step : trieAt.steps().entrySet()) {
+            Field field = originalValue.schema().field(step.getKey());
+            if (step.getValue().isLeaf()) {
+                Map.Entry<Field, Object> fieldAndValue =
+                    field != null
+                        ? new AbstractMap.SimpleImmutableEntry<>(field, originalValue.get(field))
+                        : null;
+                fieldAndValueMap.put(step.getValue().path, fieldAndValue);
+            } else {
+                if (field.schema().type() == Type.STRUCT) {
+                    findFieldAndValues(
+                        originalValue.getStruct(field.name()),
+                        step.getValue(),
+                        fieldAndValueMap
+                    );
+                }
+            }
+        }
+        return fieldAndValueMap;
+    }
+
+    /**
+     * Find values at the field paths
+     *
+     * @param value data value
+     * @return map of field paths and field/values
+     */
+    public Map<SingleFieldPath, Map.Entry<String, Object>> fieldAndValuesFrom(Map<String, Object> value) {
+        if (trie.isEmpty()) return Collections.emptyMap();
+        return findFieldAndValues(value, trie.root, new HashMap<>());
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<SingleFieldPath, Map.Entry<String, Object>> findFieldAndValues(
+        Map<String, Object> value,
+        TrieNode trieAt,
+        Map<SingleFieldPath, Map.Entry<String, Object>> fieldAndValueMap
+    ) {
+        for (Map.Entry<String, TrieNode> step : trieAt.steps().entrySet()) {
+            Object fieldValue = value.get(step.getKey());
+            if (step.getValue().isLeaf()) {
+                fieldAndValueMap.put(
+                    step.getValue().path,
+                    new AbstractMap.SimpleImmutableEntry<>(step.getKey(), fieldValue)
+                );
+            } else {
+                if (fieldValue instanceof Map) {
+                    findFieldAndValues(
+                        (Map<String, Object>) fieldValue,
+                        step.getValue(),
+                        fieldAndValueMap
+
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on PR #15379: URL: https://github.com/apache/kafka/pull/15379#issuecomment-2044091861 I'm also wondering if it's necessary to have the `SingleFieldPath` class at all. Would it be significantly more expensive to just use the `MultiFieldPaths` class for everything for now? It'd reduce the complexity of the implementation and probably make review easier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
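To make the question above concrete, here is a minimal sketch of how a single path can be served by the same trie-based lookup that serves many paths. It is an illustration under stated assumptions, not the PR's code: `PathTrieSketch`, its `Node` type, and the dot-separated path syntax are all hypothetical, and it only handles schemaless nested `Map` values.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not the MultiFieldPaths class from the PR.
final class PathTrieSketch {
    // One trie node per path step; a non-null path marks the end of a configured path.
    private static final class Node {
        final Map<String, Node> steps = new HashMap<>();
        String path;
    }

    private final Node root = new Node();

    // Assumption: paths are dot-separated, e.g. "a.b".
    PathTrieSketch(List<String> dottedPaths) {
        for (String p : dottedPaths) {
            Node at = root;
            for (String step : p.split("\\.")) {
                at = at.steps.computeIfAbsent(step, k -> new Node());
            }
            at.path = p;
        }
    }

    // Walks the value once and returns every configured path with the value found
    // there (null when the field is absent).
    Map<String, Object> valuesFrom(Map<String, Object> value) {
        Map<String, Object> found = new LinkedHashMap<>();
        collect(value, root, found);
        return found;
    }

    @SuppressWarnings("unchecked")
    private static void collect(Map<String, Object> value, Node at, Map<String, Object> found) {
        for (Map.Entry<String, Node> step : at.steps.entrySet()) {
            Object child = value.get(step.getKey());
            Node next = step.getValue();
            if (next.path != null) {
                found.put(next.path, child);
            }
            // Descend only when deeper paths are configured and the child is a nested map.
            if (!next.steps.isEmpty() && child instanceof Map) {
                collect((Map<String, Object>) child, next, found);
            }
        }
    }
}
```

With this shape, a single-field lookup is just `new PathTrieSketch(List.of("a.b")).valuesFrom(record)`; the extra cost over a dedicated single-path class is one small trie and one map allocation per call, which is the trade-off the comment asks about.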
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on PR #15379: URL: https://github.com/apache/kafka/pull/15379#issuecomment-2044089149 @jeqo This is difficult to review without seeing how this code is actually used. Maybe we could move incrementally and introduce a commit that only touches on 1-3 SMTs, and only introduces the internal changes (i.e., methods in the `SingleFieldPath` and `MultiFieldPaths` classes) necessary in order to touch on those SMTs?
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ##
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> {
+                return null;
+            });
+
+            long now = time.milliseconds();
+            // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
+            // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
+            TestUtils.waitForCondition(() -> {
+                time.sleep(heartbeatIntervalMs - 1);
+                return time.milliseconds() > now + rebalanceTimeoutMs;
+            }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms");
+            coordinator.poll(0, () -> {

Review Comment:
   1. You didn't provide a HeartbeatResponse, so the session will time out.
   2. The `heartbeatIntervalMs` is the minimum interval at which heartbeats are sent, but the real timeout for the heartbeat is the session timeout, so we can sleep `sessionTimeoutMs - 1` to reach `rebalanceTimeoutMs` faster.
   3. The last poll doesn't make any sense because the poll timeout should be triggered already. Why do we need it?

   What I would write is something like this, FYR:
   ```
   public void testPollTimeoutExpiry() throws InterruptedException {
       when(configStorage.snapshot()).thenReturn(configState1);

       client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
       coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

       client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
       client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
           Collections.singletonList(taskId1x0), Errors.NONE));

       // prepare 3 heartBeatResponses because we will trigger 3 heartBeat requests until rebalanceTimeout,
       // that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
       client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
       client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
       client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));

       try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
           coordinator.ensureActiveGroup();
           coordinator.poll(0, () -> {
               return null;
           });

           // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
           // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
           TestUtils.waitForCondition(() -> {
               // sleep until sessionTimeoutMs to trigger a heartBeat request to avoid session timeout.
               // Not sure if this will be flaky in CI because the heartbeat thread might not send out the heartBeat request in time.
               time.sleep(sessionTimeoutMs - 1);
               return
   ```
[jira] [Resolved] (KAFKA-16455) Check partition exists before send reassignments to server in ReassignPartitionsCommand
[ https://issues.apache.org/jira/browse/KAFKA-16455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-16455.
---
Fix Version/s: 3.8.0
Resolution: Fixed

> Check partition exists before send reassignments to server in ReassignPartitionsCommand
> ---
>
> Key: KAFKA-16455
> URL: https://issues.apache.org/jira/browse/KAFKA-16455
> Project: Kafka
> Issue Type: Improvement
> Components: tools
> Reporter: Kuan Po Tseng
> Assignee: Kuan Po Tseng
> Priority: Minor
> Fix For: 3.8.0
>
>
> Currently, when executing {{kafka-reassign-partitions.sh}} with the
> {{--execute}} option, the existence of a partition number specified in the
> JSON file is checked only when the reassignments are submitted to
> {{alterPartitionReassignments}} on the server side.
> We can perform this check in advance, before submitting the reassignments to
> the server side.
> For example, suppose we have three brokers with IDs 1001, 1002, and 1003, and
> a topic named {{first_topic}} with only three partitions. Then execute
> {code:bash}
> bin/kafka-reassign-partitions.sh \
>   --bootstrap-server 192.168.0.128:9092 \
>   --reassignment-json-file reassignment.json \
>   --execute
> {code}
> Where reassignment.json contains
> {code:json}
> {
>   "version": 1,
>   "partitions": [
>     {
>       "topic": "first_topic",
>       "partition": 20,
>       "replicas": [1002, 1001, 1003],
>       "log_dirs": ["any", "any", "any"]
>     }
>   ]
> }
> {code}
> The console outputs
> {code:java}
> Current partition replica assignment
> {"version":1,"partitions":[]}
> Save this to use as the --reassignment-json-file option during rollback
> Error reassigning partition(s):
> first_topic-20: The partition does not exist.
> {code}
> Apart from the output {{\{"version":1,"partitions":[]\}}}, which doesn't
> provide much help, the error {{first_topic-20: The partition does not
> exist.}} is reported back to the tool from the server side, as mentioned
> earlier. This check could be moved earlier, before sending the reassignments
> to the server side.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
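The client-side pre-check proposed in the issue can be sketched as follows. This is a minimal illustration and not the code that was merged: `ReassignmentPrecheckSketch` and `missingPartitions` are hypothetical names, and the per-topic partition counts are assumed to have been fetched beforehand (for example by describing the topics through the Admin client).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of ReassignPartitionsCommand.
final class ReassignmentPrecheckSketch {
    /**
     * Returns the "topic-partition" names from the requested reassignment that do
     * not exist according to the known partition counts.
     */
    static List<String> missingPartitions(Map<String, Integer> partitionCounts,
                                          Map<String, List<Integer>> requested) {
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : requested.entrySet()) {
            // A topic that is not known at all is treated as having zero partitions.
            int count = partitionCounts.getOrDefault(e.getKey(), 0);
            for (int partition : e.getValue()) {
                if (partition < 0 || partition >= count) {
                    missing.add(e.getKey() + "-" + partition);
                }
            }
        }
        return missing;
    }
}
```

For the example in the issue, a topic `first_topic` with three partitions and a request for partition 20 yields `first_topic-20`, so the tool could fail fast before printing the rollback JSON or contacting the server.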
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
showuon merged PR #15659: URL: https://github.com/apache/kafka/pull/15659
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
showuon commented on PR #15659: URL: https://github.com/apache/kafka/pull/15659#issuecomment-2043887866 Failed tests are unrelated.
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556614475

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ##
@@ -109,7 +109,7 @@ class AddPartitionsToTxnManager(
         .setTransactionalId(transactionalId)
         .setProducerId(producerId)
         .setProducerEpoch(producerEpoch)
-        .setVerifyOnly(true)
+        .setVerifyOnly(supportedOperation != addPartition)

Review Comment:
   nit -- we should also add the supportedOperation in KafkaApis and/or the other files based on the request version.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1556593959

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Hmm, when will we set HWM to be lower than _localLogStartOffset? In `UnifiedLog.deletableSegments()`, we have the following code that bounds the retention based deletion by highWatermark. When updating highWatermark, the value typically increases.

   `val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)`
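The bounding rule described in the changed Javadoc can be sketched on plain offsets as below. This is only an illustration of the clamp, assuming `localLogStartOffset <= logEndOffset`; `HighWatermarkBoundSketch` and `boundHighWatermark` are hypothetical names, and the real method works on `LogOffsetMetadata` rather than bare `long` values.

```java
// Hypothetical illustration of the clamp; not the UnifiedLog implementation.
final class HighWatermarkBoundSketch {
    /**
     * Clamps a proposed high watermark into [localLogStartOffset, logEndOffset].
     * Assumes localLogStartOffset <= logEndOffset.
     */
    static long boundHighWatermark(long proposed, long localLogStartOffset, long logEndOffset) {
        if (proposed < localLogStartOffset) {
            return localLogStartOffset; // lower bound: local-log-start-offset
        }
        if (proposed > logEndOffset) {
            return logEndOffset;        // upper bound: log-end-offset
        }
        return proposed;
    }
}
```

The review question concerns the first branch: it only matters if a proposed high watermark can actually fall below the local log start offset, which the reviewer doubts given that retention-based deletion is itself bounded by the high watermark.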
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556609642

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
                 null;
         this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
                 initProducerIdVersion.maxVersion() >= 3;
+
+        // TODO(caliu) use feature version.
+        ApiVersion produceVersion = nodeApiVersions != null ?
+                nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+                null;
+        this.coordinatorSupportsTransactionV2 = produceVersion != null &&

Review Comment:
   We should also be checking the TV on the various requests and making sure we check the epoch when we update the cluster's latest TV.
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619

## core/src/main/scala/kafka/server/KafkaApis.scala: ##
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) {
       sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
+    } else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) {
+      // If the client requests to use transaction V2 but server side does not supports it, return unsupported version.

Review Comment:
   This was not in the design. If we send a request version the server can handle, we can handle it. I.e., if ApiVersions advertises TV 1, then the server has the code to handle V1, and we send the new request version.

   There are two cases where `!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested` is true.
   1) TV is downgraded only (no image changes) -- in this case, we can still handle the old request and we should do so.
   2) TV is downgraded + image version is downgraded. In this case, when the server receives a v5 request, it will return unsupported version automatically since it doesn't recognize this version.

   The reason we do this is that there is no way to guarantee that downgrades happen immediately, due to the way ApiVersions requests propagate. (The only way to ensure it happens immediately is to restart a broker.) Thus, we took this strategy:

   > The downgrade case is a bit different. When we downgrade TV, it is possible to not receive an update communicating this from any broker for a long time. We could even start rolling an incompatible image to the cluster. Once we do this roll however, the brokers will reconnect and update the TV with the newest epoch. As we are checking the TV on every request, we can abort the transaction and restart with the new epoch of TV and the old protocol. However, in the edge case where we somehow send a request to an older image broker, we know that the new protocol is gated by the Produce/TxnOffsetCommit and AddPartitionsToTxn versions. If we encounter a broker that is unable to handle the protocol, it is also unable to handle the request version. In this case, we will return UnsupportedVersionException which is fatal to the client. In most cases, we shouldn’t hit this scenario. We also chose this approach as to not cause flip-flopping during the upgrade case.
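The two downgrade cases in the comment above can be sketched as a small decision function. This is a hedged illustration of the strategy as described in the comment, not broker code: `RequestVersionGateSketch`, `decide`, and its parameters are hypothetical, and the version number 5 is taken only from the comment's "v5 request" example.

```java
// Hypothetical illustration of the gating strategy described in the review comment.
final class RequestVersionGateSketch {
    enum Outcome { HANDLE, UNSUPPORTED_VERSION }

    /**
     * @param requestVersion        version of the incoming request
     * @param maxVersionInImage     highest request version the running broker image recognizes
     * @param transactionV2Enabled  whether the finalized TV currently enables transaction V2;
     *                              intentionally not consulted, per the comment's argument
     */
    static Outcome decide(int requestVersion, int maxVersionInImage, boolean transactionV2Enabled) {
        // Case 1 (TV downgraded, image unchanged): the image still knows the version, so handle it.
        // Case 2 (TV and image downgraded): the version is unknown to the image, so reject it.
        return requestVersion <= maxVersionInImage ? Outcome.HANDLE : Outcome.UNSUPPORTED_VERSION;
    }
}
```

Under these assumptions, `decide(5, 5, false)` handles the request while `decide(5, 4, false)` rejects it, which mirrors the point that the request version alone gates the protocol and no extra check against the finalized TV is needed on the server.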
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) { sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) +} else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) { + // If the client requests to use transaction V2 but server side does not supports it, return unsupported version. Review Comment: This was not in the design. If we send a request version the server can handle, we can handle it. Ie --> if ApiVersions advertises TV 1, then the server has the code to handle V1, and we send the new request version. There are two cases where `!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested` is true. 1) TV is downgraded -- in this case, we can still handle the old request and we should do so. 2) TV is downgraded + image version is downgraded. In this case, when the server receives v5 request, it will return unsupported version automatically since it doesn't recognize this version. The reason we do this is there is no way to guarantee that downgrades happen immediately due to the way ApiVersions requests propagate. (The only way to ensure it happens is to restart a broker) Thus, we took this strategy: > The downgrade case is a bit different. When we downgrade TV, it is possible to not receive an update communicating this from any broker for a long time. We could even start rolling an incompatible image to the cluster. Once we do this roll however, the brokers will reconnect and update the TV with the newest epoch. 
As we are checking the TV on every request, we can abort the transaction and restart with the new epoch of TV and the old protocol. However, in the edge case where we somehow send a request to an older image broker, we know that the new protocol is gated by the Produce/TxnOffsetCommit and AddPartitionsToTxn versions. If we encounter a broker that is unable to handle the protocol, it is also unable to handle the request version. In this case, we will return UnsupportedVersionException which is fatal to the client. In most cases, we shouldn’t hit this scenario. We also chose this approach as to not cause flip-flopping during the upgrade case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
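The rule argued for in the comment above can be sketched as follows. This is only an illustration under stated assumptions, not the code in KafkaApis: the class, enum, and method names are hypothetical, and the older image is assumed to stop at TxnOffsetCommit v4 (the comment itself only says that a v5 request is not recognized by a downgraded image). The point it shows is that the broker's decision depends only on whether it recognizes the request version, not on the current transaction version (TV).

```java
final class RequestVersionGateSketch {
    enum Outcome { HANDLE, UNSUPPORTED_VERSION }

    // Hypothetical helper: maxVersionInImage is the highest TxnOffsetCommit
    // version this broker image has code for. The transaction version (TV)
    // is deliberately not an input to the decision.
    static Outcome onTxnOffsetCommit(short requestVersion, short maxVersionInImage) {
        return requestVersion <= maxVersionInImage
            ? Outcome.HANDLE
            : Outcome.UNSUPPORTED_VERSION;
    }

    public static void main(String[] args) {
        // Case 1: TV downgraded, image unchanged (it still understands v5).
        System.out.println(onTxnOffsetCommit((short) 5, (short) 5));
        // Case 2: TV and image both downgraded (image assumed to stop at v4).
        System.out.println(onTxnOffsetCommit((short) 5, (short) 4));
    }
}
```

With this shape, an explicit `isTransactionV2Enabled` check on the server is not needed for either case: the first is handled by existing code and the second is rejected by ordinary version negotiation.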
Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]
pasharik commented on PR #15553: URL: https://github.com/apache/kafka/pull/15553#issuecomment-2043855618
> any updates here? gradle 8.7 has been released. we can update gradle to 8.7 if the issue is nonexistent
Updating to 8.7 is OK with me. For now, I'm compiling my test with `scalac` as a workaround anyway.
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1556584820
## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -979,6 +1013,13 @@ void handleCoordinatorReady() {
 null;
 this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null && initProducerIdVersion.maxVersion() >= 3;
+
+// TODO(caliu) use feature version.
+ApiVersion produceVersion = nodeApiVersions != null ?
+nodeApiVersions.apiVersion(ApiKeys.PRODUCE) :
+null;
+this.coordinatorSupportsTransactionV2 = produceVersion != null &&
Review Comment: I think the other thing we are looking for is in the txn offset commit request and the produce request, setting the version to 11 and 4 until v2/feature version is enabled. I didn't see that here yet.
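The version capping asked for in the comment above could look roughly like this. It is a sketch under assumptions, not the TransactionManager change: the class and method names are hypothetical, and mapping 11 to Produce and 4 to TxnOffsetCommit is an assumption about which number in the comment belongs to which request.

```java
final class TxnRequestVersionSketch {
    // Assumed mapping of the two numbers in the comment (hypothetical constants).
    static final short PRODUCE_PRE_V2_MAX = 11;
    static final short TXN_OFFSET_COMMIT_PRE_V2_MAX = 4;

    // Picks the request version to send: the broker's maximum once transaction
    // V2 is enabled, otherwise capped at the last pre-V2 version.
    static short pick(short brokerMaxVersion, short preV2Max, boolean transactionV2Enabled) {
        if (transactionV2Enabled) {
            return brokerMaxVersion;
        }
        return (short) Math.min(brokerMaxVersion, preV2Max);
    }

    public static void main(String[] args) {
        // Broker advertises Produce v12, but V2 is not enabled yet.
        System.out.println(pick((short) 12, PRODUCE_PRE_V2_MAX, false));
        // Same broker after V2 is enabled.
        System.out.println(pick((short) 12, PRODUCE_PRE_V2_MAX, true));
    }
}
```

Keeping the cap in one helper means the produce and txn offset commit paths cannot drift apart in how they treat the feature flag.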
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1556576980
## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -402,6 +412,30 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
 }
 }
+public synchronized void maybeHandlePartitionAdded(TopicPartition topicPartition) {
Review Comment: For my understanding, this was the previous handling for the add partition call, but since add partition is now part of the produce request, we have a separate block? I wonder if we should change some of these error messages to reflect that the record was not written either?
Re: [PR] [No Review] Kafka-14563 [kafka]
jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1556556192
## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ##
@@ -202,7 +202,9 @@ public enum MetadataVersion {
 IBP_3_7_IV4(19, "3.7", "IV4", false),
 // Add ELR related supports (KIP-966).
-IBP_3_8_IV0(20, "3.8", "IV0", true);
+IBP_3_8_IV0(20, "3.8", "IV0", true),
+
+IBP_100_1_IV0(100, "100.1", "IV0", false);
Review Comment: hehe this is a unique way to have a placeholder for feature version :)
[PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan opened a new pull request, #15685: URL: https://github.com/apache/kafka/pull/15685
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc. I've also added the `--feature` flag to the storage tool to show how it will be used. Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
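To make the description above more concrete, here is a rough sketch of what parsing and validating a feature argument might involve. Everything in it is hypothetical: the `name=level` argument format, the `sketch.feature.version` name, and the enum are assumptions for illustration and are not the FeatureVersion interface or the TestFeatureVersion from the PR.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

final class FeatureFlagSketch {
    // Hypothetical feature with two known levels.
    enum SketchFeatureVersion {
        V0((short) 0), V1((short) 1);

        static final String NAME = "sketch.feature.version";
        final short level;

        SketchFeatureVersion(short level) {
            this.level = level;
        }

        // Validation step: reject levels this feature does not define.
        static SketchFeatureVersion fromLevel(short level) {
            for (SketchFeatureVersion v : values()) {
                if (v.level == level) {
                    return v;
                }
            }
            throw new IllegalArgumentException("Unknown level " + level + " for " + NAME);
        }
    }

    // Parsing step: splits one "name=level" argument (assumed format).
    static Map.Entry<String, Short> parseFeatureArg(String arg) {
        int eq = arg.indexOf('=');
        if (eq <= 0 || eq == arg.length() - 1) {
            throw new IllegalArgumentException("Expected name=level, got: " + arg);
        }
        String name = arg.substring(0, eq).trim();
        short level = Short.parseShort(arg.substring(eq + 1).trim());
        return new SimpleEntry<>(name, Short.valueOf(level));
    }

    public static void main(String[] args) {
        Map.Entry<String, Short> parsed = parseFeatureArg("sketch.feature.version=1");
        System.out.println(parsed.getKey() + " -> " + SketchFeatureVersion.fromLevel(parsed.getValue()));
    }
}
```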
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043792883
@chia7712 : Thanks for the updated PR. The code looks good to me. There were 50 failed tests. Are any of them related to the PR? If not, have they all been tracked?
Re: [PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]
junrao commented on PR #15680: URL: https://github.com/apache/kafka/pull/15680#issuecomment-2043781090
@apoorvmittal10 : Thanks for the PR. The code looks good to me. Have all the test failures been tracked?
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556505842
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";
Review Comment: This is a good point. I rephrased it in a way that doesn't rely on referring to `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG`.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556496340
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
Review Comment:
> Why do we need this? we usually don't add it if the config class just to group the props, docs and default values.
It is used to prevent instantiation, which is a guideline from the book `Effective Java`. Anyway, it is just a code style suggestion, so it is fine to ignore it :)
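For reference, the pattern being suggested looks like the following minimal sketch. The class name and config key are hypothetical stand-ins, not the actual TransactionLogConfig; only the private constructor is the point.

```java
final class ExampleLogConfigs {
    // Hypothetical constants, grouped the same way as in the class under review.
    public static final String PARTITIONS_CONFIG = "example.log.num.partitions";
    public static final int PARTITIONS_DEFAULT = 50;

    // Private constructor: the class only groups constants, so instances
    // would be meaningless. Throwing also guards against calls from inside
    // the class or via reflection.
    private ExampleLogConfigs() {
        throw new AssertionError("no instances");
    }

    public static void main(String[] args) {
        System.out.println(PARTITIONS_CONFIG + " defaults to " + PARTITIONS_DEFAULT);
    }
}
```

Without the explicit constructor, the compiler generates a default constructor, so `new ExampleLogConfigs()` would compile even though it serves no purpose.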
Re: [PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]
apoorvmittal10 commented on PR #15680: URL: https://github.com/apache/kafka/pull/15680#issuecomment-2043749614
> @apoorvmittal10 Haven’t we already released those metrics in 3.7?
@dajac There were some work items left to complete for 3.8. These broker metrics were not part of the 3.7 release.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556491908
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";
Review Comment:
> If we want to remove the deps on TopicConfig we can hard code the value of TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG in doc but I personally would prefer that we keep the ref to the config.
Not sure whether we ought to highlight the "override". The other similar configs, for example `offsets.topic.replication.factor`, do not mention that "this" config overrides "that". Instead, they just say `The replication factor for the offsets topic ...`, and that is good enough to understand the purpose of the config.
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1556491449
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ##
@@ -88,10 +125,10 @@ public OffsetConfig(int maxMetadataSize,
 }
 public OffsetConfig() {
Review Comment: deleted
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1556490595 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,53 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { -public static final int DEFAULT_MAX_METADATA_SIZE = 4096; -public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; -public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 1000L; -public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 60L; -public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50; -public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 1024; -public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3; -public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE = CompressionType.NONE; -public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000; -public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1; +public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; +public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; +public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; + +public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size"; +public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024; +public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large)."; + +public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes"; +public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60; +public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific 
partition will be expired and discarded when 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + +"2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + +"For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + +"Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + +"also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; + +public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms"; +public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 60L; +public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets"; + +public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions"; +public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50; +public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of partitions for the offset commit topic (should not change after deployment)."; + +public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes"; +public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; +public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads."; + +public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; +public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; 
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " + +"Internal topic creation will fail until the cluster size meets this replication factor requirement."; + +public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = "offsets.topic.compression.codec"; +public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE; +public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits."; + +public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offsets.commit.timeout.ms"; +public static
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556482611
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";
+
+public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG = "transaction.state.log.load.buffer.size";
+public static final int TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large).";
+
+public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG = ProducerStateManagerConfig.TRANSACTION_VERIFICATION_ENABLED;
+public static final boolean TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT = true;
+public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition";
+
+public static final String PRODUCER_ID_EXPIRATION_MS_CONFIG = ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS;
Review Comment: Moved TRANSACTION_VERIFICATION_ENABLED and PRODUCER_ID_EXPIRATION_MS to TransactionLogConfig, where we have the docs and default values.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556478763
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
Review Comment: Adding "s" to the name makes sense, but it seems we don't have a single pattern in Kafka; for example, there are SaslConfigs and TopicConfig. I'll rename it, but we probably need to update https://issues.apache.org/jira/browse/KAFKA-14524 to add some guidance for future refactors.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556481647
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
Review Comment:
> add private constructor
Why do we need this? We usually don't add it if the config class is just there to group the props, docs and default values.
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2043718730
@clolov : Thanks for the PR. When we last bumped up metadata.version (https://github.com/apache/kafka/pull/14984), we changed a bunch of tests, such as MetadataVersionTest and ZkMigrationIntegrationTest, to reference 3.8-IV0. Have we added 3.8-IV1 to all the tests that need it?
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556466418 ## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -16,11 +16,43 @@ */ package org.apache.kafka.coordinator.transaction; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.config.ServerTopicConfigSynonyms; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; + public class TransactionLogConfig { // Log-level config default values -public static final int DEFAULT_NUM_PARTITIONS = 50; -public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024; -public static final short DEFAULT_REPLICATION_FACTOR = 3; -public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2; -public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; +public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions"; +public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50; +public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment)."; + +public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes"; +public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; +public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"; + +public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor"; +public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; +public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). 
" + +"Internal topic creation will fail until the cluster size meets this replication factor requirement."; + +public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr"; +public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2; +public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic."; Review Comment: We may not need `ServerTopicConfigSynonyms::serverSynonym`, as it actually calls `ServerTopicConfigSynonyms.sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)`, which is the same as referring to `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG` directly. If we want to remove the dependency on `TopicConfig`, we can hard-code the value of `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG` in the doc, but I would personally prefer to keep the reference to the config.
Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
chia7712 commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1556449383 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -17,13 +17,84 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.common.utils.Utils; Review Comment: It seems we don't have strict import orders, but it would be nice to optimize the imports :) ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ## @@ -20,16 +20,53 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { -public static final int DEFAULT_MAX_METADATA_SIZE = 4096; -public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; -public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 1000L; -public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 60L; -public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50; -public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 1024; -public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3; -public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE = CompressionType.NONE; -public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000; -public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1; +public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; +public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; +public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; + +public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size"; +public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024; +public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets 
segments when loading offsets into the cache (soft-limit, overridden if records are too large)."; + +public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes"; +public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60; +public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + +"2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + +"For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + +"Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + +"also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; + +public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms"; +public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 60L; +public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets"; + +public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions"; +public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50; +public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of partitions for the offset commit topic (should not change after deployment)."; + +public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes"; +public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 
1024; +public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads."; + +public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; +public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; +public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " + +"Internal topic creation will fail until the cluster size meets this replication factor requirement."; + +public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG =
Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]
chia7712 commented on PR #15553: URL: https://github.com/apache/kafka/pull/15553#issuecomment-2043666756 Any updates here? Gradle 8.7 has been released. We can update Gradle to 8.7 if the issue no longer exists.
[jira] [Resolved] (KAFKA-16477) Detect thread leaked client-metrics-reaper in tests
[ https://issues.apache.org/jira/browse/KAFKA-16477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16477. Fix Version/s: 3.8.0 Resolution: Fixed > Detect thread leaked client-metrics-reaper in tests > --- > > Key: KAFKA-16477 > URL: https://issues.apache.org/jira/browse/KAFKA-16477 > Project: Kafka > Issue Type: Improvement >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > Fix For: 3.8.0 > > > After profiling the Kafka tests, we found that many `client-metrics-reaper` threads are not cleaned up after BrokerServer shutdown. > The thread {{client-metrics-reaper}} comes from > [ClientMetricsManager#expirationTimer|https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115], > and BrokerServer#shutdown doesn't close ClientMetricsManager, which leaves the > timer thread running in the background. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 merged PR #15668: URL: https://github.com/apache/kafka/pull/15668
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1556404908 ## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -16,11 +16,43 @@ */ package org.apache.kafka.coordinator.transaction; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.config.ServerTopicConfigSynonyms; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; + public class TransactionLogConfig { Review Comment: In order to make it be a pure constants class, could you please add following changes? 1. rename it to `TransactionLogConfigs`? 2. add `final` 3. add private constructor ## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -16,11 +16,43 @@ */ package org.apache.kafka.coordinator.transaction; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.config.ServerTopicConfigSynonyms; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; + public class TransactionLogConfig { // Log-level config default values -public static final int DEFAULT_NUM_PARTITIONS = 50; -public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024; -public static final short DEFAULT_REPLICATION_FACTOR = 3; -public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2; -public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; +public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions"; +public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50; +public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment)."; + +public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes"; +public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 
1024; +public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"; + +public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor"; +public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; +public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " + +"Internal topic creation will fail until the cluster size meets this replication factor requirement."; + +public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr"; +public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2; +public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic."; Review Comment: Maybe we can revise the docs to avoid depending on those modules directly? 
## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -16,11 +16,43 @@ */ package org.apache.kafka.coordinator.transaction; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.config.ServerTopicConfigSynonyms; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; + public class TransactionLogConfig { // Log-level config default values -public static final int DEFAULT_NUM_PARTITIONS = 50; -public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024; -public static final short DEFAULT_REPLICATION_FACTOR = 3; -public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2; -public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; +public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions"; +public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50; +public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment)."; + +public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes"; +public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; +public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"; + +public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor"; +
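The three changes requested in the first review comment above (a plural name, `final`, and a private constructor) describe the usual shape of a constants-only holder class. A minimal sketch follows, assuming the renamed class keeps the config keys and defaults shown in the diff; the body is abbreviated to two settings and is illustrative rather than the PR's final code:

```java
// Sketch of a "pure constants" class as suggested in the review.
// Only two of the settings from the diff are reproduced here.
final class TransactionLogConfigs {
    public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
    public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;

    public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
    public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;

    // Private constructor: the class only holds constants and is never instantiated.
    private TransactionLogConfigs() {
    }
}
```

Marking the class `final` and hiding the constructor makes the intent explicit: callers can only read the constants, and nothing can subclass or instantiate the holder.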
[PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]
OmniaGM opened a new pull request, #15684: URL: https://github.com/apache/kafka/pull/15684 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]
chia7712 commented on code in PR #15678: URL: https://github.com/apache/kafka/pull/15678#discussion_r1556390789 ## server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java: ## @@ -172,10 +172,11 @@ public void testAssignmentAggregation() throws InterruptedException { manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2, () -> { }); -while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) { +TestUtils.waitForCondition(() -> { time.sleep(100); Review Comment: IIRC, `TestUtils.waitForCondition` already includes the `sleep`, so you don't need to call sleep manually.
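The reviewer's point about the built-in pause can be illustrated with a simplified polling helper. This is a sketch of the general pattern only, not Kafka's actual `TestUtils.waitForCondition` implementation; the helper name `waitFor`, its parameters, and the 10 ms poll interval are assumptions made for illustration:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollingSketch {
    // Hypothetical helper: re-checks the condition until it holds or the timeout
    // elapses, pausing between attempts so callers do not need their own sleep.
    static void waitFor(BooleanSupplier condition, long maxWaitMs, String message) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(10); // the pause lives inside the helper
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + message);
            }
        }
    }

    // Example: the condition becomes true on its third evaluation.
    static int checksUntilReady() {
        AtomicInteger checks = new AtomicInteger();
        waitFor(() -> checks.incrementAndGet() >= 3, 5_000, "condition not met in time");
        return checks.get();
    }
}
```

With a helper of this shape, the lambda passed in only needs to evaluate the condition; any extra sleep inside it just lengthens each polling round.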
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1556382498 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,19 +48,40 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +private final ClusterInstance clusterInstance; +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections +this.clusterInstance = clusterInstance; +} + String[] getArgs(String group, String topic) { return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", clusterInstance.bootstrapServers(), "--delete-offsets", "--group", group, "--topic", topic }; } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsNonExistingGroup(String quorum) { +ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { +ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); + +return new ConsumerGroupCommand.ConsumerGroupService( +opts, +Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) +); +} + +@ClusterTests({ +@ClusterTest(clusterType = Type.ZK), Review Comment: Could we define `ClusterTestDefaults` at class-level?
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
chia7712 commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1556371500 ## server/src/main/java/org/apache/kafka/server/config/KafkaSecurityConfigs.java: ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; + +import java.util.List; +import java.util.Locale; + +/** + * Common home for broker-side security configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. 
+ */ +public class KafkaSecurityConfigs { + +/** * SSL Configuration / +public final static String SSL_PROTOCOL_CONFIG = SslConfigs.SSL_PROTOCOL_CONFIG; +public final static String SSL_PROTOCOL_DOC = SslConfigs.SSL_PROTOCOL_DOC; +public static final String SSL_PROTOCOL_DEFAULT = SslConfigs.DEFAULT_SSL_PROTOCOL; + +public final static String SSL_PROVIDER_CONFIG = SslConfigs.SSL_PROVIDER_CONFIG; +public final static String SSL_PROVIDER_DOC = SslConfigs.SSL_PROVIDER_DOC; + +public final static String SSL_CIPHER_SUITES_CONFIG = SslConfigs.SSL_CIPHER_SUITES_CONFIG; +public final static String SSL_CIPHER_SUITES_DOC = SslConfigs.SSL_CIPHER_SUITES_DOC; + +public final static String SSL_ENABLED_PROTOCOLS_CONFIG = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG; +public final static String SSL_ENABLED_PROTOCOLS_DOC = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC; +public static final String SSL_ENABLED_PROTOCOLS_DEFAULTS = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS; + +public final static String SSL_KEYSTORE_TYPE_CONFIG = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; +public final static String SSL_KEYSTORE_TYPE_DOC = SslConfigs.SSL_KEYSTORE_TYPE_DOC; +public static final String SSL_KEYSTORE_TYPE_DEFAULT = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE; + +public final static String SSL_KEYSTORE_LOCATION_CONFIG = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +public final static String SSL_KEYSTORE_LOCATION_DOC = SslConfigs.SSL_KEYSTORE_LOCATION_DOC; + +public final static String SSL_KEYSTORE_PASSWORD_CONFIG = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +public final static String SSL_KEYSTORE_PASSWORD_DOC = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC; + +public final static String SSL_KEY_PASSWORD_CONFIG = SslConfigs.SSL_KEY_PASSWORD_CONFIG; +public final static String SSL_KEY_PASSWORD_DOC = SslConfigs.SSL_KEY_PASSWORD_DOC; + +public final static String SSL_KEYSTORE_KEY_CONFIG = SslConfigs.SSL_KEYSTORE_KEY_CONFIG; +public final static String SSL_KEYSTORE_KEY_DOC = SslConfigs.SSL_KEYSTORE_KEY_DOC; + +public final static String 
SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG; +public final static String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC; +public final static String SSL_TRUSTSTORE_TYPE_CONFIG = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; +public final static String SSL_TRUSTSTORE_TYPE_DOC = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC; +public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE; + +public final static String SSL_TRUSTSTORE_LOCATION_CONFIG = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +public final static String SSL_TRUSTSTORE_PASSWORD_DOC = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC; + +public final static String SSL_TRUSTSTORE_PASSWORD_CONFIG = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; +public final static String SSL_TRUSTSTORE_LOCATION_DOC = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC; + +public final static String SSL_TRUSTSTORE_CERTIFICATES_CONFIG =
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
chia7712 commented on PR #15645: URL: https://github.com/apache/kafka/pull/15645#issuecomment-2043536012 @nizhikov Thanks for the updated PR. I have a major question: does `ConfigCommandIntegrationTest` have only zk-related tests? If so, we don't need to rewrite it in Java, as it will be removed outright. Also, it seems that we don't have an integration test for broker configs. Do you have free cycles to take that on? Thanks.
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
OmniaGM commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1556357170 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: I renamed it to `KafkaSecurityConfigs` and left a comment that it's a central place for Kafka security configs.
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
junrao commented on code in PR #15631: URL: https://github.com/apache/kafka/pull/15631#discussion_r1556331481 ## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ## @@ -952,8 +952,9 @@ class UnifiedLogTest { assertEquals(0, lastSeq) } - @Test - def testRetentionDeletesProducerStateSnapshots(): Unit = { + @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with empty-active-segment: {0}") + @ValueSource(booleans = Array(true, false)) + def testRetentionDeletesProducerStateSnapshots(isEmptyActiveSegment: Boolean): Unit = { Review Comment: isEmptyActiveSegment => createEmptyActiveSegment and change the test name accordingly. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1532,10 +1532,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } localLog.checkIfMemoryMappedBufferClosed() -// remove the segments for lookups -localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) +if (segmentsToDelete.nonEmpty) { + // increment the local-log-start-offset before removing the segment for lookups Review Comment: We probably should say "increment local-log-start-offset or log-start-offset" since `incrementStartOffset()` could update either one.
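The ordering named in the code comment under review (advance the start offset before removing segments from the lookup table) can be sketched in isolation. The class below is a toy model and not `UnifiedLog`: the class name, the string-valued segments, and the assumption that the new start offset is always a segment base offset are all made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

final class SegmentDeletionSketch {
    // Segments keyed by base offset; the values stand in for segment data.
    private final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
    private volatile long startOffset = 0L;

    void addSegment(long baseOffset) {
        segments.put(baseOffset, "segment-" + baseOffset);
    }

    // Advance the start offset first, then drop the segments below it, so that a
    // concurrent reader does not pass the range check and then find no segment.
    // Assumes newStartOffset is the base offset of a retained segment.
    void deleteSegmentsBelow(long newStartOffset) {
        if (newStartOffset <= startOffset) {
            return;
        }
        startOffset = newStartOffset;
        segments.headMap(newStartOffset, false).clear();
    }

    // Returns the segment covering the offset, or null if the offset is out of range.
    String read(long offset) {
        if (offset < startOffset) {
            return null;
        }
        Map.Entry<Long, String> entry = segments.floorEntry(offset);
        return entry == null ? null : entry.getValue();
    }
}
```

If the two statements in `deleteSegmentsBelow` were swapped, a reader could see an offset that still looks valid while its segment has already gone, which is the kind of window the reordering in the diff appears to close.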
[PR] KIP-848 system tests 2024-04-08 [kafka]
kirktrue opened a new pull request, #15683: URL: https://github.com/apache/kafka/pull/15683 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
chia7712 commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1556226896 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: > I think better approach would be moving default values to SslConfigs, SaslConfigs, SecurityConfig and BrokerSecurityConfigs WDYT? Agreed. > KafkaConfig while it is an anti-pattern it has been acting as one place where we can find all KafkaConfig. Got it; I can buy that purpose. BTW, could we rename it to KafkaConfig"s" and make it a final class, since we don't want any instances of it?
Re: [PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]
dajac commented on PR #15680: URL: https://github.com/apache/kafka/pull/15680#issuecomment-2043344035 @apoorvmittal10 Haven’t we already released those metrics in 3.7?
[jira] [Commented] (KAFKA-16482) Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach
[ https://issues.apache.org/jira/browse/KAFKA-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835005#comment-17835005 ] Chia-Ping Tsai commented on KAFKA-16482: I'm a bit confused: should we encourage developers to set configs for all test cases by modifying `ClusterConfig` in the `BeforeEach` phase? It seems to me that we allow injecting `ClusterConfig` everywhere, so it is hard to be keenly aware of whether those changes have any effect on the cluster. Personally, I prefer to set configs for all test cases via `ClusterTestDefaults` (this function is not implemented), and the configs of an individual test case can be changed via `ClusterTest`. I'd like to disallow injecting `ClusterConfig`. However, maybe some test cases need a lot of "if-else" logic to define the configs rather than just passing constants. [~davidarthur] WDYT? > Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach > --- > > Key: KAFKA-16482 > URL: https://issues.apache.org/jira/browse/KAFKA-16482 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Cheng-Kai, Zhang >Priority: Major > > IDE does not like the code style, and we can leverage `ClusterConfigProperty` > to eliminate the false error from IDE > > [https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala#L42] > [https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java#L75] > > https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java#L68
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043274125 @junrao thanks for the reviews. Both comments are addressed in https://github.com/apache/kafka/pull/15621/commits/581242c1fa6c005bf91a7ced96932774c2c02cd9
Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
junrao commented on code in PR #14242: URL: https://github.com/apache/kafka/pull/14242#discussion_r1556161125 ## server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java: ## @@ -72,18 +72,20 @@ public CheckpointFile(File file, tempPath = Paths.get(absolutePath + ".tmp"); } -public void write(Collection entries) throws IOException { +public void write(Collection entries, boolean sync) throws IOException { synchronized (lock) { // write to temp file and then swap with the existing file try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { CheckpointWriteBuffer checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter); checkpointWriteBuffer.write(entries); writer.flush(); -fileOutputStream.getFD().sync(); +if (sync) { +fileOutputStream.getFD().sync(); Review Comment: @ocadaruma : I realized a potential issue with this change. The issue is that if sync is false, we don't force a flush to disk. However, the OS could flush partial content of the leader epoch file. If the broker has a hard failure, the leader epoch file could be corrupted. In the recovery path, since we always expect the leader epoch file to be well-formed, a corrupted leader epoch file will fail the recovery. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
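The concern raised in this review comment can be illustrated with a minimal, self-contained sketch. It is not Kafka's `CheckpointFile`: the class name `CheckpointSketch`, the two-line header (version, then entry count), and the `isWellFormed` check are all assumptions made for illustration. The sketch writes to a temp file, optionally fsyncs, swaps the file into place, and shows how a reader could notice a file whose tail never reached disk instead of assuming it is always well-formed.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not the Kafka CheckpointFile class.
final class CheckpointSketch {
    static final int VERSION = 0;

    // Writes "version, count, entries" to a temp file and then swaps it into place.
    // With sync == false nothing forces the bytes to disk, so after a hard failure
    // the file that survives may hold only part of the content.
    static void write(Path target, List<String> entries, boolean sync) {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp.toFile());
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
            writer.write(Integer.toString(VERSION));
            writer.newLine();
            writer.write(Integer.toString(entries.size()));
            writer.newLine();
            for (String entry : entries) {
                writer.write(entry);
                writer.newLine();
            }
            writer.flush();
            if (sync) {
                out.getFD().sync(); // force the temp file's content to the device
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            // Whether an existing target is replaced is platform specific for ATOMIC_MOVE.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns false when the header is missing or the entry count does not match.
    // This only catches missing lines; detecting a half-written last line would
    // need something stronger, such as a checksum.
    static boolean isWellFormed(Path file) {
        List<String> lines;
        try {
            lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (lines.size() < 2) {
            return false;
        }
        try {
            int version = Integer.parseInt(lines.get(0).trim());
            int count = Integer.parseInt(lines.get(1).trim());
            return version == VERSION && count >= 0 && lines.size() == count + 2;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-sketch");
        Path file = dir.resolve("leader-epoch-checkpoint");
        write(file, Arrays.asList("0 0", "1 42"), true);
        System.out.println("well-formed: " + isWellFormed(file));
    }
}
```

A recovery path built on a check like `isWellFormed` could choose to discard or rebuild a damaged file rather than fail; whether that is appropriate for the leader epoch file is a question for the PR discussion, not something this sketch settles.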
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556146632 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556142547 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
kirktrue commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556133511 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1750,8 @@ private void subscribeInternal(Pattern pattern, Optional subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); -if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { -data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); -sentFields.subscribedTopicNames = subscribedTopicNames; -} -} else { -// SubscribedTopicRegex - only sent if it has changed since the last heartbeat -// - not supported yet +// SubscribedTopicNames - only sent if has changed since the last heartbeat +TreeSet subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription()); +if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) { +data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +sentFields.subscribedTopicNames = subscribedTopicNames; Review Comment: Nice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2043179367 @cadonna could you take a look at this one when you have a chance? Thanks!
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1556068446 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -56,11 +60,33 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @AfterEach override def tearDown(): Unit = { -setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testListMaxTimestampWithEmptyLog(quorum: String): Unit = { +val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName) +assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset()) +assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp()) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk")) + def testListVersion0(quorum: String): Unit = { +// create records for version 0 +createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0) +produceMessagesInSeparateBatch() + +// update version to version 1 to list offset for max timestamp +createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) +// the offset of max timestamp is always -1 if the batch version is 0 +verifyListOffsets(expectedMaxTimestampOffset = -1) + Review Comment: extra new line ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -123,7 +149,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @ParameterizedTest @ValueSource(strings = Array("zk")) def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = { -createOldMessageFormatBrokers() +createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1) Review Comment: > // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. > // So in this separate batch test, it'll be the last offset 2 The comment in line 159 is not very accurate. 
Since we advance the time for each batch, the maxTimestampOffset is the message in the last batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
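As a reading aid for the point about `maxTimestampOffset`, here is a small sketch of picking the offset with the largest timestamp from a list of records. The class `MaxTimestampSketch` and its nested type are hypothetical, the `-1` sentinels are local constants mirroring what the empty-log test above asserts, and the tie-breaking rule (keep the earliest offset when timestamps are equal) is an assumption of this sketch rather than something stated in the thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; not the broker's ListOffsets implementation.
final class MaxTimestampSketch {
    static final long UNKNOWN_OFFSET = -1L;
    static final long UNKNOWN_TIMESTAMP = -1L;

    static final class OffsetAndTimestamp {
        final long offset;
        final long timestamp;

        OffsetAndTimestamp(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Scans records in offset order and keeps the first one with the largest timestamp.
    // An empty log yields the "unknown" sentinels.
    static OffsetAndTimestamp maxTimestamp(List<OffsetAndTimestamp> log) {
        OffsetAndTimestamp best = new OffsetAndTimestamp(UNKNOWN_OFFSET, UNKNOWN_TIMESTAMP);
        for (OffsetAndTimestamp record : log) {
            if (record.timestamp > best.timestamp) { // strict '>' keeps the earliest offset on ties
                best = record;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Three records in separate batches, with time advanced before each batch.
        List<OffsetAndTimestamp> log = Arrays.asList(
                new OffsetAndTimestamp(0, 100),
                new OffsetAndTimestamp(1, 200),
                new OffsetAndTimestamp(2, 300));
        System.out.println("max timestamp offset: " + maxTimestamp(log).offset);
        System.out.println("empty log offset: "
                + maxTimestamp(Collections.<OffsetAndTimestamp>emptyList()).offset);
    }
}
```

With time advancing per batch, the record in the last batch carries the largest timestamp, which matches the correction suggested for the test comment.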
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1556104853 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2043121631 Hey @cadonna, could you take a look if you have some time too? Thanks!
[jira] [Assigned] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16474: -- Assignee: Lianet Magrans (was: Philip Nee) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its newly assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we observed the consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
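The "inflight logic" part of the issue can be sketched as a guard that refuses to build a new heartbeat while the previous one is unanswered. This is a simplified illustration, not the `AsyncKafkaConsumer` code: the class `HeartbeatInflightSketch`, its fields, and the string form of the request are hypothetical, and timeouts, failures, and retries are left out.

```java
import java.util.Optional;

// Hypothetical illustration of an in-flight guard; not the Kafka client implementation.
final class HeartbeatInflightSketch {
    private boolean inflight = false;
    private String memberId = "";
    private int epoch = 0;

    // Returns the heartbeat to send, or empty while an earlier heartbeat awaits its response.
    Optional<String> maybeSendHeartbeat() {
        if (inflight) {
            return Optional.empty();
        }
        inflight = true;
        return Optional.of("Heartbeat(memberId='" + memberId + "', epoch=" + epoch + ")");
    }

    // Applies the member id and epoch from the response, then allows the next heartbeat.
    void onResponse(String assignedMemberId, int assignedEpoch) {
        this.memberId = assignedMemberId;
        this.epoch = assignedEpoch;
        this.inflight = false;
    }

    public static void main(String[] args) {
        HeartbeatInflightSketch state = new HeartbeatInflightSketch();
        System.out.println(state.maybeSendHeartbeat()); // first poll: joins with epoch=0
        System.out.println(state.maybeSendHeartbeat()); // rapid second poll: nothing is sent
        state.onResponse("member-1", 1);
        System.out.println(state.maybeSendHeartbeat()); // carries the updated local state
    }
}
```

With such a guard, the second heartbeat from the example in the issue would not go out with `epoch=0` and `memberId=''`, so the server would not treat it as a new member joining.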
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
brandboat commented on code in PR #15659: URL: https://github.com/apache/kafka/pull/15659#discussion_r1556052135 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -300,6 +301,15 @@ public void testGetReplicaAssignments() throws Exception { assertEquals(assignments, getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0); + +assignments.clear(); Review Comment: You're right, already removed it in the latest commit, many thanks :smiley: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on code in PR #15668: URL: https://github.com/apache/kafka/pull/15668#discussion_r1556044180 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie this.subscriptionMap = new ConcurrentHashMap<>(); this.subscriptionUpdateVersion = new AtomicInteger(0); this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE)); -this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", new SystemTimer("client-metrics")); +this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics")); Review Comment: Sure, addressed in https://github.com/apache/kafka/pull/15668/commits/2da40c26541e727f2cd38bb9fb64ee41e5ca42c2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-16478. Resolution: Fixed > Links for Kafka 3.5.2 release are broken > > > Key: KAFKA-16478 > URL: https://issues.apache.org/jira/browse/KAFKA-16478 > Project: Kafka > Issue Type: Bug > Components: website >Affects Versions: 3.5.2 >Reporter: Philipp Trulson >Assignee: Mickael Maison >Priority: Major > > While trying to update our setup, I noticed that the download links for the > 3.5.2 release are broken. They all point to a different host and also contain > an additional `/kafka` in their URL. Compare: > not working: > [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] > working: > [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] > [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] > This goes for all links in the release - archives, checksums, signatures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2042958530 Hey @Phuc-Hong-Tran, thanks for the update, left some more comments. Almost there! Thanks!
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1555969469 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1751,16 +1753,7 @@ private void subscribeInternal(Pattern pattern, Optional
Re: [PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]
apoorvmittal10 commented on PR #15681: URL: https://github.com/apache/kafka/pull/15681#issuecomment-2042866833 > Tiny nit. It's KIP-1019 I think, not KIP-109. Corrected the description. Thanks @AndrewJSchofield.
Re: [PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]
AndrewJSchofield commented on PR #15681: URL: https://github.com/apache/kafka/pull/15681#issuecomment-2042864722 Tiny nit. It's KIP-1019 I think, not KIP-109.
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1555915666 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1667,6 +1669,7 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); maybeInvokeCommitCallbacks(); +maybeUpdateSubscriptionMetadata(); backgroundEventProcessor.process(); // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as Review Comment: Since we're here, and adding logic that adds to the purpose of this `updateAssignmentMetadataIfNeeded`, we could clean up and remove this outdated comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) [kafka]
apoorvmittal10 commented on PR #15682: URL: https://github.com/apache/kafka/pull/15682#issuecomment-2042864619 The build depends on the merge of PR: https://github.com/apache/kafka/pull/15681
[PR] KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #15682: URL: https://github.com/apache/kafka/pull/15682 The PR implements the changes defined in KIP-1019. It removes the reflection-based access to the KafkaMetric field and uses the method exposed by KIP-1019 to determine metric measurability. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]
apoorvmittal10 opened a new pull request, #15681: URL: https://github.com/apache/kafka/pull/15681 The PR implements the changes defined in [KIP-109](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability) which exposes method to check if metric is of type Measurable. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1555909363 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala: ## @@ -39,9 +39,8 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { * metadata refresh the consumer becomes subscribed to this new topic and all partitions * of that topic are assigned to it. */ - // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented. @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) Review Comment: If we don't use the `getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly` function anymore we should remove its definition from this file now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1555905789 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +520,124 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; Review Comment: @junrao thanks for the details. I have opened a PR to address this: https://github.com/apache/kafka/pull/15680 cc: @AndrewJSchofield
[PR] KAFKA-16485: Broker metrics to follow kebab/hyphen case (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #15680: URL: https://github.com/apache/kafka/pull/15680 The PR updates the broker metric names to kebab/hyphen case, as pointed out by @junrao in the comment below: https://github.com/apache/kafka/pull/15251#discussion_r1498439741 I have also removed the redundant `client-metrics-` prefix from all metrics, as the group name is `client-metrics` itself. Once merged, I'll update the KIP accordingly. - jconsole details: https://github.com/apache/kafka/assets/2861565/f70d72b7-22ad-49ee-95e8-3898623817f1 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
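To make the renaming described in this PR concrete, here is a small sketch of converting a camel-case metric name to kebab case and dropping a prefix that repeats the group name. The helper class `MetricNameSketch` and both of its methods are hypothetical, and the names it produces are only what this sketch computes; the names actually chosen in the PR should be taken from the PR itself.

```java
// Hypothetical helper for illustration; not part of the Kafka code base.
final class MetricNameSketch {

    // Inserts '-' before each upper-case letter (except the first) and lower-cases it.
    // Runs of capitals, such as acronyms, would be split letter by letter.
    static String toKebabCase(String camelCase) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < camelCase.length(); i++) {
            char c = camelCase.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) {
                    sb.append('-');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Removes a leading "<group>-" from the metric name when present.
    static String stripGroupPrefix(String name, String group) {
        String prefix = group + "-";
        return name.startsWith(prefix) ? name.substring(prefix.length()) : name;
    }

    public static void main(String[] args) {
        String kebab = toKebabCase("ClientMetricsInstanceCount");
        System.out.println(kebab);
        System.out.println(stripGroupPrefix(kebab, "client-metrics"));
    }
}
```

Stripping the prefix avoids repeating the group in every metric name, which is the redundancy the PR description mentions.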
[PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 opened a new pull request, #15679: URL: https://github.com/apache/kafka/pull/15679 By using ClusterTestExtensions, `DeleteOffsetsConsumerGroupCommandIntegrationTest` no longer depends on `KafkaServerTestHarness`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
OmniaGM commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1555878104 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: We can rename it to `KafkaSecurityConfig` as a common ground between the two approaches: having a central place for all security configs and breaking out of the `KafkaConfig` anti-pattern. I don't mind either way. It just might be confusing, as we already have `SecurityConfig` and `BrokerSecurityConfigs`, so we need to be mindful about naming the new one.
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
OmniaGM commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1555869666 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: We can, but we already have `SslConfigs`, `SaslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. While `KafkaConfig` is an anti-pattern, it has been acting as the one place where we can find all Kafka configs. I think a better approach would be moving the default values to `SslConfigs`, `SaslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. WDYT?
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
OmniaGM commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1555869666 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: We can, but we already have `SslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. KafkaConfig while it is an anti-pattern it has been acting as one place where we can find all KafkaConfig. I think better approach would be moving default values to `SslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs` WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move Sasl and SSL configs out of core [kafka]
OmniaGM commented on code in PR #15656: URL: https://github.com/apache/kafka/pull/15656#discussion_r1555869666 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; + +public class KafkaConfig { Review Comment: We can, but we already have `SslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. Although `KafkaConfig` is an anti-pattern, it has been acting as the one place where we can find all Kafka configs. I think a better approach would be to move the docs and default values to `SslConfigs`, `SecurityConfig` and `BrokerSecurityConfigs`. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16486) Integrate metric measurability changes in metrics collector
Apoorv Mittal created KAFKA-16486: - Summary: Integrate metric measurability changes in metrics collector Key: KAFKA-16486 URL: https://issues.apache.org/jira/browse/KAFKA-16486 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16485) Fix broker metrics to follow kebab/hyphen case
[ https://issues.apache.org/jira/browse/KAFKA-16485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal updated KAFKA-16485: -- Parent: KAFKA-15601 Issue Type: Sub-task (was: Improvement) > Fix broker metrics to follow kebab/hyphen case > -- > > Key: KAFKA-16485 > URL: https://issues.apache.org/jira/browse/KAFKA-16485 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major >
[jira] [Created] (KAFKA-16485) Fix broker metrics to follow kebab/hyphen case
Apoorv Mittal created KAFKA-16485: - Summary: Fix broker metrics to follow kebab/hyphen case Key: KAFKA-16485 URL: https://issues.apache.org/jira/browse/KAFKA-16485 Project: Kafka Issue Type: Improvement Reporter: Apoorv Mittal Assignee: Apoorv Mittal
[PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]
chiacyu opened a new pull request, #15678: URL: https://github.com/apache/kafka/pull/15678 We should replace the while loops in some test cases with `TestUtils.waitForCondition` to prevent infinite loops.
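To illustrate the idea behind this PR, here is a minimal sketch of a bounded polling helper. It is a hypothetical stand-in and not Kafka's `TestUtils`: the class name `BoundedWait`, the parameter names, and the fixed 10 ms poll interval are all assumptions made for the example. The point it shows is that a condition that never becomes true ends in a test failure after a deadline rather than in a hung build.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for a waitForCondition-style test helper; not Kafka's TestUtils. */
final class BoundedWait {

    private BoundedWait() { }

    /**
     * Polls the condition until it holds or maxWaitMs elapses.
     * Throws AssertionError on timeout so a stuck test fails instead of hanging.
     */
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String failureMessage) {
        final long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + failureMessage);
            }
            try {
                Thread.sleep(10L); // assumed poll interval for this sketch
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail, rather than silently continuing to spin.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + failureMessage, e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger polls = new AtomicInteger();
        // Becomes true on the third poll, so the wait returns quickly.
        waitForCondition(() -> polls.incrementAndGet() >= 3, 1_000L, "counter never reached 3");

        // Never becomes true: an unbounded `while (!done) { }` loop would hang here.
        try {
            waitForCondition(() -> false, 50L, "flag never set");
        } catch (AssertionError expected) {
            System.out.println("timed out as intended: " + expected.getMessage());
        }
    }
}
```

In a test, the bare loop `while (!done) { ... }` would be replaced by a single call to such a helper, with the failure message describing what the test was waiting for.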
Re: [PR] KAFKA-16481: Fixing flaky test kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics [kafka]
showuon merged PR #15677: URL: https://github.com/apache/kafka/pull/15677
Re: [PR] KAFKA-16481: Fixing flaky test kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics [kafka]
showuon commented on PR #15677: URL: https://github.com/apache/kafka/pull/15677#issuecomment-2042592569 Failed tests are unrelated.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on PR #15305: URL: https://github.com/apache/kafka/pull/15305#issuecomment-2042586830 Will check it this week.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on PR #15489: URL: https://github.com/apache/kafka/pull/15489#issuecomment-2042555906 > Look like the build still contains failed test :( yep, I have filed another #15654 to dig in that :_
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on PR #15489: URL: https://github.com/apache/kafka/pull/15489#issuecomment-2042547986 > rebase to trigger QA again Look like the build still contains failed test :(
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
cadonna commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1555453187 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -293,17 +288,12 @@ public void cleanup() throws IOException { task = null; } Utils.delete(BASE_DIR); -mockito.finishMocking(); } @Test -public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); -stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); -EasyMock.expectLastCall(); -EasyMock.replay(stateDirectory, stateManager); +public void shouldThrowLockExceptionIfFailedToLockStateDirectory() { +stateDirectory = mock(StateDirectory.class); Review Comment: I am wondering whether we leak resources, if we assign a mock to `stateDirectory` without closing the state directory before. In `setup()` an actual state directory is created with ``` stateDirectory = new StateDirectory(createConfig("100"), new MockTime(), true, false); ``` It has been there before this PR, but we should fix it. You can either close the state directory before the mock is assigned or remove the creation of the state directory from `setup()` and create it in each test method that uses it. Same is true for the other occurrences in this test class. 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1912,42 +1823,32 @@ public void shouldThrowIfPostCommittingOnIllegalState() { @Test public void shouldSkipCheckpointingSuspendedCreatedTask() { -stateManager.checkpoint(); -EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint")); - EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); -EasyMock.replay(stateManager, recordCollector); - task = createStatefulTask(createConfig("100"), true); task.suspend(); task.postCommit(true); + +verify(stateManager, never()).checkpoint(); } @Test public void shouldCheckpointForSuspendedTask() { -stateManager.checkpoint(); -EasyMock.expectLastCall().once(); -EasyMock.expect(stateManager.changelogOffsets()) -.andReturn(singletonMap(partition1, 1L)); - EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); -EasyMock.replay(stateManager, recordCollector); +when(stateManager.changelogOffsets()) +.thenReturn(singletonMap(partition1, 1L)); Review Comment: nit: ```suggestion when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 1L)); ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -411,25 +384,22 @@ public void seek(final TopicPartition partition, final long offset) { shouldNotSeek.set(new AssertionError("Should not seek")); +@SuppressWarnings("unchecked") final java.util.function.Consumer> resetter = -EasyMock.mock(java.util.function.Consumer.class); -resetter.accept(Collections.singleton(partition1)); -EasyMock.expectLastCall(); -EasyMock.replay(resetter); +mock(java.util.function.Consumer.class); +doNothing().when(resetter).accept(Collections.singleton(partition1)); Review Comment: This should be a verification. However, there is an issue here. If I add it to the verifications with ```java verify(resetter).accept(Collections.singleton(partition1)); ``` the test fails. 
The reason is that when ` accept()` is called, the argument is indeed `Collections.singleton(partition1)` but after the call the collection is cleared: https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L955 At the time the call is verified the argument changed. Apparently, Mockito stores the reference to the argument in the invocation. One way to solve this is the following: ```java final java.util.function.Consumer> resetter = mock(java.util.function.Consumer.class); final Set partitionsAtCall = new HashSet<>(); doAnswer( invocation -> { partitionsAtCall.addAll(invocation.getArgument(0)); return null; } ).when(resetter).accept(Collections.singleton(partition1)); task.initializeIfNeeded(); task.completeRestoration(resetter); // because we mocked the
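The aliasing problem described in this comment can be reproduced without Mockito. The sketch below is a hand-rolled illustration, not the `StreamTaskTest` code: `RecordedArgumentDemo`, `resetAndClear`, `recordByReference` and `recordByCopy` are hypothetical names, and plain strings stand in for partitions. It assumes only what the comment states, namely that the caller passes a collection to the callback and clears it afterwards.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hand-rolled illustration of recording call arguments; it does not use Mockito. */
final class RecordedArgumentDemo {

    private RecordedArgumentDemo() { }

    /** Hypothetical caller that hands its set to the callback and then clears it. */
    static void resetAndClear(Set<String> partitions, Consumer<Set<String>> resetter) {
        resetter.accept(partitions);
        partitions.clear();
    }

    /** Records the argument by reference: the recording changes when the caller clears the set. */
    static List<Set<String>> recordByReference() {
        List<Set<String>> recorded = new ArrayList<>();
        Set<String> partitions = new HashSet<>();
        partitions.add("topic-1");
        resetAndClear(partitions, recorded::add);
        return recorded;
    }

    /** Records a defensive copy taken at call time, so the later clear() is not observed. */
    static List<Set<String>> recordByCopy() {
        List<Set<String>> recorded = new ArrayList<>();
        Set<String> partitions = new HashSet<>();
        partitions.add("topic-1");
        resetAndClear(partitions, arg -> recorded.add(new HashSet<>(arg)));
        return recorded;
    }

    public static void main(String[] args) {
        System.out.println("by reference: " + recordByReference()); // the recorded set is empty by now
        System.out.println("by copy:      " + recordByCopy());      // the copy still holds topic-1
    }
}
```

Copying the argument inside the callback plays the same role as the `doAnswer` approach suggested in the review: the assertion then runs against the state at call time rather than against a reference the caller has since mutated.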
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1555661766 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1062,21 +1027,21 @@ object KafkaConfig { .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc) /** * Transaction management configuration ***/ - .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) - .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc) - .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc) - .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc) - .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) - .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) - .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) - .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) - .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) - - .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc) - - .define(ProducerIdExpirationMsProp, 
INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc) + .define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC) Review Comment: Good call, pushed a refactor for this
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru merged PR #15525: URL: https://github.com/apache/kafka/pull/15525
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2042214344 Yup, I need to change the kafka-get-offsets tool to easily access said functionality, but I am in the process of raising that PR
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on PR #15645: URL: https://github.com/apache/kafka/pull/15645#issuecomment-2042116296 Hello @chia7712 > we can complete it in another PR before this PR. `junit-platform.properties` has been added for the core and tools modules. Could you please review this test refactoring?
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834821#comment-17834821 ] Philipp Trulson commented on KAFKA-16478: - Thanks for the PR! Unfortunately it was too late in my TZ already, LGTM :) > Links for Kafka 3.5.2 release are broken > > > Key: KAFKA-16478 > URL: https://issues.apache.org/jira/browse/KAFKA-16478 > Project: Kafka > Issue Type: Bug > Components: website >Affects Versions: 3.5.2 >Reporter: Philipp Trulson >Assignee: Mickael Maison >Priority: Major > > While trying to update our setup, I noticed that the download links for the > 3.5.2 links are broken. They all point to a different host and also contain > an additional `/kafka` in their URL. Compare: > not working: > [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] > working: > [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] > [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] > This goes for all links in the release - archives, checksums, signatures.
Re: [PR] KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand [kafka]
showuon commented on code in PR #15659: URL: https://github.com/apache/kafka/pull/15659#discussion_r1555327569 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -300,6 +301,15 @@ public void testGetReplicaAssignments() throws Exception { assertEquals(assignments, getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0); + +assignments.clear(); Review Comment: Sorry, what's the purpose of explicitly clearing `assignments` here? I think it is only used as a local variable, so we should not clear it.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1555305760 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String, } override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = { -// Schedule assignment request to revert any queued request before cancelling -for { - topicPartition <- topicPartitions - partitionState <- partitionAssignmentRequestState(topicPartition) - if partitionState == QUEUED - partition = replicaMgr.getPartitionOrException(topicPartition) - topicId <- partition.topicId - directoryId <- partition.logDirectoryId() - topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition()) -} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ()) +for (topicPartition <- topicPartitions) { + if (this.promotionStates.containsKey(topicPartition)) { +val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition) +// Revert any reassignments for partitions that did not complete the future replica promotion +if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) { + directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ()) +} +this.promotionStates.remove(topicPartition) + } Review Comment: In your opinion, under what circumstances will `promotionStates` not contain the topic partition? Maybe when `removePartitions` gets called multiple times? I'm asking because before this change we would send out `AssignReplicasToDirsRequest` no matter what, but now we skip it when there is no `PromotionState`. Will that cause any potential problem? Maybe when upgrading? (Is it possible?) Should we still invoke `directoryEventHandler.handleAssignment` even if there is no `PromotionState`? 
I think not, but I'd like to hear your thoughts here.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1555264597 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + +private static final String NAMESPACE = "namespace"; +// Connect format - any types should be accepted here +private static final Map OFFSET_KEY = Collections.singletonMap("key", "key"); +private static final Map OFFSET_VALUE = Collections.singletonMap("key", 12); + +// Serialized +private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); +private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); + +private 
static final Exception PRODUCE_EXCEPTION = new KafkaException(); + +private final Converter keyConverter = mock(Converter.class); +private final Converter valueConverter = mock(Converter.class); Review Comment: We tend to use the `@Mock` annotation instead of final fields: ```suggestion @Mock private Converter keyConverter; @Mock private Converter valueConverter; ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import