[GitHub] [kafka] jlprat commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6
jlprat commented on code in PR #11478: URL: https://github.com/apache/kafka/pull/11478#discussion_r1170881445 ## docs/upgrade.html: ## @@ -26,6 +26,8 @@ Notable changes in 3 trying to create an already existing metric. (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-843%3A+Adding+addMetricIfAbsent+method+to+Metrics";>KIP-843 for more details). +Apache Kafka now supports having both an IPv4 and an IPv6 listener on the same port. This change only applies to +non advertised listeners (advertised listeners already have this feature) Review Comment: Can you move this section to a new `3.6` area? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6
jlprat commented on PR #11478: URL: https://github.com/apache/kafka/pull/11478#issuecomment-1514217231 Hi @mimaison, the unit tests currently in the PR seem to cover the case of an involuntary regression of this feature, so I'm fine with adding the change as it stands. @mdedetrich Could you update the documentation bit? Currently, it is under `3.3`, but it should be moved under a new `3.6` section.
[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy
showuon commented on code in PR #12545: URL: https://github.com/apache/kafka/pull/12545#discussion_r1170874370 ## clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java: ## @@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) { } return Double.longBitsToDouble(value); } + +@Override +public Double deserialize(String topic, Headers headers, ByteBuffer data) { +if (data == null) { +return null; +} + +if (data.remaining() != 8) { +throw new SerializationException("Size of data received by DoubleDeserializer is not 8"); +} + +final ByteOrder srcOrder = data.order(); +data.order(BIG_ENDIAN); + +final double value = data.getDouble(data.position()); Review Comment: Also, the `data.position()` could be removed because, by default, it'll read from the current position of the byte buffer.
[GitHub] [kafka] showuon commented on a diff in pull request #12545: KIP-863: Reduce CompletedFetch#parseRecord() memory copy
showuon commented on code in PR #12545: URL: https://github.com/apache/kafka/pull/12545#discussion_r1170872007 ## clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java: ## @@ -35,4 +41,22 @@ public Double deserialize(String topic, byte[] data) { } return Double.longBitsToDouble(value); } + +@Override +public Double deserialize(String topic, Headers headers, ByteBuffer data) { +if (data == null) { +return null; +} + +if (data.remaining() != 8) { +throw new SerializationException("Size of data received by DoubleDeserializer is not 8"); +} + +final ByteOrder srcOrder = data.order(); +data.order(BIG_ENDIAN); + +final double value = data.getDouble(data.position()); Review Comment: @LinShunKang, why do we need to set the order to `BIG_ENDIAN` before reading it? From the [javadoc](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#getDouble--): > Reads the next eight bytes at this buffer's current position, composing them into a double value according to the current byte order, and then increments the position by eight. It looks like the byte order is already taken into account when reading from the byte buffer. I had a quick look at the JDK source code, and it does check (and convert) the byte order if needed. Did I miss anything?
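To make the two points in this thread concrete, here is a minimal sketch (not code from the PR) of how `java.nio.ByteBuffer` behaves. The class and method names are hypothetical. It shows that the absolute `getDouble(int)` leaves the position unchanged while the relative `getDouble()` advances it by eight, and that a read always uses the buffer's currently configured order, so a buffer handed over in `LITTLE_ENDIAN` mode would decode big-endian bytes incorrectly unless the order is set first. Restoring the caller's order afterwards is an assumption about what `srcOrder` is for in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteBufferReadSketch {

    // Absolute read, as in the diff: explicit index, position is left unchanged.
    public static double readAbsolute(ByteBuffer data) {
        return data.getDouble(data.position());
    }

    // Relative read: same bytes, but the position advances by eight.
    public static double readRelative(ByteBuffer data) {
        return data.getDouble();
    }

    // Decodes a big-endian double regardless of the buffer's configured order,
    // then restores the caller's order (assumed intent of srcOrder in the diff).
    public static double readBigEndian(ByteBuffer data) {
        final ByteOrder srcOrder = data.order();
        data.order(ByteOrder.BIG_ENDIAN);
        try {
            return data.getDouble(data.position());
        } finally {
            data.order(srcOrder);
        }
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(Double.BYTES); // new buffers start as BIG_ENDIAN
        buf.putDouble(1.5);
        buf.flip();

        System.out.println("absolute read: " + readAbsolute(buf) + ", remaining=" + buf.remaining());

        // Simulate a caller that switched the buffer to little-endian.
        buf.order(ByteOrder.LITTLE_ENDIAN);
        System.out.println("forced big-endian read: " + readBigEndian(buf));
        System.out.println("relative read in caller's order: " + readRelative(buf)
                + ", remaining=" + buf.remaining());
    }
}
```

So dropping `data.position()` is not purely cosmetic: the relative read consumes the buffer, which matters if the caller reuses it afterwards.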
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`
vcrfxia commented on code in PR #13609: URL: https://github.com/apache/kafka/pull/13609#discussion_r1170869917 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java: ## @@ -97,6 +95,27 @@ public KTableKTableJoinMerger joinMerger() { return (KTableKTableJoinMerger) kChangeProcessorSupplier; } +@Override +public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) { +enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName); +enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName); +} + +@SuppressWarnings("unchecked") +private void enableVersionedSemantics(final ProcessorParameters processorParameters, + final boolean useVersionedSemantics, + final String parentNodeName) { +final ProcessorSupplier processorSupplier = processorParameters.processorSupplier(); +if (!(processorSupplier instanceof KTableKTableAbstractJoin)) { +throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName()); +} +final KTableKTableAbstractJoin tableJoin = (KTableKTableAbstractJoin) processorSupplier; + +if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) { Review Comment: Good question -- the answer is that this join node has multiple parents, while all the other `VersionedSemanticsGraphNode` implementations only have a single parent. Because this join node has multiple parents and writes two separate processors (one for each side of the join) to the topology, when enabling versioned semantics we need a way to distinguish which side of the join we're enabling versioned semantics for. In other words, it's possible that only one side of the join is "versioned," in which case one of the two join processors should have versioned semantics enabled while the other should not. 
The way that we determine which side of the join to enable versioned semantics for is based on the parent node name; the processor whose "joinThis" is the parent node which has been identified as versioned is the processor for which versioned semantics will be enabled. (In the case of a self-join, both processors will satisfy this check, and both processors will have versioned semantics enabled.) For all other `VersionedSemanticsGraphNode` implementations which only have a single parent, we could also perform an analogous parent node name check if we wanted to, but the parent node name should always match so it'd be redundant.
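The selection rule described above can be sketched in isolation as follows. This is an illustration only: `JoinSide` and `VersionedJoinSketch` are hypothetical stand-ins, not Kafka Streams classes, and the real code operates on `ProcessorParameters` and `KTableKTableAbstractJoin` as shown in the diff.

```java
public class VersionedJoinSketch {

    // Hypothetical stand-in for one of the two join processors of a table-table join.
    public static final class JoinSide {
        public final String joinThisParentNodeName;
        public boolean versionedSemantics = false;

        public JoinSide(String joinThisParentNodeName) {
            this.joinThisParentNodeName = joinThisParentNodeName;
        }
    }

    // Applies the flag only to the side whose "joinThis" parent is the given parent node.
    private static void enableIfParentMatches(JoinSide side, boolean useVersionedSemantics, String parentNodeName) {
        if (parentNodeName.equals(side.joinThisParentNodeName)) {
            side.versionedSemantics = useVersionedSemantics;
        }
    }

    // The node offers the flag to both processors; the name check picks the right one(s).
    public static void enableVersionedSemantics(JoinSide thisSide, JoinSide otherSide,
                                                boolean useVersionedSemantics, String parentNodeName) {
        enableIfParentMatches(thisSide, useVersionedSemantics, parentNodeName);
        enableIfParentMatches(otherSide, useVersionedSemantics, parentNodeName);
    }

    public static void main(String[] args) {
        JoinSide left = new JoinSide("left-table");
        JoinSide right = new JoinSide("right-table");
        enableVersionedSemantics(left, right, true, "right-table");
        System.out.println("left=" + left.versionedSemantics + ", right=" + right.versionedSemantics);

        // Self-join: both processors share the same parent, so both match.
        JoinSide selfA = new JoinSide("table");
        JoinSide selfB = new JoinSide("table");
        enableVersionedSemantics(selfA, selfB, true, "table");
        System.out.println("selfA=" + selfA.versionedSemantics + ", selfB=" + selfB.versionedSemantics);
    }
}
```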
[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools
[ https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713911#comment-17713911 ] Sagar Rao commented on KAFKA-14586: --- [~mjsax], I can add the redirection for this. I have read through KIP-906 so I can take [~fvaleri]'s help. Looks like the code freeze is next week, so I will try to speed this up. > Move StreamsResetter to tools > - > > Key: KAFKA-14586 > URL: https://issues.apache.org/jira/browse/KAFKA-14586 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Sagar Rao >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1170269949 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: I am not sure line 1082 is the same line you meant, since the file could have been updated. Please clarify.
[GitHub] [kafka] dengziming commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver
dengziming commented on code in PR #13432: URL: https://github.com/apache/kafka/pull/13432#discussion_r1170758199 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.ListOffsetsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public final class ListOffsetsHandler extends Batched { + +private final Map offsetTimestampsByPartition; +private final ListOffsetsOptions options; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + +public ListOffsetsHandler( +Map offsetTimestampsByPartition, +ListOffsetsOptions options, +LogContext logContext +) 
{ +this.offsetTimestampsByPartition = offsetTimestampsByPartition; +this.options = options; +this.log = logContext.logger(ListOffsetsHandler.class); +this.lookupStrategy = new PartitionLeaderStrategy(logContext); +} + +@Override +public String apiName() { +return "listOffsets"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return this.lookupStrategy; +} + +@Override +ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { +Map topicsByName = CollectionUtils.groupPartitionsByTopic( +keys, +topicName -> new ListOffsetsTopic().setName(topicName), +(listOffsetsTopic, partitionId) -> { +TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId); +long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); +listOffsetsTopic.partitions().add( +new ListOffsetsPartition() +.setPartitionIndex(partitionId) +.setTimestamp(offsetTimestamp)); +}); +boolean supportsMaxTimestamp = keys +.stream() +.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP); + +return ListOffsetsRequest.Builder +.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp) +.setTargetTimes(new ArrayList<>(topicsByName.values())); +} + +@Override +public ApiResult handleResponse( +Node broker, +Set keys, +AbstractResponse abstractResponse +) { +ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; +Map completed = new HashMap<>(); +Map failed = new HashMap<>(); +List unmapp
[jira] [Resolved] (KAFKA-14908) Sporadic "Address already in use" when starting kafka cluster embedded within tests
[ https://issues.apache.org/jira/browse/KAFKA-14908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14908. --- Fix Version/s: 3.6.0 Resolution: Fixed > Sporadic "Address already in use" when starting kafka cluster embedded within > tests > --- > > Key: KAFKA-14908 > URL: https://issues.apache.org/jira/browse/KAFKA-14908 > Project: Kafka > Issue Type: Bug >Reporter: Keith Wall >Priority: Minor > Fix For: 3.6.0 > > > We have an integration test suite that starts/stops a kafka cluster > before/after each test. Kafka is being started programmatically within the > same JVM that is running the tests. > Sometimes we get sporadic failures from within Kafka as it tries to bind the > server socket. > {code:java} > org.apache.kafka.common.KafkaException: Socket server failed to bind to > 0.0.0.0:9092: Address already in use. > at kafka.network.Acceptor.openServerSocket(SocketServer.scala:684) > at kafka.network.Acceptor.<init>(SocketServer.scala:576) > at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:433) > at > kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:247) > at > kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:226) > at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:173) > at > kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:173) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) > at scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at kafka.network.SocketServer.<init>(SocketServer.scala:173) > at kafka.server.KafkaServer.startup(KafkaServer.scala:331) {code} > Investigation has shown that the socket is in the TIME_WAIT state from a > previous test. > I know Kafka supports ephemeral ports, but this isn't convenient to our > use-case. > I'd like to suggest that Kafka is changed to set the SO_REUSEADDR on the > server socket. 
I believe this is standard practice for server applications > that run on well-known ports. > I don't believe this change would introduce any backward compatibility > concerns. > > I will open a PR so that it can be considered. Thank you.
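For readers unfamiliar with the option, the following is a minimal Java sketch of what setting `SO_REUSEADDR` on a server socket looks like with the standard `java.nio` API. It is not the code from the Kafka PR; `ReuseAddressSketch` and its method names are hypothetical. The option is set before `bind`, since the `ServerSocket` javadoc leaves the behaviour of changing it after binding undefined, and it also states that the initial setting of `SO_REUSEADDR` is not defined, which is why an explicit call is useful.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class ReuseAddressSketch {

    // Opens a non-blocking server socket with SO_REUSEADDR enabled before binding,
    // so a listener can rebind while old connections on the port linger in TIME_WAIT.
    public static ServerSocketChannel openServerSocket(InetSocketAddress address) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            channel.configureBlocking(false);
            channel.socket().setReuseAddress(true); // must happen before bind
            channel.socket().bind(address);
            return channel;
        } catch (IOException e) {
            channel.close();
            throw e;
        }
    }

    // Demo helper: binds to an ephemeral loopback port and reports the option's value.
    public static boolean reuseAddressEnabledOnLoopback() {
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        try (ServerSocketChannel channel = openServerSocket(address)) {
            return channel.socket().getReuseAddress();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_REUSEADDR enabled: " + reuseAddressEnabledOnLoopback());
    }
}
```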
[GitHub] [kafka] showuon merged pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
showuon merged PR #13572: URL: https://github.com/apache/kafka/pull/13572
[jira] [Commented] (KAFKA-14586) Move StreamsResetter to tools
[ https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713836#comment-17713836 ] Matthias J. Sax commented on KAFKA-14586: - Thanks for providing context, and no worries about not knowing about the other KIP (there are too many things going on, and I also just realized the overlap). Yes, `StreamsResetter` might be used programmatically, so we should add a redirection. Who will do this? I guess we should get it in before code freeze so as not to delay the release. I am not worried about moving the test because it's not user facing. Overall, it seems we can close out the other KIP and ticket as "subsumed" by this ticket/KIP. I can do the cleanup for it. Just let me know if there is anything I can help with, or if the matter is resolved once the missing redirection is merged. > Move StreamsResetter to tools > - > > Key: KAFKA-14586 > URL: https://issues.apache.org/jira/browse/KAFKA-14586 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Sagar Rao >Priority: Major > Fix For: 3.5.0 > >
[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down
hudeqi commented on PR #13473: URL: https://github.com/apache/kafka/pull/13473#issuecomment-1514011903 Hello, can you help review this PR? @mimaison
[GitHub] [kafka] mjsax commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`
mjsax commented on code in PR #13609: URL: https://github.com/apache/kafka/pull/13609#discussion_r1170677125 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java: ## @@ -97,6 +95,27 @@ public KTableKTableJoinMerger joinMerger() { return (KTableKTableJoinMerger) kChangeProcessorSupplier; } +@Override +public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) { +enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName); +enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName); +} + +@SuppressWarnings("unchecked") +private void enableVersionedSemantics(final ProcessorParameters processorParameters, + final boolean useVersionedSemantics, + final String parentNodeName) { +final ProcessorSupplier processorSupplier = processorParameters.processorSupplier(); +if (!(processorSupplier instanceof KTableKTableAbstractJoin)) { +throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName()); +} +final KTableKTableAbstractJoin tableJoin = (KTableKTableAbstractJoin) processorSupplier; + +if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) { Review Comment: Not sure if I understand this condition? Can you elaborate? It seems to be the only place where we use the newly added `parentNodeName` -- why do we not use it elsewhere? ## streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java: ## @@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() { null, null, null, -Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), +null, +null, +null, +null, +null, Review Comment: I think there is one `null` line too many? 
## streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java: ## @@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws Exception { null, null, null, -Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), -Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), null, null, -Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) +null, +null, +Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), +Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), +null, +null, +Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)) Review Comment: Should the last two result rows flip: we first get `F-e` when we process left hand `F` and get nothing when we process right hand `f`? ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -804,6 +806,7 @@ private KTable doJoin(final KTable other, kTableKTableJoinNode.setOutputVersioned(isOutputVersioned); builder.addGraphNode(this.graphNode, kTableKTableJoinNode); +builder.addGraphNode(((KTableImpl) other).graphNode, kTableKTableJoinNode); Review Comment: Yeah. Seems to be incorrect, but apparently did not surface as a bug. Nice fix! ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -1098,7 +1101,7 @@ private KTable doJoinOnForeignKey(final KTable forei //not be done needlessly. ((KTableImpl) foreignKeyTable).enableSendingOldValues(true); -//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node. +//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node. Review Comment: Thank you! -- Could we do a small follow up for 3.4 branch to get it backported?
[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
hachikuji commented on code in PR #13579: URL: https://github.com/apache/kafka/pull/13579#discussion_r1170696945 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig, debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification") responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava)) } else { - val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions) - + val result: ApiResult[Map[TopicPartition, Errors]] = +txnManager.getTransactionState(transactionalId).flatMap { + case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING) + + case Some(epochAndMetadata) => +val txnMetadata = epochAndMetadata.transactionMetadata + +// generate the new transaction metadata with added partitions +txnMetadata.inLock { + if (txnMetadata.producerId != producerId) { +Left(Errors.INVALID_PRODUCER_ID_MAPPING) + } else if (txnMetadata.producerEpoch != producerEpoch) { +Left(Errors.PRODUCER_FENCED) + } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) { +Left(Errors.CONCURRENT_TRANSACTIONS) + } else { +Right(partitions.map(part => Review Comment: nit: usually we would write `partitions.map { part => ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -332,24 +331,42 @@ class TransactionCoordinator(txnConfig: TransactionConfig, debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification") responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava)) } else { - val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, 
producerId, producerEpoch, partitions) - + val result: ApiResult[Map[TopicPartition, Errors]] = +txnManager.getTransactionState(transactionalId).flatMap { + case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING) + + case Some(epochAndMetadata) => +val txnMetadata = epochAndMetadata.transactionMetadata + +// generate the new transaction metadata with added partitions Review Comment: nit: this comment seems misplaced. Could we have a short comment here that we intentionally do not check pending state? We can mention that partitions are removed from the transaction metadata as soon as the markers are confirmed written.
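The order of the checks in the quoted diff can be summarised with the following self-contained Java sketch. It is an illustration, not the coordinator's code: the `TxnState`, `Result` and `TxnMetadata` types are hypothetical simplifications, and the per-partition result that the real code builds in its final branch is elided in the quoted diff, so it is represented here by a single `NONE` value. In line with the review suggestion, the sketch deliberately looks only at the current state and does not consult any pending state.

```java
public class AddPartitionsVerificationSketch {

    // Hypothetical, simplified transaction states.
    public enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    // Hypothetical result codes mirroring the error names in the diff.
    public enum Result { NONE, INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED, CONCURRENT_TRANSACTIONS }

    // Hypothetical immutable snapshot of the metadata fields the checks read.
    public static final class TxnMetadata {
        public final long producerId;
        public final short producerEpoch;
        public final TxnState state;

        public TxnMetadata(long producerId, short producerEpoch, TxnState state) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.state = state;
        }
    }

    // Runs the checks in the same order as the diff; metadata == null models "case None".
    public static Result verify(TxnMetadata metadata, long producerId, short producerEpoch) {
        if (metadata == null) {
            return Result.INVALID_PRODUCER_ID_MAPPING;
        }
        if (metadata.producerId != producerId) {
            return Result.INVALID_PRODUCER_ID_MAPPING;
        }
        if (metadata.producerEpoch != producerEpoch) {
            return Result.PRODUCER_FENCED;
        }
        // Pending state is intentionally not checked here; only the current state is.
        if (metadata.state == TxnState.PREPARE_COMMIT || metadata.state == TxnState.PREPARE_ABORT) {
            return Result.CONCURRENT_TRANSACTIONS; // retriable: client backs off and retries
        }
        return Result.NONE; // per-partition verification would follow here
    }

    public static void main(String[] args) {
        TxnMetadata ongoing = new TxnMetadata(42L, (short) 3, TxnState.ONGOING);
        System.out.println(verify(ongoing, 42L, (short) 3));
        System.out.println(verify(ongoing, 42L, (short) 2));
    }
}
```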
[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
lanshiqin commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1170685549 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,28 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException + */ + @ParameterizedTest + @CsvSource(Array( Review Comment: Thanks, I've added this use case data
[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
jolshan commented on code in PR #13579: URL: https://github.com/apache/kafka/pull/13579#discussion_r1170681401 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig, debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification") responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava)) } else { - val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions) - + val result: ApiResult[Map[TopicPartition, Errors]] = +txnManager.getTransactionState(transactionalId).flatMap { + case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING) + + case Some(epochAndMetadata) => +val txnMetadata = epochAndMetadata.transactionMetadata + +// generate the new transaction metadata with added partitions +txnMetadata.inLock { + if (txnMetadata.producerId != producerId) { +Left(Errors.INVALID_PRODUCER_ID_MAPPING) + } else if (txnMetadata.producerEpoch != producerEpoch) { +Left(Errors.PRODUCER_FENCED) + } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) { +// return a retriable exception to let the client backoff and retry +Left(Errors.CONCURRENT_TRANSACTIONS) Review Comment: If we allow any pending states, we will have to catch the error in KAFKA-14884, which I suppose is also OK.
[GitHub] [kafka] jolshan commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
jolshan commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170681193

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
         responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
       } else {
-        val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-
+        val result: ApiResult[Map[TopicPartition, Errors]] =
+          txnManager.getTransactionState(transactionalId).flatMap {
+            case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+            case Some(epochAndMetadata) =>
+              val txnMetadata = epochAndMetadata.transactionMetadata
+
+              // generate the new transaction metadata with added partitions
+              txnMetadata.inLock {
+                if (txnMetadata.producerId != producerId) {
+                  Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+                } else if (txnMetadata.producerEpoch != producerEpoch) {
+                  Left(Errors.PRODUCER_FENCED)
+                } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                  // return a retriable exception to let the client backoff and retry
+                  Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   If we are pending commit or abort, I don't know if it makes sense to verify and allow the write to continue.
[GitHub] [kafka] hachikuji commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
hachikuji commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170680513

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
         responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
       } else {
-        val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-
+        val result: ApiResult[Map[TopicPartition, Errors]] =
+          txnManager.getTransactionState(transactionalId).flatMap {
+            case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+            case Some(epochAndMetadata) =>
+              val txnMetadata = epochAndMetadata.transactionMetadata
+
+              // generate the new transaction metadata with added partitions
+              txnMetadata.inLock {
+                if (txnMetadata.producerId != producerId) {
+                  Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+                } else if (txnMetadata.producerEpoch != producerEpoch) {
+                  Left(Errors.PRODUCER_FENCED)
+                } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                  // return a retriable exception to let the client backoff and retry
+                  Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   Yeah, I think that's right. It works because we remove the partition after we have confirmed that the end transaction marker has been written. So if the partition is included, then it means markers are still to come. This assumes we fix https://issues.apache.org/jira/browse/KAFKA-14884 of course.
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13579: KAFKA-14904: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
artemlivshits commented on code in PR #13579:
URL: https://github.com/apache/kafka/pull/13579#discussion_r1170670464

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -332,24 +331,45 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request for verification")
         responseCallback(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitions.map(_ -> Errors.INVALID_REQUEST).toMap.asJava))
       } else {
-        val result: ApiResult[(Int, TransactionMetadata)] = getTransactionMetadata(transactionalId, producerId, producerEpoch, partitions)
-
+        val result: ApiResult[Map[TopicPartition, Errors]] =
+          txnManager.getTransactionState(transactionalId).flatMap {
+            case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+
+            case Some(epochAndMetadata) =>
+              val txnMetadata = epochAndMetadata.transactionMetadata
+
+              // generate the new transaction metadata with added partitions
+              txnMetadata.inLock {
+                if (txnMetadata.producerId != producerId) {
+                  Left(Errors.INVALID_PRODUCER_ID_MAPPING)
+                } else if (txnMetadata.producerEpoch != producerEpoch) {
+                  Left(Errors.PRODUCER_FENCED)
+                } else if (txnMetadata.pendingTransitionInProgress && !(txnMetadata.pendingState == Some(Ongoing) && txnMetadata.state == Ongoing)) {
+                  // return a retriable exception to let the client backoff and retry
+                  Left(Errors.CONCURRENT_TRANSACTIONS)

Review Comment:
   Do we need this condition for the verification case? No matter what the pending state is, if the state contains the partition, we're good; otherwise, we'd fail during verification.
[jira] [Created] (KAFKA-14920) Address timeouts and out of order sequences
Justine Olshan created KAFKA-14920:
--------------------------------------

             Summary: Address timeouts and out of order sequences
                 Key: KAFKA-14920
                 URL: https://issues.apache.org/jira/browse/KAFKA-14920
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Justine Olshan
            Assignee: Justine Olshan


KAFKA-14844 showed the destructive nature of a timeout on the first produce request for a topic partition (i.e. one that has no state in psm).

Since we currently don't validate the first sequence (we will in part 2 of KIP-890), any transient error on the first produce can lead to out-of-order sequences that never recover.

Originally, KAFKA-14561 relied on the producer's retry mechanism for these transient issues, but until that is fixed, we may need to retry from within the AddPartitionsManager instead. We addressed the concurrent transactions case, but there are other errors, such as coordinator loading, that we could run into and that could increase the number of out-of-order issues.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170662644 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>())); +// Consumer B +List subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1))); +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2))); +// Topic 3 Partitions Assignment +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0))); +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()
[jira] [Resolved] (KAFKA-14917) Producer write while transaction is pending.
     [ https://issues.apache.org/jira/browse/KAFKA-14917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Justine Olshan resolved KAFKA-14917.
------------------------------------
    Resolution: Won't Fix

> Producer write while transaction is pending.
> --------------------------------------------
>
>                 Key: KAFKA-14917
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14917
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Justine Olshan
>            Assignee: Justine Olshan
>            Priority: Major
>
> As discovered in KAFKA-14904, we seem to get into a state where we try to write to a partition while the ongoing state is still pending.
> This is likely a bigger issue than the test and worth looking into.
[jira] [Commented] (KAFKA-14917) Producer write while transaction is pending.
     [ https://issues.apache.org/jira/browse/KAFKA-14917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713794#comment-17713794 ]

Justine Olshan commented on KAFKA-14917:
----------------------------------------

Turns out we set the state to pending when we are adding ANY new partition. So for now we will allow checks if the current state is ongoing and the pending state is ongoing, since any partition in txnMetadata has already been added and persisted. We may need to consider retries for other cases.

> Producer write while transaction is pending.
> --------------------------------------------
>
>                 Key: KAFKA-14917
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14917
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Justine Olshan
>            Assignee: Justine Olshan
>            Priority: Major
>
> As discovered in KAFKA-14904, we seem to get into a state where we try to write to a partition while the ongoing state is still pending.
> This is likely a bigger issue than the test and worth looking into.
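For readers following the thread, the rule described in the comment above (proceed with verification when the pending transition is Ongoing -> Ongoing, otherwise ask the client to retry) can be sketched in plain Java roughly as follows. This is only an illustrative sketch, not the code from the PR: the class, enum and method names are hypothetical, and it assumes that "a pending transition is in progress" simply means that a pending state is set.

```java
import java.util.Optional;

// Hypothetical, simplified model of the check discussed in PR #13579.
// None of these names come from the Kafka code base.
final class PendingStateCheckSketch {
    enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT }

    enum Outcome { PROCEED, INVALID_PRODUCER_ID_MAPPING, PRODUCER_FENCED, CONCURRENT_TRANSACTIONS }

    static Outcome check(long storedProducerId,
                         int storedEpoch,
                         TxnState state,
                         Optional<TxnState> pendingState,
                         long producerId,
                         int producerEpoch) {
        if (storedProducerId != producerId) {
            return Outcome.INVALID_PRODUCER_ID_MAPPING;
        }
        if (storedEpoch != producerEpoch) {
            return Outcome.PRODUCER_FENCED;
        }
        // Assumption: a transition is "in progress" whenever a pending state is set.
        boolean transitionInProgress = pendingState.isPresent();
        // Adding a partition to an ongoing transaction is an Ongoing -> Ongoing transition.
        boolean ongoingToOngoing =
            pendingState.equals(Optional.of(TxnState.ONGOING)) && state == TxnState.ONGOING;
        if (transitionInProgress && !ongoingToOngoing) {
            // Retriable: the client is expected to back off and try again.
            return Outcome.CONCURRENT_TRANSACTIONS;
        }
        return Outcome.PROCEED;
    }
}
```

With this shape, a pending commit or abort still yields the retriable outcome, which matches the concern raised in the review about not verifying writes while a commit or abort is pending.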
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170659867 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are as follows: + * + * Each member must get at least one partition for every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + *This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + *Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * Members should retain as much as their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + * + * + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. + * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * Add members from the potentiallyUnfilled list to the Unfilled list if they haven't met the total required quota i.e. minimum number of partitions per member + 1 (if member is designated to receive one of the excess partitions) + * Generate a list of unassigned partitions by calculating the difference between total partitions and already assigned (sticky) partitions + * Iterate through unfilled members and assign partitions from the unassigned partitions + * + * + * + */ +public class RangeAssignor implements PartitionAssignor { + +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +static class RemainingAssignmentsForMember { +private final String memberId; +private final Integer remaining; + +public RemainingAssignmentsForMember(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} + +public String memberId() { +return memberId; +} + +public Integer remaining() { +return remaining; +} + +} + +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> 
membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. +if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId); +} else { +log.info(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +private Map>
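The quota step in the Javadoc above (a minimum number of partitions per member, plus one extra for members designated to receive an excess partition) can be illustrated with a small standalone sketch. It is not the PR's implementation: it ignores stickiness entirely, handles a single topic, and assumes that the excess partitions go to the first members in sorted order. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of range-style quotas for one topic; no sticky behaviour.
final class RangeQuotaSketch {
    static Map<String, List<Integer>> assignRanges(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return assignment;
        }
        // Sort so that the result does not depend on the order of the input list.
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);

        int minQuota = numPartitions / sorted.size(); // every member gets at least this many
        int excess = numPartitions % sorted.size();   // this many members get one more

        int nextPartition = 0;
        for (int i = 0; i < sorted.size(); i++) {
            // Assumption: the first `excess` members receive the extra partitions.
            int quota = minQuota + (i < excess ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < quota; p++) {
                partitions.add(nextPartition++);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }
}
```

For two members and a topic with three partitions, this yields one contiguous range of two partitions and one range of a single partition, which is the same shape the test in the PR expects for topic 1.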
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170658595

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ *
+ * Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ *   This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ *   Two streams are co-partitioned if the following conditions are met:
+ *
+ * The keys must have the same schemas.
+ * The topics involved must have the same number of partitions.
+ *
+ * Members should retain as much as their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ *
+ *
+ *
+ * The algorithm includes the following steps:

Review Comment:
   cool
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170658513

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.

Review Comment:
   Remove the whole thing, or just the HTML tags?
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13497: KAFKA-14834: [2/N] Test coverage for out-of-order data in joins
vcrfxia commented on code in PR #13497:
URL: https://github.com/apache/kafka/pull/13497#discussion_r1170638852

## streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java: ##
@@ -88,31 +88,35 @@ public static Collection data() {
     StreamsBuilder builder;

     private final List<Input<String>> input = Arrays.asList(
-        new Input<>(INPUT_TOPIC_LEFT, null),
-        new Input<>(INPUT_TOPIC_RIGHT, null),
-        new Input<>(INPUT_TOPIC_LEFT, "A"),
-        new Input<>(INPUT_TOPIC_RIGHT, "a"),
-        new Input<>(INPUT_TOPIC_LEFT, "B"),
-        new Input<>(INPUT_TOPIC_RIGHT, "b"),
-        new Input<>(INPUT_TOPIC_LEFT, null),
-        new Input<>(INPUT_TOPIC_RIGHT, null),
-        new Input<>(INPUT_TOPIC_LEFT, "C"),
-        new Input<>(INPUT_TOPIC_RIGHT, "c"),
-        new Input<>(INPUT_TOPIC_RIGHT, null),
-        new Input<>(INPUT_TOPIC_LEFT, null),
-        new Input<>(INPUT_TOPIC_RIGHT, null),
-        new Input<>(INPUT_TOPIC_RIGHT, "d"),
-        new Input<>(INPUT_TOPIC_LEFT, "D")
+        new Input<>(INPUT_TOPIC_LEFT, null, 1),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+        new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+        new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+        new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+        new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+        new Input<>(INPUT_TOPIC_LEFT, null, 7),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+        new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+        new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+        new Input<>(INPUT_TOPIC_LEFT, null, 12),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+        new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
+        new Input<>(INPUT_TOPIC_LEFT, "D", 15),
+        new Input<>(INPUT_TOPIC_LEFT, "E", 4), // out-of-order data

Review Comment:
   Fixed in https://github.com/apache/kafka/pull/13609.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`
vcrfxia commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170638371

## streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java: ##
@@ -87,7 +87,37 @@ public static Collection data() {

     StreamsBuilder builder;

-    private final List<Input<String>> input = Arrays.asList(
+    protected final List<Input<String>> input = Arrays.asList(
+        new Input<>(INPUT_TOPIC_LEFT, null, 1),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 2),
+        new Input<>(INPUT_TOPIC_LEFT, "A", 3),
+        new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
+        new Input<>(INPUT_TOPIC_LEFT, "B", 5),
+        new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
+        new Input<>(INPUT_TOPIC_LEFT, null, 7),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 8),
+        new Input<>(INPUT_TOPIC_LEFT, "C", 9),
+        new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 11),
+        new Input<>(INPUT_TOPIC_LEFT, null, 12),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 13),
+        new Input<>(INPUT_TOPIC_RIGHT, "d", 7), // out-of-order data with null as latest
+        new Input<>(INPUT_TOPIC_LEFT, "D", 6),
+        new Input<>(INPUT_TOPIC_LEFT, null, 2),
+        new Input<>(INPUT_TOPIC_RIGHT, null, 3),
+        new Input<>(INPUT_TOPIC_RIGHT, "e", 14),
+        new Input<>(INPUT_TOPIC_LEFT, "E", 15),
+        new Input<>(INPUT_TOPIC_LEFT, null, 10), // out-of-order data with non-null as latest
+        new Input<>(INPUT_TOPIC_RIGHT, null, 9),
+        new Input<>(INPUT_TOPIC_LEFT, "F", 4),
+        new Input<>(INPUT_TOPIC_RIGHT, "f", 3)
+    );
+
+    // used for stream-stream join tests where out-of-order data does not meaningfully affect
+    // the result, and the main `input` list results in too many result records/test noise.
+    // also used for table-table multi-join tests, since out-of-order data with table-table
+    // joins is already tested in non-multi-join settings.
+    protected final List<Input<String>> inputWithoutOutOfOrderData = Arrays.asList(

Review Comment:
   As suggested in https://github.com/apache/kafka/pull/13497#discussion_r1163397015, I have split the input test data into two copies: a smaller one without out-of-order data (for use in stream-stream join and table-table multi-join tests) and a larger one with out-of-order data (for use elsewhere, including to validate versioned joins).
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`
vcrfxia commented on code in PR #13609: URL: https://github.com/apache/kafka/pull/13609#discussion_r1170633123 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -804,6 +806,7 @@ private KTable doJoin(final KTable other, kTableKTableJoinNode.setOutputVersioned(isOutputVersioned); builder.addGraphNode(this.graphNode, kTableKTableJoinNode); +builder.addGraphNode(((KTableImpl) other).graphNode, kTableKTableJoinNode); Review Comment: I don't know why it's currently the case that primary-key table-table join nodes only have one parent, instead of two. Seems more correct to have two, and the GraphNode mechanism for determining whether the joining table is versioned or not will not work without this parent connection. I have verified that there is no change to the built topology, so AFAICT this addition is internal-only. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -1098,7 +1101,7 @@ private KTable doJoinOnForeignKey(final KTable forei //not be done needlessly. ((KTableImpl) foreignKeyTable).enableSendingOldValues(true); -//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node. +//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node. Review Comment: This, and a few other similar renames in comments, are unrelated to this PR but included as cleanup from https://github.com/apache/kafka/pull/13589. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia opened a new pull request, #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`
vcrfxia opened a new pull request, #13609: URL: https://github.com/apache/kafka/pull/13609 There's a bug in the table-table join handling of out-of-order records in versioned tables (see https://github.com/apache/kafka/pull/13510 and https://github.com/apache/kafka/pull/13522 for context): if the latest value for a particular key is a tombstone, out-of-order records are not properly identified, because versioned stores do not return timestamps for tombstones (so there is no timestamp to compare against when deciding whether a record is out-of-order). As a result, out-of-order records are improperly identified as not out-of-order when the latest value for the key is a tombstone. This PR fixes the bug by using the `isLatest` value from the `Change` object (see https://github.com/apache/kafka/pull/13564) instead of calling `get(key)` on the state store to fetch timestamps to compare against. As part of this fix, this PR also updates table-table joins to determine whether upstream tables are versioned by using the GraphNode mechanism, instead of checking the table's value getter. This also enables us to remove the additional state store access granted to join processors in https://github.com/apache/kafka/pull/13510 and https://github.com/apache/kafka/pull/13522, resulting in a cleaner topology. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
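For readers skimming the thread, the failure mode described in this PR can be illustrated with a minimal sketch. This is not the code from the PR: the class and method names below are hypothetical, and the store lookup is reduced to an `OptionalLong` that is empty when the latest value is a tombstone.

```java
import java.util.OptionalLong;

// Hypothetical illustration only; not the Kafka Streams implementation.
final class OutOfOrderCheckSketch {

    // Timestamp-based check: compares the incoming record against the timestamp
    // of the latest stored value. When the latest value is a tombstone the store
    // yields no timestamp, so the record can never be flagged as out-of-order.
    static boolean isOutOfOrderByTimestamp(OptionalLong latestStoredTimestamp, long recordTimestamp) {
        return latestStoredTimestamp.isPresent()
            && recordTimestamp < latestStoredTimestamp.getAsLong();
    }

    // Flag-based check: relies on an isLatest flag carried with the change itself,
    // so it does not depend on what the store returns for tombstones.
    static boolean isOutOfOrderByFlag(boolean isLatest) {
        return !isLatest;
    }
}
```

With an empty timestamp (the tombstone case) the first check always answers "not out-of-order", which matches the bug as described; the second check is unaffected.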
[jira] [Resolved] (KAFKA-14881) Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
[ https://issues.apache.org/jira/browse/KAFKA-14881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano resolved KAFKA-14881. --- Resolution: Fixed Merged to Trunk and 3.5. > Update UserScramCredentialRecord for SCRAM ZK to KRaft migration > > > Key: KAFKA-14881 > URL: https://issues.apache.org/jira/browse/KAFKA-14881 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.5.0 >Reporter: Proven Provenzano >Assignee: Proven Provenzano >Priority: Major > Fix For: 3.5.0 > > > I want to support ZK to KRaft migration. > ZK stores a storedKey and a serverKey for each SCRAM credential not the > saltedPassword. > The storedKey and serverKey are a crypto hash of some data with the > saltedPassword and it is not possible to extract the saltedPassword from them. > The serverKey and storedKey are enough for SCRAM authentication and > saltedPassword is not needed. > I will update the UserScramCredentialRecord to store serverKey and storedKey > instead of saltedPassword and I will update that SCRAM is only supported with > a bumped version of IBP_3_5 so that there are no compatibility issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
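The relationship between saltedPassword, storedKey and serverKey mentioned in this ticket follows the SCRAM definitions in RFC 5802. A minimal sketch, assuming SCRAM-SHA-256 and a hypothetical helper class (this is not Kafka's own credential code):

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper illustrating the RFC 5802 key derivations for SCRAM-SHA-256.
final class ScramKeysSketch {

    // HMAC(key, message) with SHA-256; checked exceptions are wrapped for brevity.
    static byte[] hmac(byte[] key, String message) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // ServerKey := HMAC(SaltedPassword, "Server Key")
    static byte[] serverKey(byte[] saltedPassword) {
        return hmac(saltedPassword, "Server Key");
    }

    // StoredKey := H(ClientKey), where ClientKey := HMAC(SaltedPassword, "Client Key")
    static byte[] storedKey(byte[] saltedPassword) {
        byte[] clientKey = hmac(saltedPassword, "Client Key");
        try {
            return MessageDigest.getInstance("SHA-256").digest(clientKey);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Both derivations are one-way (HMAC and a hash), which is why a record holding only storedKey and serverKey cannot be converted back into one holding saltedPassword.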
[jira] [Created] (KAFKA-14919) MM2 ForwardingAdmin tests should not conflate admin operations
Greg Harris created KAFKA-14919: --- Summary: MM2 ForwardingAdmin tests should not conflate admin operations Key: KAFKA-14919 URL: https://issues.apache.org/jira/browse/KAFKA-14919 Project: Kafka Issue Type: Test Components: mirrormaker Reporter: Greg Harris The MirrorConnectorsWithCustomForwardingAdminIntegrationTest uses a special implementation of ForwardingAdmin which records admin operations in a static ConcurrentMap, which is then used to perform assertions. This has the problem that one variable (allTopics) is used to perform assertions for multiple different methods (adding topics, adding partitions, and syncing configs), despite these operations each being tested separately. This leads to the confusing behavior where each test appears to assert that a particular operation has taken place, and instead asserts that at least one of the operations has taken place. This allows a regression or timeout in one operation to be hidden by the others, making the behavior of the tests much less predictable. These tests and/or the metadata store should be changed so that the tests are isolated from one another, and actually perform the assertions that correspond to their titles. -- This message was sent by Atlassian Jira (v8.20.10#820010)
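One possible shape for the isolation requested here is a store that records each admin operation under its own key, so that an assertion about one operation cannot be satisfied by another. The sketch below is only an illustration: `PerOperationStoreSketch` and its `Operation` enum are hypothetical names, not the existing FakeLocalMetadataStore API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-operation store; an instance per test avoids shared static state.
final class PerOperationStoreSketch {

    enum Operation { CREATE_TOPIC, CREATE_PARTITIONS, SYNC_CONFIGS }

    private final Map<Operation, Set<String>> topicsByOperation = new ConcurrentHashMap<>();

    // Record that the given operation was observed for the given topic.
    void record(Operation operation, String topic) {
        topicsByOperation
            .computeIfAbsent(operation, k -> ConcurrentHashMap.newKeySet())
            .add(topic);
    }

    // True only if this specific operation was recorded for the topic.
    boolean sawOperation(Operation operation, String topic) {
        return topicsByOperation
            .getOrDefault(operation, Collections.emptySet())
            .contains(topic);
    }
}
```

A test for partition creation would then assert `sawOperation(Operation.CREATE_PARTITIONS, topic)` and would fail if only a config sync had happened.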
[GitHub] [kafka] gharris1727 commented on pull request #13575: KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts
gharris1727 commented on PR #13575: URL: https://github.com/apache/kafka/pull/13575#issuecomment-1513850889 Follow-up ticket: https://issues.apache.org/jira/browse/KAFKA-14919
[jira] [Assigned] (KAFKA-14905) Failing tests in MM2 ForwardingAdmin test since KIP-894
[ https://issues.apache.org/jira/browse/KAFKA-14905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-14905: --- Assignee: Greg Harris > Failing tests in MM2 ForwardingAdmin test since KIP-894 > --- > > Key: KAFKA-14905 > URL: https://issues.apache.org/jira/browse/KAFKA-14905 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.5.0 > > > There are three tests which are consistently failing in > MirrorConnectorsWithCustomForwardingAdminIntegrationTest since the merge of > KIP-894 in KAFKA-14420: > * testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() > * testCreatePartitionsUseProvidedForwardingAdmin() > * testSyncTopicConfigUseProvidedForwardingAdmin() > {noformat} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Topic: primary.test-topic-1 didn't get created in the FakeLocalMetadataStore > ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308) > at > 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:214) > (similar) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:243) > (similar) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:280){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14905) Failing tests in MM2 ForwardingAdmin test since KIP-894
[ https://issues.apache.org/jira/browse/KAFKA-14905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14905: Labels: flaky-test (was: ) > Failing tests in MM2 ForwardingAdmin test since KIP-894 > --- > > Key: KAFKA-14905 > URL: https://issues.apache.org/jira/browse/KAFKA-14905 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Labels: flaky-test > Fix For: 3.5.0 > > > There are three tests which are consistently failing in > MirrorConnectorsWithCustomForwardingAdminIntegrationTest since the merge of > KIP-894 in KAFKA-14420: > * testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin() > * testCreatePartitionsUseProvidedForwardingAdmin() > * testSyncTopicConfigUseProvidedForwardingAdmin() > {noformat} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Topic: primary.test-topic-1 didn't get created in the FakeLocalMetadataStore > ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at > app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308) > at > 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:214) > (similar) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:243) > (similar) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:324) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:280){noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550: URL: https://github.com/apache/kafka/pull/13550#discussion_r1170599794 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse, } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " + "Sent generation was {}", sentGeneration); +resetStateAndGeneration("member missed the rebalance", true); Review Comment: From an offline discussion with @hachikuji: it seems that what we want is to revoke the old partitions but resend them on the subsequent join. Right now, join only sends out the assigned partitions.
[GitHub] [kafka] gharris1727 commented on pull request #13575: KAFKA-14905: Intercept incrementalAlterConfigs in MM2 ForwardingAdminTest
gharris1727 commented on PR #13575: URL: https://github.com/apache/kafka/pull/13575#issuecomment-1513825281 Okay, upon further investigation it appears that this is technically a flaky test failure; it just nearly always fails in CI due to CPU load. The core reason for the failure is that the FakeForwardingAdminWithLocalMetadata has a fixed 1-second timeout for each operation. As a result, each operation that the FakeForwardingAdminWithLocalMetadata intercepts can succeed in Kafka, and the connector will know it has succeeded, but the FakeLocalMetadataStore will not be updated, which can cause assertions to fail even though the connector satisfies the condition described by the test name. The reason multiple distinct tests all regressed at once, while all passing locally, appears to be that the _tests do not distinguish between the different admin client operations and conflate one operation with another_. This meant that before the regression, the tests which created the topics and created partitions were actually relying on the config sync to pass. After the regression, the tests which created partitions and synced configs passed locally because the topic creation had a good success rate under low CPU load. With this knowledge, I think that this implementation of the FakeLocalMetadataStore is fundamentally flawed, and these tests are ultimately not performing the assertions that they should be. They cannot reliably detect regressions and aren't really providing any value as-is. In this PR I'll apply a tactical fix to get the tests to pass (either raise or remove the timeout, or mark the tests as ignored) and follow up after the upcoming release to rewrite these tests so they actually test what they say they do.
[GitHub] [kafka] jolshan opened a new pull request, #13608: KAFKA-14844: Include check transaction is still ongoing right before append
jolshan opened a new pull request, #13608: URL: https://github.com/apache/kafka/pull/13608 We will need to pick up the changes in KAFKA-14916 (right now we assume the producer ID is shared in all batches), but I wanted to get a WIP draft out for general ideas. The commented section in analyzeAndValidateProducerState and the tests still need to be added. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mjsax commented on pull request #13589: MINOR: rename internal FK-join processor classes
mjsax commented on PR #13589: URL: https://github.com/apache/kafka/pull/13589#issuecomment-1513693392 Merged to `trunk` and cherry-picked to `3.5` and `3.4` branches.
[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan commented on PR #13607: URL: https://github.com/apache/kafka/pull/13607#issuecomment-1513675548 I already see the checkstyle issues created by my IDE so I will fix in a bit.
[GitHub] [kafka] jolshan opened a new pull request, #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan opened a new pull request, #13607: URL: https://github.com/apache/kafka/pull/13607 Adds validation to ensure all producer IDs in a transactional/idempotent produce request are the same. Also modifies verification to only add a partition to verify if it is transactional. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
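The validation described in this PR can be pictured with a small sketch. It is an assumption-laden illustration, not the broker code: the class name is hypothetical, and each batch in a produce request is reduced to just its producer ID.

```java
import java.util.List;

// Hypothetical illustration: a request is acceptable only if every batch
// carries the same producer ID.
final class ProducerIdValidationSketch {

    // Returns true when the batches share at most one distinct producer ID.
    static boolean hasSingleProducerId(List<Long> batchProducerIds) {
        return batchProducerIds.stream().distinct().count() <= 1;
    }
}
```

A request whose batches carry producer IDs 7 and 8 would be rejected under this check, while one whose batches all carry 7 would pass.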
[GitHub] [kafka] chia7712 merged pull request #13588: [MINOR]: Fixing gradle build during compileScala and compileTestScala
chia7712 merged PR #13588: URL: https://github.com/apache/kafka/pull/13588
[GitHub] [kafka] cmccabe commented on a diff in pull request #13606: KAFKA-14918 Only send controller RPCs to migrating ZK brokers
cmccabe commented on code in PR #13606: URL: https://github.com/apache/kafka/pull/13606#discussion_r1170454667 ## core/src/main/scala/kafka/migration/MigrationPropagator.scala: ## @@ -70,14 +70,24 @@ class MigrationPropagator( override def publishMetadata(image: MetadataImage): Unit = { val oldImage = _image -val addedBrokers = new util.HashSet[Integer](image.cluster().brokers().keySet()) -addedBrokers.removeAll(oldImage.cluster().brokers().keySet()) -val removedBrokers = new util.HashSet[Integer](oldImage.cluster().brokers().keySet()) -removedBrokers.removeAll(image.cluster().brokers().keySet()) - -removedBrokers.asScala.foreach(id => channelManager.removeBroker(id)) -addedBrokers.asScala.foreach(id => - channelManager.addBroker(Broker.fromBrokerRegistration(image.cluster().broker(id +val prevBrokers = oldImage.cluster().brokers().values().asScala + .filter(_.isMigratingZkBroker) + .filterNot(_.fenced) + .map(Broker.fromBrokerRegistration) + .toSet + +val aliveBrokers = image.cluster().brokers().values().asScala + .filter(_.isMigratingZkBroker) + .filterNot(_.fenced) + .map(Broker.fromBrokerRegistration) + .toSet + +val addedBrokers = aliveBrokers -- prevBrokers +val removedBrokers = prevBrokers -- aliveBrokers + +stateChangeLogger.logger.debug(s"Adding brokers $addedBrokers, removing brokers $removedBrokers.") Review Comment: can we make this INFO and only do it if addedBrokers or removedBrokers is non-empty -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
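The review suggestion above (log only when the broker set actually changed) can be sketched as follows. The PR itself is Scala; this Java version is only an illustration, with broker registrations reduced to integer IDs and a hypothetical `changeMessage` helper that returns the text to log, if any.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration of diffing two broker sets and logging only on change.
final class BrokerDiffSketch {

    // Elements of left that are not in right.
    static <T> Set<T> difference(Set<T> left, Set<T> right) {
        Set<T> result = new HashSet<>(left);
        result.removeAll(right);
        return result;
    }

    // Returns a message to log at INFO, or empty when nothing was added or removed.
    static Optional<String> changeMessage(Set<Integer> prevBrokers, Set<Integer> aliveBrokers) {
        Set<Integer> added = difference(aliveBrokers, prevBrokers);
        Set<Integer> removed = difference(prevBrokers, aliveBrokers);
        if (added.isEmpty() && removed.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of("Adding brokers " + added + ", removing brokers " + removed + ".");
    }
}
```

A caller would log the message only when the `Optional` is present, which keeps the state-change log quiet on metadata updates that do not touch the set of migrating brokers.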
[jira] [Updated] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers
[ https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14918: - Fix Version/s: 3.5.0 > KRaft controller sending ZK controller RPCs to KRaft brokers > > > Key: KAFKA-14918 > URL: https://issues.apache.org/jira/browse/KAFKA-14918 > Project: Kafka > Issue Type: Sub-task >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.5.0 > > > During the migration, when upgrading a ZK broker to KRaft, the controller is > incorrectly sending UpdateMetadata requests to the KRaft controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers
David Arthur created KAFKA-14918: Summary: KRaft controller sending ZK controller RPCs to KRaft brokers Key: KAFKA-14918 URL: https://issues.apache.org/jira/browse/KAFKA-14918 Project: Kafka Issue Type: Sub-task Reporter: David Arthur Assignee: David Arthur During the migration, when upgrading a ZK broker to KRaft, the controller is incorrectly sending UpdateMetadata requests to the KRaft controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver
dimitarndimitrov commented on code in PR #13432: URL: https://github.com/apache/kafka/pull/13432#discussion_r1170437027 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java: ## @@ -260,12 +261,29 @@ public void onFailure( .filter(future.lookupKeys()::contains) .collect(Collectors.toSet()); retryLookup(keysToUnmap); +} else if (t instanceof UnsupportedVersionException) { Review Comment: Done. Also added a check that `UnsupportedVersionException` during the lookup stage causes a failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #13589: MINOR: rename internal FK-join processor classes
mjsax merged PR #13589: URL: https://github.com/apache/kafka/pull/13589
[GitHub] [kafka] emissionnebula commented on pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures
emissionnebula commented on PR #13437: URL: https://github.com/apache/kafka/pull/13437#issuecomment-1513618207 A lot of [tests](https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-13437/runs/8/log/?start=0) related to Connect seem to be failing, but those failures are unrelated to this PR. I can see a similar set of tests failing in [other PRs](https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-13595/runs/3/log/?start=0) as well.
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170357303 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>())); +// Consumer B +List subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1))); +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2))); +// Topic 3 Partitions Assignment +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0))); +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).ad
[GitHub] [kafka] jsancio commented on a diff in pull request #13540: MINOR: improve QuorumController logging
jsancio commented on code in PR #13540:
URL: https://github.com/apache/kafka/pull/13540#discussion_r1170345743

## metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java: ##
@@ -65,6 +81,7 @@ void failAll(Exception exception) {
         while (iter.hasNext()) {
             Entry> entry = iter.next();
             for (DeferredEvent event : entry.getValue()) {
+                log.info("failAll({}): failing {}.", exception.getClass().getSimpleName(), event);

Review Comment:
I see. We don't log the stacktrace because `failAll` is only called with `NotControllerException`. Should we change the signature of this method to `void failAll(ApiException)`?

## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ##
@@ -977,16 +969,8 @@ public void handleSnapshot(SnapshotReader reader) {
                     long offset = batch.lastOffset();
                     List messages = batch.records();
-                    if (log.isDebugEnabled()) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
-                                reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
-                                    collect(Collectors.joining(", ")));
-                        } else {
-                            log.debug("Replaying snapshot ({}) batch with last offset of {}",
-                                reader.snapshotId(), offset);
-                        }
-                    }
+                    log.debug("Replaying snapshot {} batch with last offset of {}",
+                        reader.snapshotId(), offset);

Review Comment:
Do you want to use `snapshotName` here to make it consistent with the rest of the log messages?
[GitHub] [kafka] philipnee opened a new pull request, #13605: KAFKA-____: implement assign()
philipnee opened a new pull request, #13605:
URL: https://github.com/apache/kafka/pull/13605
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint
wcarlson5 commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1170320445

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java: ##
@@ -104,33 +104,40 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
-        // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
-        final ByteBuffer buf;
+        // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
+        // {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}

Review Comment:
UINT32 or VARINT?

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java: ##
@@ -141,15 +141,15 @@ private static byte[] serializeVersions3Through5(final String topic, final Chang
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
-        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
         final ByteBuffer buf;
         final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
         if (newValueIsNotNull && oldValueIsNotNull) {
-            final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;

Review Comment:
You changed `NEW_OLD_FLAG_SIZE` to `ENCODING_FLAG_SIZE` elsewhere; can we change that here too?
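For readers following the UINT32 versus VARINT question above, here is a minimal sketch of an unsigned base-128 varint, assuming a non-negative length prefix. `VarintSketch` and its methods are hypothetical names for illustration and are not Kafka's own utilities; the value 5 for `MAX_VARINT_LENGTH` is an assumption (a 32-bit value needs at most five 7-bit groups), and the PR may use a different encoding variant.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; not the serializer code from the PR.
final class VarintSketch {
    // Assumption: upper bound for a 32-bit value, used when sizing a buffer up front.
    static final int MAX_VARINT_LENGTH = 5;

    // Writes value as 7-bit groups, least significant first; the high bit marks "more bytes follow".
    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    // Reads back a value written by writeUnsignedVarint.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
        return value | (b << shift);
    }

    // Number of bytes writeUnsignedVarint would emit for value.
    static int sizeOf(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }
}
```

The trade-off this is meant to show: a fixed UINT32 prefix always costs four bytes, while a varint costs one byte for lengths up to 127 and at most five bytes otherwise, which is why a buffer sized before encoding has to reserve the maximum.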
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170322124 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>())); +// Consumer B +List subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1))); +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2))); +// Topic 3 Partitions Assignment +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0))); +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1170301813 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -207,35 +234,38 @@ class RPCProducerIdManager(brokerId: Int, }) } + // Visible for testing private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = { -requestInFlight.set(false) val data = response.data +var successfulResponse = false Errors.forCode(data.errorCode()) match { case Errors.NONE => debug(s"Got next producer ID block from controller $data") // Do some sanity checks on the response -if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) { - nextProducerIdBlock.put(Failure(new KafkaException( -s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data"))) +if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) { + error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data") } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) { - nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data"))) + error(s"Producer ID block includes invalid ID range: $data") } else { - nextProducerIdBlock.put( -Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen( + nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())) + successfulResponse = true } case Errors.STALE_BROKER_EPOCH => -warn("Our broker epoch was stale, trying again.") -maybeRequestNextBlock() +warn("Our broker currentBlockCount was stale, trying again.") case Errors.BROKER_ID_NOT_REGISTERED => warn("Our broker ID is not yet known by the controller, trying again.") -maybeRequestNextBlock() case e: Errors => -warn("Had an unknown error from the 
controller, giving up.") -nextProducerIdBlock.put(Failure(e.exception())) +error(s"Had an unknown error from the controller: ${e.exception}") Review Comment: nit: maybe we can rephrase this message a little for clarity ```scala error(s"Received an unexpected error code from the controller: $e") ``` ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val shouldBackoff = new AtomicBoolean(false) - override def generateProducerId(): Long = { -this synchronized { - if (nextProducerId == -1L) { -// Send an initial request to get the first bloc
[GitHub] [kafka] cmccabe closed pull request #13590: Scram kraft update user scram credential record
cmccabe closed pull request #13590: Scram kraft update user scram credential record
URL: https://github.com/apache/kafka/pull/13590
[GitHub] [kafka] cmccabe merged pull request #13513: KAFKA-14881: Rework UserScramCredentialRecord
cmccabe merged PR #13513:
URL: https://github.com/apache/kafka/pull/13513
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1170273242

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,6 +622,176 @@ public String toString() {
         }
     }

+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));

Review Comment:
Good catch, addressed it in the latest commit.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1168261768 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1243,6 +1327,33 @@ class ReplicaManager(val config: KafkaConfig, result } + def createLogReadResult(highWatermark: Long, Review Comment: `createLogReadResult(e: Throwable)` can not be private as it is used in `DelayedRemoteFetch`. But this method can be used. It is going to be used in test classes that we are going to add in this PR or followup PR. ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); Review Comment: afaik, `lastFetchedEpoch` is the epoch of the last fetched record. That can be different from the fetch offset’s epoch. We should find the respective epoch for the target offset and use that to find the remote log segment metadata. ## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; Review Comment: `RemoteLogReader` can not be moved to storage module as it currently depends on `RemoteLogManager`. I will move along with `RemoteLogManager` later. `RemoteLogReadResult` and `RemoteStorageThreadPool` are moved to storage module. ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: I did not understand the comment here. 
## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDef
[GitHub] [kafka] k-wall commented on pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
k-wall commented on PR #13572:
URL: https://github.com/apache/kafka/pull/13572#issuecomment-1513428217

@divijvaidya thanks for the review feedback, much appreciated.
[GitHub] [kafka] k-wall commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
k-wall commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170252886

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
Test renamed and test now checks both.
[GitHub] [kafka] k-wall commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
k-wall commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170252886

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
Comment added documenting the use of ephemeral ports.

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")

Review Comment:
Comment added documenting the use of ephemeral ports.
[GitHub] [kafka] k-wall commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
k-wall commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170251661

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+    shutdownServerAndMetrics(testServer)

Review Comment:
Now using try/finally.
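The pattern these tests exercise (enable address reuse on the listening socket, bind to port 0 so the OS picks an ephemeral port, and always release the socket) can be sketched with plain JDK classes. `ReuseAddressSketch` and its method are hypothetical names for illustration; this is not the `SocketServer` code from the PR, and the loopback bind is an assumption made so the example stays self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical illustration only; not Kafka's acceptor implementation.
final class ReuseAddressSketch {
    // Opens a server channel with SO_REUSEADDR set, binds it to an ephemeral
    // port on the loopback interface, and reports whether both steps took effect.
    static boolean reuseAddressEnabledOnEphemeralPort() {
        // try-with-resources closes the channel even if an assertion or bind fails,
        // which plays the role of the try/finally mentioned in the review.
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            // The option is set before bind so that it applies to the bound socket.
            channel.socket().setReuseAddress(true);
            // Port 0 asks the OS for any free (ephemeral) port.
            channel.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return channel.socket().getReuseAddress() && channel.socket().getLocalPort() > 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Binding to port 0 keeps such a check independent of whatever ports happen to be in use on the build machine.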
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1170250900 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int, * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ - def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { -trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) + def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = { +trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId)) val group = groupMetadataCache.get(groupId) if (group == null) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { group.inLock { if (group.is(Dead)) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { - val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet) - - topicPartitions.map { topicPartition => -if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { - topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, + def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = { +if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) { + new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { - val partitionData = group.offset(topicPartition) match { + group.offset(topicIdPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } - topicPartition -> partitionData } - }.toMap + } + + topicIdPartitionsOpt match { +case Some(topicIdPartitions) => + topicIdPartitions.map { topicIdPartition => +topicIdPartition -> resolvePartitionData(topicIdPartition) + }.toMap + +case None => + val topicIds = replicaManager.metadataCache.topicNamesToIds() + group.allOffsets.keySet.map { topicPartition => +Option(topicIds.get(topicPartition.topic())) match { + case Some(topicId) => +val topicIdPartition = new TopicIdPartition(topicId, topicPartition) +topicIdPartition -> resolvePartitionData(topicIdPartition) + case None => +val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition) +zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION +} + }.toMap Review Comment: Were we expecting to use this code path when the IBP is less than 2.8? I guess I assumed that the IBP would be higher. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13582: MINOR: Fix lossy conversions flagged by Java 20
divijvaidya commented on code in PR #13582:
URL: https://github.com/apache/kafka/pull/13582#discussion_r1170245494

## clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java: ##
@@ -434,7 +434,7 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
         if (isControl)
             attributes |= CONTROL_FLAG_MASK;
         if (type.id > 0)
-            attributes |= COMPRESSION_CODEC_MASK & type.id;
+            attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);

Review Comment:
I expected type promotion to a common ancestor for cases where overflow is expected, such as multiplication, but didn't expect it for the `&` bit operation. Nevertheless, TIL!
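The promotion discussed in this comment can be reproduced in isolation. The sketch below is a hypothetical stand-alone class, not Kafka's `DefaultRecordBatch`; the mask value `0x07` and the `int` codec id are assumptions made for illustration. It shows that `&` applied to a `byte` and an `int` yields an `int`, and that the explicit cast only spells out a narrowing that the compound assignment would otherwise perform silently.

```java
final class BytePromotionDemo {
    // Assumed mask value, chosen for illustration only.
    static final byte CODEC_MASK = 0x07;

    // Binary numeric promotion: both operands of '&' are widened to int,
    // so the expression has type int even though the mask is a byte.
    static int maskAsInt(int codecId) {
        return CODEC_MASK & codecId;
    }

    // 'attributes |= <int expression>' would also compile, because compound
    // assignment inserts an implicit narrowing cast. Writing the cast out
    // makes that narrowing visible to the reader and to lint checks.
    static byte withCodec(byte attributes, int codecId) {
        attributes |= (byte) (CODEC_MASK & codecId);
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(maskAsInt(0x1F));           // 7
        System.out.println(withCodec((byte) 0x10, 3)); // 19
    }
}
```

Because the mask keeps only the low three bits, the narrowing cast cannot lose information in this particular expression; the cast documents intent rather than changing behaviour.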
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
divijvaidya commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1170238988

## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ##
@@ -65,6 +68,28 @@ class LogSegmentTest {
     Utils.delete(logDir)
   }

+  /**
+   * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException
+   */
+  @ParameterizedTest
+  @CsvSource(Array(

Review Comment:
Note that `baseOffset` can be any `Long`, but `largestOffset - baseOffset` must be <= `Int.MaxValue`. This case is missing from our test here. Could we add the following test case as well: `baseOffset` is a number > `Int.MaxValue` and `largestOffset` is a number > `Int.MaxValue`, such that `largestOffset - baseOffset` > `Int.MaxValue`. This will throw an error.
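The constraint described in this comment can be written down directly. The following is a minimal sketch with a hypothetical helper name (`fitsRelativeOffset`), not the `LogSegment` implementation: offsets are 64-bit and assumed non-negative, but the distance from the base offset has to fit in a 32-bit signed integer.

```java
final class RelativeOffsetCheck {
    // Hypothetical helper: true when (largestOffset - baseOffset) lies in [0, Integer.MAX_VALUE].
    static boolean fitsRelativeOffset(long baseOffset, long largestOffset) {
        long relative = largestOffset - baseOffset;
        return relative >= 0 && relative <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The base offset itself is already outside the int range.
        long base = Integer.MAX_VALUE + 10L;
        System.out.println(fitsRelativeOffset(base, base + 5));                      // true
        System.out.println(fitsRelativeOffset(base, base + Integer.MAX_VALUE));      // true
        System.out.println(fitsRelativeOffset(base, base + Integer.MAX_VALUE + 1L)); // false
    }
}
```

The third call corresponds to the case the reviewer asks for: both offsets exceed the `int` range and so does their difference.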
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13572: KAFKA-14908: Set setReuseAddress on the kafka server socket
divijvaidya commented on code in PR #13572:
URL: https://github.com/apache/kafka/pull/13572#discussion_r1170213078

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")

Review Comment:
Please add a comment here: we are using port 0 so that the OS can choose any available port.

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+    shutdownServerAndMetrics(testServer)

Review Comment:
Perhaps do this in a try/finally so that the resources are cleaned up even if the test fails with an exception.

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1893,6 +1893,33 @@ class SocketServerTest {
     }, false)
   }

+  @Test
+  def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    val acceptor = server.dataPlaneAcceptor(listener)
+    val channel = acceptor.get.serverChannel
+    verifySocketUsesReuseAddress(channel)
+  }
+
+  @Test
+  def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = {
+    shutdownServerAndMetrics(server)
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROL_PLANE")
+    val config = KafkaConfig.fromProps(testProps)
+    val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
+    val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel

Review Comment:
Since we are starting up two listeners, could we verify reuse address for both, please?
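Two of the suggestions above (binding to port 0 and cleaning up in try/finally) can be illustrated with plain JDK classes. This is a minimal sketch under stated assumptions: `ReuseAddressDemo` and its method are hypothetical names, and the code does not use Kafka's `SocketServer` at all.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class ReuseAddressDemo {
    // Opens a listening channel on an OS-chosen port and reports whether SO_REUSEADDR is set.
    static boolean openAndCheckReuseAddress() {
        try {
            ServerSocketChannel channel = ServerSocketChannel.open();
            try {
                channel.socket().setReuseAddress(true);
                // Port 0 asks the OS to pick any free port, which avoids clashes between tests.
                channel.socket().bind(new InetSocketAddress("localhost", 0));
                return channel.socket().getReuseAddress();
            } finally {
                // Runs even if bind or a later check throws, so the port is always released.
                channel.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(openAndCheckReuseAddress());
    }
}
```

In a test with two listeners, the same check would simply be applied to each listener's channel inside the guarded block.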
[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1170149228

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -266,7 +266,7 @@ public synchronized TransactionalRequestResult beginAbort() {
         return handleCachedTransactionRequestResult(() -> {
             if (currentState != State.ABORTABLE_ERROR)
                 maybeFailWithError();
-            transitionTo(State.ABORTING_TRANSACTION);
+            transitionTo(State.ABORTING_TRANSACTION, null, true);

Review Comment:
There is a call chain where the Sender calls beginAbort on producer close. Do we want to throw there? I'm not sure if that one qualifies as a "user-direct" action.
[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1170141463

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -968,13 +968,31 @@ private void transitionTo(State target) {
     }

     private void transitionTo(State target, RuntimeException error) {
+        transitionTo(target, error, false);

Review Comment:
I think changing the default behavior to not throw can cause issues in some calls:
1. TransactionManager.InitProducerIdHandler#handleResponse on line 1303 - lastError is explicitly set to null (which shouldn't be done at all, as transitionTo already does that if the state transition is valid), which will clear the latest error. I think to make this work, that lastError = null should be removed from line 1303.
2. This is a call chain where we transition on direct user action, shouldn't this be throwing? KafkaProducer.send -> KafkaProducer.doSend -> maybeTransitionToErrorState -> transitionToAbortableError -> transitionTo
3. In TransactionManager.TxnOffsetCommitHandler#handleResponse, there are multiple
```
abortableError(...);
break;
```
blocks. If abortableError does not throw on invalid state transition anymore, the txn commit will be retried, even when in a failed state, which doesn't seem correct.
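The concern in point 3 is easier to see with a toy state machine. The sketch below is hypothetical throughout: the states, the transition table, and the `throwOnInvalid` flag are simplified stand-ins, assuming the third `transitionTo` argument in the diff decides whether an invalid transition is thrown to the caller. It is not the `TransactionManager` logic.

```java
final class TxnStateSketch {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION, FATAL_ERROR }

    private State current = State.READY;
    private RuntimeException lastError;

    // Hypothetical transition table, far smaller than a real one.
    private static boolean isValid(State from, State to) {
        switch (to) {
            case IN_TRANSACTION:
                return from == State.READY;
            case ABORTING_TRANSACTION:
                return from == State.IN_TRANSACTION;
            case READY:
                return from == State.ABORTING_TRANSACTION;
            default:
                return true; // FATAL_ERROR is reachable from any state
        }
    }

    // An invalid transition always poisons the state machine; it is only
    // surfaced as an exception when the caller asks for that.
    void transitionTo(State target, boolean throwOnInvalid) {
        if (!isValid(current, target)) {
            lastError = new IllegalStateException("Invalid transition " + current + " -> " + target);
            current = State.FATAL_ERROR;
            if (throwOnInvalid)
                throw lastError;
            return;
        }
        current = target;
    }

    State state() { return current; }

    RuntimeException lastError() { return lastError; }
}
```

With `throwOnInvalid` set to false the caller keeps running after the failed transition, so any code that follows (a `break` leading to a retry, for example) has to check `state()` itself. That is the gap the review comment points at.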
[GitHub] [kafka] clolov commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes
clolov commented on code in PR #13514: URL: https://github.com/apache/kafka/pull/13514#discussion_r1170144013 ## examples/src/main/java/kafka/examples/Utils.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.examples; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +public class Utils { +private Utils() { +} + +public static void printHelp(String message, Object... args) { +System.out.println(format(message, args)); +} + +public static void printOut(String message, Object... 
args) { +System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args)); +} + +public static void printErr(String message, Object... args) { +System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args)); +} + +public static void maybePrintRecord(long numRecords, ConsumerRecord record) { +maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset()); +} + +public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) { +maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset()); +} + +private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) { +// we only print 10 records when there are 20 or more to send Review Comment: Ah, yes, my bad, I had to put pen to paper to figure out that this does indeed only start printing 10 once we go higher than 20. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
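The body of `maybePrintRecord` is not part of the quoted hunk, so the sampling rule below is an assumption made to match the code comment ("we only print 10 records when there are 20 or more to send"). The class and method names are hypothetical.

```java
final class SamplePrintSketch {
    // Assumed rule: print every record for small runs, otherwise every (numRecords / 10)-th key.
    // The short-circuit on 'numRecords < 20' also guarantees the divisor is at least 2.
    static boolean shouldPrint(long numRecords, int key) {
        return numRecords < 20 || key % (numRecords / 10) == 0;
    }

    // Counts how many of the keys 0..numRecords-1 would be printed.
    static int countPrinted(int numRecords) {
        int printed = 0;
        for (int key = 0; key < numRecords; key++) {
            if (shouldPrint(numRecords, key))
                printed++;
        }
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(countPrinted(5));   // 5
        System.out.println(countPrinted(100)); // 10
    }
}
```

Under this rule the count is exactly 10 only when `numRecords` is a multiple of 10; for other sizes the integer division makes it approximate.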
[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1513271209

Okay, this makes sense to me. I will aim to start opening PRs in the same manner as yours in the upcoming days and let's see where we go!
[GitHub] [kafka] jeffkbkim opened a new pull request, #13604: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim opened a new pull request, #13604:
URL: https://github.com/apache/kafka/pull/13604

…-915, Part-2) (#13526)

This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.

Reviewers: David Jacot
[GitHub] [kafka] jeffkbkim opened a new pull request, #13603: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim opened a new pull request, #13603:
URL: https://github.com/apache/kafka/pull/13603

…-915, Part-2) (#13526)

This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.

Reviewers: David Jacot
[GitHub] [kafka] clolov commented on a diff in pull request #13529: KAFKA-14133: Migrate topology builder mock in TaskManagerTest to mockito
clolov commented on code in PR #13529:
URL: https://github.com/apache/kafka/pull/13529#discussion_r1170120571

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ##
@@ -2895,10 +2871,8 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
         expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
         expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p0)), anyString());

Review Comment:
Got it, okay, this makes a lot of sense.
[GitHub] [kafka] jeffkbkim opened a new pull request, #13602: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim opened a new pull request, #13602:
URL: https://github.com/apache/kafka/pull/13602

…-915, Part-2) (#13526)

This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.

Reviewers: David Jacot
[GitHub] [kafka] jeffkbkim opened a new pull request, #13601: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim opened a new pull request, #13601:
URL: https://github.com/apache/kafka/pull/13601

…-915, Part-2) (#13526)

This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.

Reviewers: David Jacot
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170098942

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. "
                                  + "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
Adding to my previous comment: I think you will need to set `needsOnJoinPrepare` to true to go through the revocation, as pointed out here: https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L821
[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170094810

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. "
                                  + "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
Hmm, I think the partitions are only lost during onJoinPrepare. What I'm thinking of is this: https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L507-L511

The gist is: on one of the 4 exceptions thrown in join/sync group, it should immediately re-send the join request.

Are you thinking about how the client handles the illegal generation error? I think it is only thrown during sync group and heartbeat, so just resetting the generation shouldn't immediately cause revocation.
[GitHub] [kafka] jeffkbkim opened a new pull request, #13600: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…
jeffkbkim opened a new pull request, #13600:
URL: https://github.com/apache/kafka/pull/13600

…-915, Part-2) (#13526)

This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.

Reviewers: David Jacot
[GitHub] [kafka] mimaison commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6
mimaison commented on PR #11478:
URL: https://github.com/apache/kafka/pull/11478#issuecomment-1513201845

Hi @jlprat, the point of tests is also to ensure future changes don't break this feature. You're more familiar with this feature than me; if you think the unit tests are enough, you can merge the PR.
[jira] [Updated] (KAFKA-14869) txn and group coordinator downgrade foundation
[ https://issues.apache.org/jira/browse/KAFKA-14869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mickael Maison updated KAFKA-14869:
---
Fix Version/s: 3.5.0

> txn and group coordinator downgrade foundation
> --
>
> Key: KAFKA-14869
> URL: https://issues.apache.org/jira/browse/KAFKA-14869
> Project: Kafka
> Issue Type: Task
> Reporter: Jeff Kim
> Assignee: Jeff Kim
> Priority: Major
> Fix For: 3.5.0
>
> Implement proposed changes in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation]
>
> - ignore unknown record types
> - bump Value type records in __consumer_offsets and __transaction_state topics to a flexible version
> - serialize with highest non-flexible version (gated by IBP)

--
This message was sent by Atlassian Jira (v8.20.10#820010)
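The first bullet ("ignore unknown record types") can be sketched as a loader that skips keys whose version it does not recognise instead of failing. Everything below is a simplified illustration: the class, the enum, and the mapping from version numbers to types are assumptions, not the coordinator's actual parsing code.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

final class RecordKeySketch {
    enum KeyType { OFFSET_COMMIT, GROUP_METADATA }

    // Reads a leading 2-byte version and maps it to a known type, or empty if unknown.
    static Optional<KeyType> classify(ByteBuffer key) {
        short version = key.getShort();
        switch (version) {
            case 0:
            case 1:
                return Optional.of(KeyType.OFFSET_COMMIT);   // illustrative mapping
            case 2:
                return Optional.of(KeyType.GROUP_METADATA);  // illustrative mapping
            default:
                return Optional.empty(); // possibly written by a newer broker: skip, do not fail
        }
    }

    // Counts how many of the given key versions a loader would accept.
    static int countLoaded(short... versions) {
        int loaded = 0;
        for (short v : versions) {
            ByteBuffer buf = ByteBuffer.allocate(2);
            buf.putShort(v);
            buf.flip();
            if (classify(buf).isPresent())
                loaded++;
        }
        return loaded;
    }

    public static void main(String[] args) {
        // Version 7 stands in for a record type introduced by a future release.
        System.out.println(countLoaded((short) 0, (short) 2, (short) 7)); // 2
    }
}
```

Skipping rather than throwing is what lets an older broker load a partition that a newer broker has already written to, which is the downgrade scenario the ticket targets.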
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1170068205 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val blockCount = new AtomicLong(0) Review Comment: updated to use requestInFlight to fence -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
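Using an in-flight flag as a fence, as the comment above describes, usually comes down to a compare-and-set. The following is a minimal sketch with hypothetical names; the counter stands in for actually sending a request, and nothing here is taken from `RPCProducerIdManager` beyond the idea of a `requestInFlight` flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class InFlightFenceSketch {
    private final AtomicBoolean requestInFlight = new AtomicBoolean(false);
    private final AtomicInteger requestsSent = new AtomicInteger(0);

    // Returns true only for the caller that wins the flag; everyone else returns
    // immediately instead of blocking or sending a duplicate request.
    boolean maybeRequestNextBlock() {
        if (requestInFlight.compareAndSet(false, true)) {
            requestsSent.incrementAndGet(); // stand-in for sending the RPC
            return true;
        }
        return false;
    }

    // Called from the (hypothetical) response handler to allow the next request.
    void onResponse() {
        requestInFlight.set(false);
    }

    int requestsSent() {
        return requestsSent.get();
    }
}
```

A typical sequence: the first `maybeRequestNextBlock()` returns true, a second call before `onResponse()` returns false, and a call after `onResponse()` returns true again.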
[GitHub] [kafka] dajac commented on pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)
dajac commented on PR #13526:
URL: https://github.com/apache/kafka/pull/13526#issuecomment-1513180068

Merged to trunk and 3.5. We need to open PRs for the other branches.
[GitHub] [kafka] dajac merged pull request #13526: KAFKA-14869: Bump coordinator Value records to flexible versions (KIP-915, Part-2)
dajac merged PR #13526:
URL: https://github.com/apache/kafka/pull/13526
[GitHub] [kafka] ijuma commented on a diff in pull request #13582: MINOR: Fix lossy conversions flagged by Java 20
ijuma commented on code in PR #13582:
URL: https://github.com/apache/kafka/pull/13582#discussion_r1170050264

## clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java: ##
@@ -434,7 +434,7 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
         if (isControl)
             attributes |= CONTROL_FLAG_MASK;
         if (type.id > 0)
-            attributes |= COMPRESSION_CODEC_MASK & type.id;
+            attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);

Review Comment:
Unfortunately, the answer is yes: https://docs.oracle.com/javase/specs/jls/se8/html/jls-5.html#jls-5.6.2
[jira] [Resolved] (KAFKA-14735) Improve KRaft metadata image change performance at high topic counts
[ https://issues.apache.org/jira/browse/KAFKA-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino resolved KAFKA-14735. --- Resolution: Fixed > Improve KRaft metadata image change performance at high topic counts > > > Key: KAFKA-14735 > URL: https://issues.apache.org/jira/browse/KAFKA-14735 > Project: Kafka > Issue Type: Improvement > Components: kraft > Reporter: Ron Dagostino > Assignee: Ron Dagostino > Priority: Major > Fix For: 3.6.0 > > > Performance of KRaft metadata image changes is currently O(<# of topics in cluster>). This means the amount of time it takes to create just a *single* topic scales linearly with the number of topics in the entire cluster. This impacts both controllers and brokers because both use the metadata image to represent the KRaft metadata log. The performance of these changes should scale with the number of topics being changed -- so creating a single topic should perform similarly regardless of the number of topics in the cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver
dimitarndimitrov commented on code in PR #13432: URL: https://github.com/apache/kafka/pull/13432#discussion_r1170024821 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.ListOffsetsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.OffsetSpec.TimestampSpec; +import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public final class ListOffsetsHandler extends Batched { + +private final Map offsetSpecsByPartition; +private final ListOffsetsOptions options; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + 
+public ListOffsetsHandler( +Map offsetSpecsByPartition, +ListOffsetsOptions options, +LogContext logContext +) { +this.offsetSpecsByPartition = offsetSpecsByPartition; +this.options = options; +this.log = logContext.logger(ListOffsetsHandler.class); +this.lookupStrategy = new PartitionLeaderStrategy(logContext); +} + +@Override +public String apiName() { +return "listOffsets"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return this.lookupStrategy; +} + +@Override +ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { +Map topicsByName = CollectionUtils.groupPartitionsByTopic( +keys, +topicName -> new ListOffsetsTopic().setName(topicName), +(listOffsetsTopic, partitionId) -> { +TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId); +OffsetSpec offsetSpec = offsetSpecsByPartition.get(topicPartition); +long offsetQuery = getOffsetFromSpec(offsetSpec); +listOffsetsTopic.partitions().add( +new ListOffsetsPartition() +.setPartitionIndex(partitionId) +.setTimestamp(offsetQuery)); +}); +boolean supportsMaxTimestamp = keys +.stream() +.anyMatch(key -> getOffsetFromSpec(offsetSpecsByPartition.get(key)) == ListOffsetsRequest.MAX_TIMESTAMP); + +return ListOffsetsRequest.Builder +.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp) +.setTargetTimes(new ArrayList<>(topicsByName.values())); +} + +@Override +public ApiResult handleResponse( +Node broker, +Set keys, +AbstractResponse abstractResponse +) { +
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170021475 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>())); +// Consumer B +List subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1))); +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2))); +// Topic 3 Partitions Assignment +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0))); +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).ad
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170017991 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>())); +// Consumer B +List subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); +members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1))); +expectedAssignment.computeIfAbsent(topic1Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(2))); +// Topic 3 Partitions Assignment +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).add(new HashSet<>(Collections.singletonList(0))); +expectedAssignment.computeIfAbsent(topic3Uuid, k -> new HashSet<>()).ad
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170014726 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +subscribedTopics.add(topic2Uuid); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = 
assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 Partitions +// Topics +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); +// Members +Map members = new HashMap<>(); +// Consumer A +List subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid)); Review Comment: nit: Is `new ArrayList<>` necessary here? There are many other cases.
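As background for the nit above, a small sketch of what the extra `new ArrayList<>(...)` wrapper changes. `Arrays.asList` already returns a `List`, but a fixed-size one; the copy is only needed when the list must grow later. The class name and the `String` elements (standing in for `Uuid`) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; String stands in for Uuid.
class ListWrappingSketch {
    // Returns true if the list accepts add(), false if it is fixed-size or immutable.
    static boolean canGrow(List<String> list) {
        try {
            list.add("extra");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> fixed = Arrays.asList("topic1", "topic3");
        List<String> copy = new ArrayList<>(Arrays.asList("topic1", "topic3"));
        // Both lists hold the same elements in the same order.
        System.out.println(fixed.equals(copy));
        // Only the ArrayList copy can be extended afterwards.
        System.out.println(canGrow(fixed));
        System.out.println(canGrow(copy));
    }
}
```

If the test never adds to or removes from the subscription list, the plain `Arrays.asList(...)` form is presumably enough.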
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170013892 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = new HashMap<>(); +List subscribedTopics = new ArrayList<>(); +members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>())); Review Comment: nit: Let's format such lines as follows:
```
new AssignmentMemberSpec(
    Optional.empty(),
    Optional.empty(),
    Collections.emptyList(),
    Collections.emptyMap()
);
```
[GitHub] [kafka] dimitarndimitrov commented on a diff in pull request #13432: KAFKA-14821 Implement the listOffsets API with AdminApiDriver
dimitarndimitrov commented on code in PR #13432: URL: https://github.com/apache/kafka/pull/13432#discussion_r1170012160 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.ListOffsetsOptions; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.CollectionUtils; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +public final class ListOffsetsHandler extends Batched { + +private final Map offsetTimestampsByPartition; +private final ListOffsetsOptions options; +private final Logger log; +private final AdminApiLookupStrategy lookupStrategy; + +public ListOffsetsHandler( +Map offsetTimestampsByPartition, +ListOffsetsOptions options, +LogContext logContext +) 
{ +this.offsetTimestampsByPartition = offsetTimestampsByPartition; +this.options = options; +this.log = logContext.logger(ListOffsetsHandler.class); +this.lookupStrategy = new PartitionLeaderStrategy(logContext); +} + +@Override +public String apiName() { +return "listOffsets"; +} + +@Override +public AdminApiLookupStrategy lookupStrategy() { +return this.lookupStrategy; +} + +@Override +ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set keys) { +Map topicsByName = CollectionUtils.groupPartitionsByTopic( +keys, +topicName -> new ListOffsetsTopic().setName(topicName), +(listOffsetsTopic, partitionId) -> { +TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId); +long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition); +listOffsetsTopic.partitions().add( +new ListOffsetsPartition() +.setPartitionIndex(partitionId) +.setTimestamp(offsetTimestamp)); +}); +boolean supportsMaxTimestamp = keys +.stream() +.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP); + +return ListOffsetsRequest.Builder +.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp) +.setTargetTimes(new ArrayList<>(topicsByName.values())); +} + +@Override +public ApiResult handleResponse( +Node broker, +Set keys, +AbstractResponse abstractResponse +) { +ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse; +Map completed = new HashMap<>(); +Map failed = new HashMap<>(); +List
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170010343 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); Review Comment: nit: Whenever possible, let's use `Collections.singletonMap`, `Collections.emptyMap`, `Collections.emptyList`, etc.
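A brief sketch of the substitution suggested in the comment above, using only `java.util.Collections` factory methods. The class name is hypothetical, and `String`/`Integer` stand in for `Uuid` and `AssignmentTopicMetadata` so the example is self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; String/Integer stand in for Uuid/AssignmentTopicMetadata.
class CollectionsFactorySketch {
    // Verbose form, mirroring the style of the test under review.
    static Map<String, Integer> verboseTopics() {
        Map<String, Integer> topics = new HashMap<>();
        topics.put("topic1", 3);
        return topics;
    }

    // Compact form: a single-entry, immutable map.
    static Map<String, Integer> compactTopics() {
        return Collections.singletonMap("topic1", 3);
    }

    // An immutable empty list, e.g. for a member with no subscriptions.
    static List<String> noSubscriptions() {
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Map.equals compares entries, so the two forms are interchangeable as test inputs.
        System.out.println(verboseTopics().equals(compactTopics()));
        System.out.println(noSubscriptions().isEmpty());
    }
}
```

One trade-off to keep in mind: these factory results are immutable, so they fit test fixtures that are only read, not ones the code under test is expected to modify.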
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1170007516

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
## @@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are as follows:
+ *
+ * Each member must get at least one partition for every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ * The keys must have the same schemas.
+ * The topics involved must have the same number of partitions.
+ *
+ * Members should retain as much as their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ *
+ * The algorithm includes the following steps:
+ *
+ * Generate a map of membersPerTopic using the given member subscriptions.
+ * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new assignment.
+ * Add members from the potentiallyUnfilled list to the Unfilled list if they haven't met the total required quota i.e. minimum number of partitions per member + 1 (if member is designated to receive one of the excess partitions)
+ * Generate a list of unassigned partitions by calculating the difference between total partitions and already assigned (sticky) partitions
+ * Iterate through unfilled members and assign partitions from the unassigned partitions
+ *
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    static class RemainingAssignmentsForMember {
+        private final String memberId;
+        private final Integer remaining;
+
+        public RemainingAssignmentsForMember(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Integer remaining() {
+            return remaining;
+        }
+
+    }
+
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId: topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
+                } else {
+                    log.info(memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+    private Map> get
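The steps listed in the quoted Javadoc can be illustrated for a single topic with the rough sketch below. It is not the code from PR #13443: the class `RangeQuotaSketch` and its methods are hypothetical, members are assumed to be a non-empty, already ordered list, and the excess partitions simply go to the first members in that order. Unlike a true range assignment, the result is not guaranteed to be a contiguous range per member.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified illustration of "sticky + quota" assignment for ONE topic.
public class RangeQuotaSketch {

    // Every member gets numPartitions / numMembers partitions; the first
    // numPartitions % numMembers members (in list order) get one extra.
    static Map<String, Integer> quotas(List<String> members, int numPartitions) {
        int minQuota = numPartitions / members.size();
        int extra = numPartitions % members.size();
        Map<String, Integer> result = new LinkedHashMap<>();
        for (int i = 0; i < members.size(); i++) {
            result.put(members.get(i), minQuota + (i < extra ? 1 : 0));
        }
        return result;
    }

    // Assigns partitions 0..numPartitions-1. Previously owned partitions are
    // retained up to the member's quota (sticky step); the remaining
    // partitions are handed to members that are still below quota.
    static Map<String, List<Integer>> assign(List<String> members,
                                             int numPartitions,
                                             Map<String, List<Integer>> previous) {
        Map<String, Integer> quota = quotas(members, numPartitions);
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        boolean[] taken = new boolean[numPartitions];

        // Sticky step: keep valid, not-yet-claimed partitions up to the quota.
        for (String member : members) {
            List<Integer> kept = new ArrayList<>();
            for (int p : previous.getOrDefault(member, List.of())) {
                if (p >= 0 && p < numPartitions && !taken[p] && kept.size() < quota.get(member)) {
                    kept.add(p);
                    taken[p] = true;
                }
            }
            assignment.put(member, kept);
        }

        // Fill step: give unassigned partitions, in ascending order, to unfilled members.
        int next = 0;
        for (String member : members) {
            List<Integer> owned = assignment.get(member);
            while (owned.size() < quota.get(member)) {
                while (taken[next]) {
                    next++;
                }
                owned.add(next);
                taken[next] = true;
            }
        }
        return assignment;
    }
}
```

Because the quotas always sum to the number of partitions, the fill step finds a free partition for every member that is still below quota.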
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1170005574 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,240 @@
[GitHub] [kafka] jeffkbkim opened a new pull request, #13599: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…
jeffkbkim opened a new pull request, #13599: URL: https://github.com/apache/kafka/pull/13599 …art-1) (#13511) This patch implemented the first part of KIP-915. It updates the group coordinator and the transaction coordinator to ignore unknown record types while loading their respective state from the partitions. This allows downgrades from future versions that will include new record types. Reviewers: Alexandre Dupriez, David Jacot
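The behaviour described in this patch, skipping records of an unknown type while replaying state, can be sketched roughly as follows. This is an illustration only, not Kafka's coordinator code: `CoordinatorLoaderSketch`, `UnknownTypeException`, the record type numbers, and the string-based state are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a loader that tolerates record types it does not know.
public class CoordinatorLoaderSketch {

    // Hypothetical exception raised by the parser for an unrecognized record type.
    static class UnknownTypeException extends RuntimeException {
        final short type;

        UnknownTypeException(short type) {
            super("Unknown record type " + type);
            this.type = type;
        }
    }

    // Hypothetical parser: only types 0 and 1 are known to this "version".
    static String parse(short type, String payload) {
        switch (type) {
            case 0:
                return "offset-commit:" + payload;
            case 1:
                return "group-metadata:" + payload;
            default:
                throw new UnknownTypeException(type);
        }
    }

    // Replays all records; unknown types are logged and skipped instead of
    // failing the whole load, so data written by a newer version is tolerated.
    static List<String> load(short[] types, String[] payloads) {
        List<String> state = new ArrayList<>();
        for (int i = 0; i < types.length; i++) {
            try {
                state.add(parse(types[i], payloads[i]));
            } catch (UnknownTypeException e) {
                System.out.println("Ignoring record with unknown type " + e.type);
            }
        }
        return state;
    }
}
```

Catching only the dedicated exception keeps genuine parsing failures visible while letting an older broker read a log that contains newer record types.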
[GitHub] [kafka] jeffkbkim opened a new pull request, #13598: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, P…
jeffkbkim opened a new pull request, #13598: URL: https://github.com/apache/kafka/pull/13598 …art-1) (#13511) This patch implemented the first part of KIP-915. It updates the group coordinator and the transaction coordinator to ignore unknown record types while loading their respective state from the partitions. This allows downgrades from future versions that will include new record types. Reviewers: Alexandre Dupriez, David Jacot