[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259142145

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -720,19 +764,121 @@ private CoordinatorResult consumerGr
         );

         if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-            log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +
+            log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " +
                 subscriptionMetadata + ".");
-            records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+            records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
         }

         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newGroupEpochRecord(groupId, groupEpoch));
+        records.add(newGroupEpochRecord(group.groupId(), groupEpoch));

-        return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
-            .setMemberId(memberId)
-            .setMemberEpoch(-1)
-        );
+        // Cancel all the timers of the member.
+        cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
+        cancelConsumerGroupRevocationTimeout(group.groupId(), member.memberId());
+
+        return records;
+    }
+
+    /**
+     * Schedules (or reschedules) the session timeout for the member.
+     *
+     * @param groupId  The group id.
+     * @param memberId The member id.
+     */
+    private void scheduleConsumerGroupSessionTimeout(
+        String groupId,
+        String memberId
+    ) {
+        String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+        timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+            try {
+                ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+                ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+                log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " +
+                    "its session expired.");
+
+                return consumerGroupFenceMember(group, member);
+            } catch (GroupIdNotFoundException ex) {
+                log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " +
+                    "does not exist.");
+            } catch (UnknownMemberIdException ex) {
+                log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " +
+                    "does not exist.");
+            }
+
+            return Collections.emptyList();
+        });
+    }
+
+    /**
+     * Cancels the session timeout of the member.
+     *
+     * @param groupId  The group id.
+     * @param memberId The member id.
+     */
+    private void cancelConsumerGroupSessionTimeout(
+        String groupId,
+        String memberId
+    ) {
+        timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+    }
+
+    /**
+     * Schedules a revocation timeout for the member.
+     *
+     * @param groupId             The group id.
+     * @param memberId            The member id.
+     * @param revocationTimeoutMs The revocation timeout.
+     * @param expectedMemberEpoch The expected member epoch.
+     */
+    private void scheduleConsumerGroupRevocationTimeout(
+        String groupId,
+        String memberId,
+        long revocationTimeoutMs,
+        int expectedMemberEpoch
+    ) {
+        String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+        timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+            try {
+                ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+                ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+                if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
+                    member.memberEpoch() != expectedMemberEpoch) {
+                    log.debug("[GroupId " + groupId + "] Ignoring revocation timeout for " + memberId + " because the member " +
+                        "state does not match the expected state.");
+                    return Collections.emptyList();
+                }
+
+                log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " +
+                    "it failed to revoke partitions within {}ms.", revocationTimeoutMs);
+
+                return consumerGroupFenceMember(group, member);
+            } catch (GroupIdNotFoundException ex) {
+                log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " +
+                    "does not exist.");
+            }
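Stepping back from the diff, the pattern under review is: the session timer is (re)scheduled under a per-member key on every heartbeat, so a newer schedule supersedes the older one, and both the session and revocation timers are cancelled when the member leaves. Below is a self-contained sketch of that schedule/replace/cancel pattern using a plain ScheduledExecutorService; the key format, the 45s timeout, and the fencing action are illustrative assumptions, not the GroupMetadataManager implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the schedule/cancel pattern used for session timeouts.
public class SessionTimeoutSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
    private final long sessionTimeoutMs = 45_000L; // assumed default, for illustration only

    private static String key(String groupId, String memberId) {
        return "session-timeout-" + groupId + "-" + memberId; // hypothetical key format
    }

    // Called on every heartbeat: scheduling under the same key replaces the previous task,
    // which effectively pushes the expiration out by another session timeout.
    public void onHeartbeat(String groupId, String memberId) {
        String k = key(groupId, memberId);
        ScheduledFuture<?> previous = tasks.put(k, timer.schedule(
            () -> System.out.println("Fencing " + memberId + " from " + groupId + ": session expired"),
            sessionTimeoutMs, TimeUnit.MILLISECONDS));
        if (previous != null) previous.cancel(false);
    }

    // Called when the member leaves (memberEpoch == -1): its pending timer is cancelled.
    public void onLeave(String groupId, String memberId) {
        ScheduledFuture<?> task = tasks.remove(key(groupId, memberId));
        if (task != null) task.cancel(false);
    }
}
```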
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
rreddy-22 commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1259195095

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:

@@ -194,6 +193,7 @@ private boolean allSubscriptionsEqual(Set allTopics,
                     otherConsumer, otherMemberGeneration, tp, otherMemberGeneration);
+                allPreviousPartitionsToOwner.put(tp, otherConsumer);

Review Comment:
So in line 161 we directly added the current consumer to the map allPreviousPartitionsToOwner. If we choose to keep that line, we should remove the put in the first two conditions at lines 175 and 180 and keep the change you made. However, I feel this would be redundant, since we would be replacing the otherMember and adding it back whenever the conditions aren't favorable. My suggestion is to replace line 161 with a get instead of a put and keep the rest of the code as it is.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
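A rough sketch of the suggested flow, purely for illustration: the map name follows the diff above, but the surrounding class, fields, and generation-comparison logic are assumptions, not the actual AbstractStickyAssignor code. The idea is to look the previous owner up with get first and only write to the map once the comparison has decided which claim wins.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of resolving conflicting ownership claims by generation.
public class OwnershipResolutionSketch {
    private final Map<TopicPartition, String> allPreviousPartitionsToOwner = new HashMap<>();
    private final Map<String, Integer> memberGenerations = new HashMap<>();

    void claim(TopicPartition tp, String consumer, int generation) {
        memberGenerations.put(consumer, generation);
        String otherConsumer = allPreviousPartitionsToOwner.get(tp); // look up first, don't overwrite yet
        if (otherConsumer == null) {
            // No previous claim: this consumer becomes the provisional owner.
            allPreviousPartitionsToOwner.put(tp, consumer);
        } else if (memberGenerations.get(otherConsumer) < generation) {
            // The new claim comes from a higher generation, so it wins and replaces the old owner.
            allPreviousPartitionsToOwner.put(tp, consumer);
        }
        // Otherwise the existing claim stands and no put is needed.
    }
}
```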
[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259168414

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:

@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }

+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment:
> Well, the rationale is the same: EOS waits indefinitely for the commits to happen

Prior to this change, we just waited for the producer transaction commit to complete, since we didn't care about the writes to the secondary store (and the transaction would contain all the data records as well as the offset records written to the connector's primary store). However, that would be bounded by the producer's `transaction.timeout.ms`, right?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
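For context, `transaction.timeout.ms` is the standard Kafka producer setting referenced above: it caps how long a transaction may stay open before the broker aborts it, so even an "unbounded" transactional offset commit is ultimately bounded by it. A minimal, generic transactional-producer sketch follows; the broker address, topic, keys, and ids are placeholders, not Connect internals.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "example-offsets-producer");  // placeholder id
        props.put("transaction.timeout.ms", "60000");                // bound on how long a transaction may stay open

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-offsets", "source-partition", "source-offset"));
            // commitTransaction() flushes and waits for acks; the open transaction cannot
            // outlive transaction.timeout.ms before the broker aborts it.
            producer.commitTransaction();
        }
    }
}
```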
[GitHub] [kafka] eziosudo commented on pull request #13982: KAFKA-15159: upgrade minor dependencies
eziosudo commented on PR #13982:
URL: https://github.com/apache/kafka/pull/13982#issuecomment-1630095390

> scalaCollectionCompat has a new version `2.11`, did we consider that?

Yes, I just found the release notes here; the changes look harmless: https://github.com/scala/scala-collection-compat/releases/tag/v2.11.0

By the way, scalaLogging also has a new version, '3.9.5', but its release notes haven't been updated. Do you think we can upgrade it as well?
https://github.com/lightbend-labs/scala-logging/releases
https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging
https://github.com/apache/kafka/assets/54128896/c6623dd2-ac33-477c-b203-522295f05213

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259161121

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:

@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }

+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this);

Review Comment:
Thanks, that makes sense

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259160336

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:

@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }

+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
Aren't we still making explicit null checks here - https://github.com/apache/kafka/pull/13801/files#diff-0014cfa4c32bfde215cb10bee987e997d5182815fcf1a1245539375519f83f3dR325-R326? I'm fine with retaining it though, this isn't a blocking comment.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15139) Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`
[ https://issues.apache.org/jira/browse/KAFKA-15139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

hudeqi resolved KAFKA-15139.
    Resolution: Fixed

> Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`
>
> Key: KAFKA-15139
> URL: https://issues.apache.org/jira/browse/KAFKA-15139
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Affects Versions: 3.5.0
> Reporter: hudeqi
> Assignee: hudeqi
> Priority: Major
>
> This is the hint of the `removeAll` method in `Set`:
> _This implementation determines which is the smaller of this set and the specified collection, by invoking the size method on each. If this set has fewer elements, then the implementation iterates over this set, checking each element returned by the iterator in turn to see if it is contained in the specified collection. If it is so contained, it is removed from this set with the iterator's remove method. If the specified collection has fewer elements, then the implementation iterates over the specified collection, removing from this set each element returned by the iterator, using this set's remove method._
> That said, assume _M_ is the number of elements in the set and _N_ is the number of elements in the List. If the type of the specified collection is `List` and _M<=N_, then the time complexity of `removeAll` is _O(MN)_ (because the time complexity of searching a List is _O(N)_); on the contrary, if _N<M_, then the time complexity of `removeAll` is _O(N)_.
> In `MirrorCheckpointConnector`, the `refreshConsumerGroups` method is called repeatedly in a daemon thread, and it contains two `removeAll` calls. Logically, in any given round, when the number of groups in the source cluster simply increases or decreases, both `removeAll` calls will always hit the _O(MN)_ situation mentioned above. Therefore, it is better to change all the variables here to the Set type to avoid this low performance.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
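To make the ticket's point concrete, here is a small self-contained benchmark sketch (names and sizes are arbitrary): when the argument to `removeAll` is a `List`, each membership check scans the list, giving the _O(MN)_ behavior described above, whereas passing a `HashSet`, or keeping the group collections as sets throughout, makes each check _O(1)_.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAllSketch {
    public static void main(String[] args) {
        Set<String> knownGroups = new HashSet<>();
        List<String> refreshedGroups = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            knownGroups.add("group-" + i);
            refreshedGroups.add("group-" + (i + 10));
        }

        // Slow path: the set is not larger than the argument, so removeAll iterates the set
        // and calls List.contains for every element -> O(M * N).
        Set<String> slow = new HashSet<>(knownGroups);
        long t0 = System.nanoTime();
        slow.removeAll(refreshedGroups);
        long slowMs = (System.nanoTime() - t0) / 1_000_000;

        // Fast path: use a Set argument so every contains check is O(1) -> O(M).
        Set<String> fast = new HashSet<>(knownGroups);
        long t1 = System.nanoTime();
        fast.removeAll(new HashSet<>(refreshedGroups));
        long fastMs = (System.nanoTime() - t1) / 1_000_000;

        System.out.println("List argument: " + slowMs + " ms, Set argument: " + fastMs + " ms");
    }
}
```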
[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1630082775

I have added the unit test for the related "createAcl failure" case, thanks for the review! @C0urante

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
CalvinConfluent commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259133929 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1259130578 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. 
+ * https://github.com/ben-manes/caffeine/wiki/Efficiency;>Window TinyLfu. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + * This class is thread safe. + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; +public static final String DIR_NAME = "remote-log-index-cache"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1259130451 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. 
+ * https://github.com/ben-manes/caffeine/wiki/Efficiency;>Window TinyLfu. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + * This class is thread safe. + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; +public static final String DIR_NAME = "remote-log-index-cache"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps
[GitHub] [kafka] satishd commented on pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on PR #13275:
URL: https://github.com/apache/kafka/pull/13275#issuecomment-1630042350

> Just a few comments related to TxnIndex potentially being optional.

@jeqo There is already [KAFKA-14993](https://issues.apache.org/jira/browse/KAFKA-14993) to address that; @kamalcph was working on it. I did not want to add those changes to this PR as it is tracked separately.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
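For readers who have not used the library discussed in this PR, a minimal standalone Caffeine cache with a bounded size and a removal listener looks roughly like the sketch below. The key/value types, the size bound, and the cleanup queue are placeholders chosen to mirror the description in the quoted javadoc, not the actual RemoteIndexCache code.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.util.concurrent.LinkedBlockingQueue;

public class CaffeineEvictionSketch {
    public static void main(String[] args) {
        // Evicted values are queued here so a background thread could clean them up later,
        // similar in spirit to the "expiredIndexes" queue described in the javadoc above.
        LinkedBlockingQueue<String> pendingCleanup = new LinkedBlockingQueue<>();

        Cache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(1_024) // bounded size; eviction uses Caffeine's Window TinyLfu policy
                .removalListener((String key, String value, RemovalCause cause) -> {
                    if (cause.wasEvicted() && value != null) {
                        pendingCleanup.offer(value); // defer expensive cleanup off the caller thread
                    }
                })
                .build();

        cache.put("segment-1", "offset-index-file"); // placeholder key/value
        System.out.println(cache.getIfPresent("segment-1"));
    }
}
```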
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1259126161 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. 
+ * https://github.com/ben-manes/caffeine/wiki/Efficiency;>Window TinyLfu. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + * This class is thread safe. + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; +public static final String DIR_NAME = "remote-log-index-cache"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1259125780 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. 
+ * https://github.com/ben-manes/caffeine/wiki/Efficiency;>Window TinyLfu. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + * This class is thread safe. + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; +public static final String DIR_NAME = "remote-log-index-cache"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259115771 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -367,6 +446,155 @@ public CoordinatorResult consumerGro return result; } +public CompletableFuture sendGenericGroupJoin( +JoinGroupRequestData request +) { +return sendGenericGroupJoin(request, false); +} + +public CompletableFuture sendGenericGroupJoin( +JoinGroupRequestData request, +boolean requireKnownMemberId +) { +return sendGenericGroupJoin(request, requireKnownMemberId, false, null); +} + +public CompletableFuture sendGenericGroupJoin( +JoinGroupRequestData request, +boolean requireKnownMemberId, +boolean supportSkippingAssignment, +ExpectedGenericGroupResult expectedResult +) { +// requireKnownMemberId is true: version >= 4 +// supportSkippingAssignment is true: version >= 9 +short joinGroupVersion = 3; + +if (requireKnownMemberId) { +joinGroupVersion = 4; +if (supportSkippingAssignment) { +joinGroupVersion = ApiKeys.JOIN_GROUP.latestVersion(); +} +} + +CompletableFuture responseFuture = new CompletableFuture<>(); + +RequestContext context = new RequestContext( +new RequestHeader( +ApiKeys.JOIN_GROUP, +joinGroupVersion, +"client", +0 +), +"1", +InetAddress.getLoopbackAddress(), +KafkaPrincipal.ANONYMOUS, +ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), +SecurityProtocol.PLAINTEXT, +ClientInformation.EMPTY, +false +); + +CoordinatorResult result = groupMetadataManager.genericGroupJoin( +context, +request, +responseFuture +); + +if (expectedResult != null) { +GenericGroup group = groupMetadataManager.getOrMaybeCreateGenericGroup( +request.groupId(), +false +); + +Record groupMetadataRecord; +if (expectedResult.isNewGroup) { +groupMetadataRecord = newEmptyGroupMetadataRecord(group, MetadataVersion.latest()); +} else { +groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(group, MetadataVersion.latest()); +} + +expectedResult.records = Collections.singletonList(groupMetadataRecord); Review Comment: i'll think a bit more on this as it will require a large change in this class. one of the reasons i had it like this is that we mostly care about the responseFuture in the tests and wanted to hide the record/append future validations. The timer could also produce records which require setting things in advance. i agree it is unclean, i'll address this in the next commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741819#comment-17741819 ]

Ismael Juma commented on KAFKA-15175:
-------------------------------------
nio2 is not necessarily faster than nio - it's a different model.

> Assess the use of nio2 asynchronous channel for KafkaConsumer
>
> Key: KAFKA-15175
> URL: https://issues.apache.org/jira/browse/KAFKA-15175
> Project: Kafka
> Issue Type: Wish
> Components: consumer
> Reporter: Philip Nee
> Priority: Major
>
> We should assess whether NIO2 is an appropriate replacement for the current nio library, with better performance.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
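As background for the comment above, NIO2's asynchronous channels hand the I/O off to the runtime and deliver the result to a completion handler, whereas classic NIO has the caller multiplex readiness with a Selector. A generic, non-Kafka sketch of the NIO2 model (the address is a placeholder and assumes something is listening there):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Nio2ReadSketch {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress("localhost", 9092)).get(); // placeholder address

        ByteBuffer buffer = ByteBuffer.allocate(4096);
        // The read completes on an internal thread pool and invokes the handler;
        // the calling thread is free to do other work in the meantime.
        channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                System.out.println("Read " + bytesRead + " bytes asynchronously");
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });

        Thread.sleep(1000); // crude wait so the demo JVM doesn't exit before the callback runs
    }
}
```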
[jira] [Comment Edited] (KAFKA-15152) Fix incorrect format specifiers when formatting string
[ https://issues.apache.org/jira/browse/KAFKA-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741814#comment-17741814 ] Phuc Hong Tran edited comment on KAFKA-15152 at 7/11/23 2:30 AM: - Hi [~divijvaidya], can I have this one assign to me? Thanks P.S: I just assigned it to myself was (Author: JIRAUSER301295): Hi [~divijvaidya], can I have this one assign to me? Thanks > Fix incorrect format specifiers when formatting string > -- > > Key: KAFKA-15152 > URL: https://issues.apache.org/jira/browse/KAFKA-15152 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Phuc Hong Tran >Priority: Minor > Labels: newbiw > > *This is a good Jira to be picked up by first time contributors to Kafka code > base.* > The objective of this Jira is to fix incorrect formatting of string at > multiple places in the code which can cause incorrect print when used in a > different locale. > *1. FollowerState.java Line: 121* > This code uses '%s' to format long: updatedHighWatermark (declared at line > 117). This is a potential locale-sensitive handling issue. It might cause > errors in the handling and processing of the statement at line: 121. Consider > formatting this data with '%d' instead. > *2. MemoryRecordsBuilder.java Line: 441* > This code uses '%s' to format long: offset (declared at line 434), > lastOffset. This is a potential locale-sensitive handling issue. It might > cause errors in the handling and processing of the statement at line: 441. > Consider formatting this data with '%d' instead. > *3. HttpAccessTokenRetriever.java Line: 340* > This code uses '%s' to format int: MAX_RESPONSE_BODY_LENGTH, actualLength > (declared at line 338). This is a potential locale-sensitive handling issue. > It might cause errors in the handling and processing of the statement at > line: 340. Consider formatting this data with '%d' instead. > 4. *KafkaAdminClient.java Line: 1256* > This code uses '%s' to format int: correlationId (declared at line 1226). > This is a potential locale-sensitive handling issue. It might cause errors in > the handling and processing of the statement at line: 1256. Consider > formatting this data with '%d' instead. > *5 . Batch.java Line: 170* > This code uses '%s' to format int: epoch (declared at line 163). This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 170. Consider formatting > this data with '%d' instead. > *6. Batch.java Line: 207* > This code uses '%s' to format int: epoch (declared at line 200). This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 207. Consider formatting > this data with '%d' instead. > *7. BatchBuilder.java Line: 330* > This code uses '%s' to format int: 'size' expression. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 330. Consider formatting this data with > '%d' instead. > *8. CopartitionedTopicsEnforcer.java Line: 104* > This code uses '%s' to format int: numberOfPartitionsOfInternalTopic > (declared at line 99), numPartitionsToUseForRepartitionTopics (defined at > line 81). This is a potential locale-sensitive handling issue. It might cause > errors in the handling and processing of the statement at line: 104. Consider > formatting this data with '%d' instead. > > *9. 
RefreshingHttpsJwks.java Line: 337* > This code uses '%s' to format int: MISSING_KEY_ID_MAX_KEY_LENGTH, > actualLength (declared at line 335). This is a potential locale-sensitive > handling issue. It might cause errors in the handling and processing of the > statement at line: 337. Consider formatting this data with '%d' instead. > *10. SerializedJwt.java Line: 47* > This code uses '%s' to format int: splits.length. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 47. Consider formatting this data with > '%d' instead. > *11. ControllerResultAndOffset.java Line: 57* > This code uses '%s' to format long: offset. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 57. Consider formatting this data with > '%d' instead. > *12. ReplicatedLog.java Line: 101* > This code uses '%s' to format long: 'startOffset' expression. This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 101. Consider formatting > this data with '%d' instead. > *13. HttpAccessTokenRetriever.java Line: 275* > This code uses '%s' to
[jira] [Assigned] (KAFKA-15152) Fix incorrect format specifiers when formatting string
[ https://issues.apache.org/jira/browse/KAFKA-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-15152: -- Assignee: Phuc Hong Tran > Fix incorrect format specifiers when formatting string > -- > > Key: KAFKA-15152 > URL: https://issues.apache.org/jira/browse/KAFKA-15152 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Phuc Hong Tran >Priority: Minor > Labels: newbiw > > *This is a good Jira to be picked up by first time contributors to Kafka code > base.* > The objective of this Jira is to fix incorrect formatting of string at > multiple places in the code which can cause incorrect print when used in a > different locale. > *1. FollowerState.java Line: 121* > This code uses '%s' to format long: updatedHighWatermark (declared at line > 117). This is a potential locale-sensitive handling issue. It might cause > errors in the handling and processing of the statement at line: 121. Consider > formatting this data with '%d' instead. > *2. MemoryRecordsBuilder.java Line: 441* > This code uses '%s' to format long: offset (declared at line 434), > lastOffset. This is a potential locale-sensitive handling issue. It might > cause errors in the handling and processing of the statement at line: 441. > Consider formatting this data with '%d' instead. > *3. HttpAccessTokenRetriever.java Line: 340* > This code uses '%s' to format int: MAX_RESPONSE_BODY_LENGTH, actualLength > (declared at line 338). This is a potential locale-sensitive handling issue. > It might cause errors in the handling and processing of the statement at > line: 340. Consider formatting this data with '%d' instead. > 4. *KafkaAdminClient.java Line: 1256* > This code uses '%s' to format int: correlationId (declared at line 1226). > This is a potential locale-sensitive handling issue. It might cause errors in > the handling and processing of the statement at line: 1256. Consider > formatting this data with '%d' instead. > *5 . Batch.java Line: 170* > This code uses '%s' to format int: epoch (declared at line 163). This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 170. Consider formatting > this data with '%d' instead. > *6. Batch.java Line: 207* > This code uses '%s' to format int: epoch (declared at line 200). This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 207. Consider formatting > this data with '%d' instead. > *7. BatchBuilder.java Line: 330* > This code uses '%s' to format int: 'size' expression. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 330. Consider formatting this data with > '%d' instead. > *8. CopartitionedTopicsEnforcer.java Line: 104* > This code uses '%s' to format int: numberOfPartitionsOfInternalTopic > (declared at line 99), numPartitionsToUseForRepartitionTopics (defined at > line 81). This is a potential locale-sensitive handling issue. It might cause > errors in the handling and processing of the statement at line: 104. Consider > formatting this data with '%d' instead. > > *9. RefreshingHttpsJwks.java Line: 337* > This code uses '%s' to format int: MISSING_KEY_ID_MAX_KEY_LENGTH, > actualLength (declared at line 335). This is a potential locale-sensitive > handling issue. It might cause errors in the handling and processing of the > statement at line: 337. 
Consider formatting this data with '%d' instead. > *10. SerializedJwt.java Line: 47* > This code uses '%s' to format int: splits.length. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 47. Consider formatting this data with > '%d' instead. > *11. ControllerResultAndOffset.java Line: 57* > This code uses '%s' to format long: offset. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 57. Consider formatting this data with > '%d' instead. > *12. ReplicatedLog.java Line: 101* > This code uses '%s' to format long: 'startOffset' expression. This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 101. Consider formatting > this data with '%d' instead. > *13. HttpAccessTokenRetriever.java Line: 275* > This code uses '%s' to format int: responseCode (declared at line 240). This > is a potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 275. Consider formatting > this data with '%d'
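To make the suggested fix concrete, the snippet below is a minimal, self-contained Java sketch of the pattern the issue describes. The variable name echoes item 1 (FollowerState's updatedHighWatermark), but the class, message text, and value are illustrative assumptions, not code taken from the Kafka code base.

    import java.util.Locale;

    public class FormatSpecifierExample {
        public static void main(String[] args) {
            long updatedHighWatermark = 1234567L;

            // Pattern flagged by the issue: a long formatted with '%s', which
            // falls back to an implicit toString conversion of the boxed value.
            String before = String.format("High watermark updated to %s", updatedHighWatermark);

            // Suggested pattern: declare the argument as an integral value with '%d',
            // and pin the locale explicitly so the rendering is stable everywhere.
            String after = String.format(Locale.ROOT, "High watermark updated to %d", updatedHighWatermark);

            System.out.println(before);
            System.out.println(after);
        }
    }

The same one-character change ('%s' to '%d' for int and long arguments) applies to each of the thirteen call sites listed above.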
[jira] [Commented] (KAFKA-15152) Fix incorrect format specifiers when formatting string
[ https://issues.apache.org/jira/browse/KAFKA-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741814#comment-17741814 ] Phuc Hong Tran commented on KAFKA-15152: Hi [~divijvaidya], can I have this one assign to me? Thanks > Fix incorrect format specifiers when formatting string > -- > > Key: KAFKA-15152 > URL: https://issues.apache.org/jira/browse/KAFKA-15152 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Priority: Minor > Labels: newbiw > > *This is a good Jira to be picked up by first time contributors to Kafka code > base.* > The objective of this Jira is to fix incorrect formatting of string at > multiple places in the code which can cause incorrect print when used in a > different locale. > *1. FollowerState.java Line: 121* > This code uses '%s' to format long: updatedHighWatermark (declared at line > 117). This is a potential locale-sensitive handling issue. It might cause > errors in the handling and processing of the statement at line: 121. Consider > formatting this data with '%d' instead. > *2. MemoryRecordsBuilder.java Line: 441* > This code uses '%s' to format long: offset (declared at line 434), > lastOffset. This is a potential locale-sensitive handling issue. It might > cause errors in the handling and processing of the statement at line: 441. > Consider formatting this data with '%d' instead. > *3. HttpAccessTokenRetriever.java Line: 340* > This code uses '%s' to format int: MAX_RESPONSE_BODY_LENGTH, actualLength > (declared at line 338). This is a potential locale-sensitive handling issue. > It might cause errors in the handling and processing of the statement at > line: 340. Consider formatting this data with '%d' instead. > 4. *KafkaAdminClient.java Line: 1256* > This code uses '%s' to format int: correlationId (declared at line 1226). > This is a potential locale-sensitive handling issue. It might cause errors in > the handling and processing of the statement at line: 1256. Consider > formatting this data with '%d' instead. > *5 . Batch.java Line: 170* > This code uses '%s' to format int: epoch (declared at line 163). This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 170. Consider formatting > this data with '%d' instead. > *6. Batch.java Line: 207* > This code uses '%s' to format int: epoch (declared at line 200). This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 207. Consider formatting > this data with '%d' instead. > *7. BatchBuilder.java Line: 330* > This code uses '%s' to format int: 'size' expression. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 330. Consider formatting this data with > '%d' instead. > *8. CopartitionedTopicsEnforcer.java Line: 104* > This code uses '%s' to format int: numberOfPartitionsOfInternalTopic > (declared at line 99), numPartitionsToUseForRepartitionTopics (defined at > line 81). This is a potential locale-sensitive handling issue. It might cause > errors in the handling and processing of the statement at line: 104. Consider > formatting this data with '%d' instead. > > *9. RefreshingHttpsJwks.java Line: 337* > This code uses '%s' to format int: MISSING_KEY_ID_MAX_KEY_LENGTH, > actualLength (declared at line 335). This is a potential locale-sensitive > handling issue. 
It might cause errors in the handling and processing of the > statement at line: 337. Consider formatting this data with '%d' instead. > *10. SerializedJwt.java Line: 47* > This code uses '%s' to format int: splits.length. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 47. Consider formatting this data with > '%d' instead. > *11. ControllerResultAndOffset.java Line: 57* > This code uses '%s' to format long: offset. This is a potential > locale-sensitive handling issue. It might cause errors in the handling and > processing of the statement at line: 57. Consider formatting this data with > '%d' instead. > *12. ReplicatedLog.java Line: 101* > This code uses '%s' to format long: 'startOffset' expression. This is a > potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 101. Consider formatting > this data with '%d' instead. > *13. HttpAccessTokenRetriever.java Line: 275* > This code uses '%s' to format int: responseCode (declared at line 240). This > is a potential locale-sensitive handling issue. It might cause errors in the > handling and processing of the statement at line: 275. Consider formatting
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259098972 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259098122 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259097546 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259096806 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259095013 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] hudeqi commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
hudeqi commented on code in PR #13913: URL: https://github.com/apache/kafka/pull/13913#discussion_r1259093047 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map topicConfigs) { })); } -private void updateTopicAcls(List bindings) { -log.trace("Syncing {} topic ACL bindings.", bindings.size()); -targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { -if (e != null) { -log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); -} -})); +// Visible for testing +int updateTopicAcls(List bindings) { +Set addBindings = new HashSet<>(bindings); +addBindings.removeAll(knownTopicAclBindings); +int newBindCount = addBindings.size(); +if (!addBindings.isEmpty()) { +log.info("Syncing new found {} topic ACL bindings.", newBindCount); +targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { +if (e != null) { +log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); +} +})); +knownTopicAclBindings = new HashSet<>(bindings); Review Comment: nice catch ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
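The change quoted above turns a blind createAcls call into an incremental sync: only bindings not created in a previous round are sent to the target cluster, and the set of known bindings is then replaced with the latest snapshot. The sketch below shows that diff-the-sets pattern in isolation; AclClient is a hypothetical stand-in for the target admin client, and the String bindings are placeholders for the real AclBinding type, so this is an illustration of the approach rather than the PR's actual code.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class IncrementalAclSync {
        /** Hypothetical stand-in for the target cluster's admin client. */
        interface AclClient {
            void createAcls(Set<String> bindings);
        }

        private final AclClient client;
        private Set<String> knownBindings = new HashSet<>();

        IncrementalAclSync(AclClient client) {
            this.client = client;
        }

        /** Creates only the bindings not synced before and returns how many were new. */
        int sync(List<String> bindings) {
            Set<String> toAdd = new HashSet<>(bindings);
            toAdd.removeAll(knownBindings);
            if (!toAdd.isEmpty()) {
                client.createAcls(toAdd);
            }
            // Replace, rather than merge, the known set so bindings that
            // disappeared upstream are not remembered forever.
            knownBindings = new HashSet<>(bindings);
            return toAdd.size();
        }
    }

Repeated calls with an unchanged binding list then become no-ops, which is the point of the incremental behaviour proposed in KAFKA-15119.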
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259062674 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259059291 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259054171 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259049772 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259047855 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() { // Verify the groups. Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertTrue(group.hasMetadataExpired(context.time.milliseconds())); }); Arrays.asList("group5").forEach(groupId -> { -ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroup group = context.groupMetadataManager +.getOrMaybeCreateConsumerGroup(groupId, false); assertFalse(group.hasMetadataExpired(context.time.milliseconds())); }); // Verify image. assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. 
-return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259040832 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -228,14 +261,21 @@ public List build(TopicsImage topicsImage) { static class GroupMetadataManagerTestContext { static class Builder { -final private Time time = new MockTime(); final private LogContext logContext = new LogContext(); final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); private MetadataImage metadataImage; -private List assignors; Review Comment: We get an IllegalStateException if it's not initialized, and since it doesn't affect the old protocol, I thought it best to keep it clean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
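For context, the behaviour the comment describes (failing fast at build time when a field required by the new protocol was never supplied) typically follows the builder-validation idiom sketched below. The names Context, Builder, and withAssignors are generic placeholders, not the test context's actual API.

    import java.util.List;

    public class ContextBuilderSketch {
        static class Context {
            final List<String> assignors;

            Context(List<String> assignors) {
                this.assignors = assignors;
            }
        }

        static class Builder {
            private List<String> assignors;

            Builder withAssignors(List<String> assignors) {
                this.assignors = assignors;
                return this;
            }

            Context build() {
                // Fail fast when a required field was never provided instead of
                // letting a null leak into the code under test.
                if (assignors == null) {
                    throw new IllegalStateException("Assignors must be set before building the context.");
                }
                return new Context(assignors);
            }
        }
    }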
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259030830 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -271,4 +289,110 @@ public void testOnResignation() { 10 ); } + +@Test +public void testJoinGroup() { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); + +JoinGroupRequestData request = new JoinGroupRequestData() +.setGroupId("foo"); + +service.startup(() -> 1); + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("generic-group-join"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() +)).thenReturn(CompletableFuture.completedFuture( +new JoinGroupResponseData() +)); + +CompletableFuture responseFuture = service.joinGroup( +requestContext(ApiKeys.JOIN_GROUP), +request, +BufferSupplier.NO_CACHING +); + +assertFalse(responseFuture.isDone()); Review Comment: A successful join group request will store the response future into the member's `awaitingJoinFuture` (could also complete if the join phase completes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
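The comment above explains why the assertion expects a pending future: the coordinator parks the response future on the member and only completes it when the join phase finishes. A minimal sketch of that store-now, complete-later pattern follows; the names Member, awaitingJoinFuture, and completeJoinPhase mirror the comment, but the code is an illustrative assumption, not the GroupMetadataManager implementation.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class PendingJoinFutureSketch {
        static class Member {
            CompletableFuture<String> awaitingJoinFuture;
        }

        private final List<Member> members = new ArrayList<>();

        /** Handles a join request: the response future is stored on the member, not completed yet. */
        CompletableFuture<String> handleJoin() {
            Member member = new Member();
            member.awaitingJoinFuture = new CompletableFuture<>();
            members.add(member);
            return member.awaitingJoinFuture;
        }

        /** Completes every parked future once the join phase ends, e.g. after the rebalance delay. */
        void completeJoinPhase() {
            for (Member member : members) {
                member.awaitingJoinFuture.complete("join response for member " + member.hashCode());
            }
            members.clear();
        }
    }

A future returned by handleJoin() reports isDone() == false until completeJoinPhase() runs, which is exactly the state the quoted assertFalse(responseFuture.isDone()) checks for.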
[GitHub] [kafka] kirktrue closed pull request #13989: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue closed pull request #13989: KAFKA-14937: Refactoring for client code to reduce boilerplate URL: https://github.com/apache/kafka/pull/13989 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259029305 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1043,4 +1221,1331 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { } }); } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +groups.remove(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. 
if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +boolean isNewGroup = !groups.containsKey(groupId); +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +CoordinatorResult newGroupResult = EMPTY_RESULT; +if (isNewGroup) { +// If a group was newly created, we need to append records to
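The (truncated) diff above surfaces validation failures by completing the response future with an error code and returning an empty result, rather than throwing. A minimal, self-contained sketch of that shape follows; the class name, parameters, and the error-code constant are illustrative (the real value comes from `Errors.INVALID_SESSION_TIMEOUT`).

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class JoinValidationSketch {
    // Illustrative error code only; the real value comes from Errors.INVALID_SESSION_TIMEOUT.
    static final short INVALID_SESSION_TIMEOUT_CODE = 26;

    List<String> handleJoin(int sessionTimeoutMs, int minMs, int maxMs,
                            CompletableFuture<Short> responseFuture) {
        if (sessionTimeoutMs < minMs || sessionTimeoutMs > maxMs) {
            // The caller is answered through the future; no records are produced.
            responseFuture.complete(INVALID_SESSION_TIMEOUT_CODE);
            return Collections.emptyList();
        }
        // ... the join path proper would go here, possibly returning records to append ...
        return Collections.emptyList();
    }
}
```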
[GitHub] [kafka] kirktrue opened a new pull request, #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue opened a new pull request, #13990: URL: https://github.com/apache/kafka/pull/13990 There are a number of places in the client code where the same basic calls are made by more than one client implementation. Minor refactoring will reduce the amount of boilerplate code necessary for the client to construct its internal state. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue opened a new pull request, #13989: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue opened a new pull request, #13989: URL: https://github.com/apache/kafka/pull/13989 There are a number of places in the client code where the same basic calls are made by more than one client implementation. Minor refactoring will reduce the amount of boilerplate code necessary for the client to construct its internal state. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1259002211 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1043,4 +1221,1331 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { } }); } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +groups.remove(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. 
if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +boolean isNewGroup = !groups.containsKey(groupId); +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +CoordinatorResult newGroupResult = EMPTY_RESULT; +if (isNewGroup) { +// If a group was newly created, we need to append records to
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1258999406 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1043,4 +1221,1331 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { } }); } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should be removed. +groups.remove(groupId); +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = member.rebalanceTimeout() == -1 ? +member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", Review Comment: is your suggestion to iterate through all groups & members and log each member after loading a partition is complete? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] eziosudo commented on a diff in pull request #13982: KAFKA-15159: upgrade minor dependencies
eziosudo commented on code in PR #13982: URL: https://github.com/apache/kafka/pull/13982#discussion_r1258959682 ## gradle/dependencies.gradle: ## @@ -142,8 +142,8 @@ versions += [ slf4j: "1.7.36", snappy: "1.1.10.1", spotbugs: "4.7.3", - swaggerAnnotations: "2.2.8", - swaggerJaxrs2: "2.2.8", + swaggerAnnotations: "2.2.14", Review Comment: Thanks for the review! I should have taken it more carefully! Will fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 merged pull request #13946: KAFKA-15139:Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`
gharris1727 merged PR #13946: URL: https://github.com/apache/kafka/pull/13946 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
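For context on the merged change above: in the long-standing `AbstractSet` implementation, `removeAll(Collection)` iterates the set and calls `contains` on the argument whenever the set is not larger than the argument, which is a linear scan per element when the argument is a `List`. Below is a small self-contained illustration of the usual workaround; it is not the actual `MirrorCheckpointConnector` code, and the names and sizes are placeholders.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAllSketch {
    public static void main(String[] args) {
        Set<Integer> checkpoints = new HashSet<>();
        List<Integer> toRemove = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            checkpoints.add(i);
            toRemove.add(i);
        }

        // Potentially quadratic: since checkpoints.size() <= toRemove.size(),
        // AbstractSet.removeAll iterates the set and calls toRemove.contains(e),
        // which is a linear scan of the list for every element.
        // checkpoints.removeAll(toRemove);

        // Linear alternative: iterate the list and remove from the set directly.
        toRemove.forEach(checkpoints::remove);

        System.out.println("Remaining: " + checkpoints.size());
    }
}
```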
[GitHub] [kafka] jeqo commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics
jeqo commented on code in PR #13944: URL: https://github.com/apache/kafka/pull/13944#discussion_r1258934931 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) { BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"), BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"), BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"), +BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"), +BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"), +BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"), +BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"), +BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"), +BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"), Review Comment: few more naming suggestions: ```suggestion BrokerTopicStats.RemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"), BrokerTopicStats.RemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"), BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"), BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1258937169 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -579,4 +618,32 @@ public void shutdown() { Utils.closeQuietly(runtime, "coordinator runtime"); log.info("Shutdown complete."); } + +private static boolean isGroupIdNotEmpty(String groupId) { +return groupId != null && !groupId.isEmpty(); +} + +private static Errors toResponseError(Errors appendError) { Review Comment: confirmed that `storeGroup` and `storeOffsets` have different handling. this will still be shared amongst join/sync/leave group, so i'll rename this to `appendGroupMetadataErrorToResponseError`, wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13946: KAFKA-15139:Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`
gharris1727 commented on PR #13946: URL: https://github.com/apache/kafka/pull/13946#issuecomment-1629762524 Flaky test failures in the Mirror suite appear unrelated, and the tests pass locally. Merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1258934915 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -579,4 +618,32 @@ public void shutdown() { Utils.closeQuietly(runtime, "coordinator runtime"); log.info("Shutdown complete."); } + +private static boolean isGroupIdNotEmpty(String groupId) { +return groupId != null && !groupId.isEmpty(); +} + +private static Errors toResponseError(Errors appendError) { +switch (appendError) { +case UNKNOWN_TOPIC_OR_PARTITION: +case NOT_ENOUGH_REPLICAS: +case NOT_ENOUGH_REPLICAS_AFTER_APPEND: +return COORDINATOR_NOT_AVAILABLE; + +case NOT_LEADER_OR_FOLLOWER: +case KAFKA_STORAGE_ERROR: +return NOT_COORDINATOR; + +case REQUEST_TIMED_OUT: Review Comment: where should i look to confirm/learn this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeqo commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
jeqo commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1258880361 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.lang.String.format; +import static java.nio.file.Files.newInputStream; +import static java.nio.file.StandardOpenOption.READ; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX; +import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset; +import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory; +import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory; + +/** + * An implementation of {@link RemoteStorageManager} which relies on the local file system to store + * offloaded log segments and associated data. + * + * Due to the consistency semantic of POSIX-compliant file systems, this remote storage provides strong + * read-after-write consistency and a segment's data can be accessed once the copy to the storage succeeded. + * + * + * In order to guarantee isolation, independence, reproducibility and consistency of unit and integration + * tests, the scope of a storage implemented by this class, and identified via the storage ID
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1258924230 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -266,9 +282,32 @@ public CompletableFuture joinGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture responseFuture = new CompletableFuture<>(); + +if (!isGroupIdNotEmpty(request.groupId())) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.INVALID_GROUP_ID.code())); + +return responseFuture; +} + +runtime.scheduleWriteOperation("generic-group-join", +topicPartitionFor(request.groupId()), +coordinator -> coordinator.genericGroupJoin(context, request, responseFuture) +).exceptionally(exception -> { +log.error("Request {} hit an unexpected exception: {}", +request, exception.getMessage()); Review Comment: this would log all errors while appending/committing and if `generateRecordsAndResponse` throws an unexpected exception. shouldn't we log them? it doesn't seem like we do for `consumerGroupHeartbeat()` -- maybe just filter out the coordinator not available / not coordinator error codes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
hachikuji commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1258915597 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition, // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null. def maybeStartTransactionVerification(producerId: Long): Object = { -leaderLogIfLocal match { +log match { Review Comment: Returning an error seems straightforward. No harm done if the broker comes leader just after since the client will retry. Given how hard the scenario was to reproduce, I'm not too concerned about impact from retries. ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition, // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null. def maybeStartTransactionVerification(producerId: Long): Object = { -leaderLogIfLocal match { +log match { Review Comment: Returning an error seems straightforward. No harm done if the broker becomes leader just after since the client will retry. Given how hard the scenario was to reproduce, I'm not too concerned about impact from retries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer
Philip Nee created KAFKA-15175: -- Summary: Assess the use of nio2 asynchronous channel for KafkaConsumer Key: KAFKA-15175 URL: https://issues.apache.org/jira/browse/KAFKA-15175 Project: Kafka Issue Type: Wish Components: consumer Reporter: Philip Nee We should assess whether NIO2 is an appropriate replacement for the current nio library and whether it offers better performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
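For reference, the NIO2 style the ticket wants to assess replaces selector-driven readiness with completion callbacks. The sketch below is a rough, hypothetical illustration of that style only; the host, port, and handler logic are placeholders, and it is not a proposal for the actual consumer network layer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Nio2Sketch {
    public static void main(String[] args) throws IOException {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        // Placeholder address; a real assessment would target a broker endpoint.
        channel.connect(new InetSocketAddress("localhost", 9092), null,
            new CompletionHandler<Void, Void>() {
                @Override
                public void completed(Void result, Void attachment) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // Reads are completion-callback driven instead of selector driven.
                    channel.read(buffer, null, new CompletionHandler<Integer, Void>() {
                        @Override
                        public void completed(Integer bytesRead, Void att) {
                            System.out.println("Read " + bytesRead + " bytes");
                        }

                        @Override
                        public void failed(Throwable exc, Void att) {
                            exc.printStackTrace();
                        }
                    });
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    exc.printStackTrace();
                }
            });
        // A real client would keep the thread alive until the handlers have run.
    }
}
```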
[GitHub] [kafka] ahuang98 commented on pull request #13988: [KAFKA-15137] Log request name only in KRaftControllerChannelManager
ahuang98 commented on PR #13988: URL: https://github.com/apache/kafka/pull/13988#issuecomment-1629679221 There's probably some value in leaving in the `controllerId`, `controllerEpoch`, `brokerEpoch` as well. It won't elongate the log by much -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ahuang98 opened a new pull request, #13988: [KAFKA-15137] Log request name only in KRaftControllerChannelManager
ahuang98 opened a new pull request, #13988: URL: https://github.com/apache/kafka/pull/13988 Logging the following ``` [2023-07-10 13:15:22,703] WARN [Channel manager on controller 3000]: Not sending request UpdateMetadata to broker 0, since it is offline. (kafka.controller.ControllerChannelManager:70) [2023-07-10 13:15:22,705] WARN [Channel manager on controller 3000]: Not sending request LeaderAndIsr to broker 0, since it is offline. (kafka.controller.ControllerChannelManager:70) ``` vs the entire request ``` [2023-07-10 13:09:53,928] WARN [Channel manager on controller 3000]: Not sending request (type: UpdateMetadataRequest=, controllerId=3000, controllerEpoch=3, brokerEpoch=3, partitionStates=[], liveBrokers=UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=64622, host='localhost', listener='EXTERNAL', securityProtocol=0), UpdateMetadataEndpoint(port=64621, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null, tags=[])) to broker 0, since it is offline. (kafka.controller.KRaftControllerChannelManager:70) ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
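A minimal sketch of the logging change the PR description is after, assuming an SLF4J-style logger; the request class below is a stand-in, not the real `UpdateMetadataRequest`, and the method is hypothetical.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(RequestLoggingSketch.class);

    // Stand-in for a large request object such as UpdateMetadataRequest.
    static class UpdateMetadataRequest { }

    void warnBrokerOffline(Object request, int brokerId) {
        // Logging only the request's simple class name keeps the line short,
        // instead of dumping the request's full toString() payload.
        log.warn("Not sending request {} to broker {}, since it is offline.",
            request.getClass().getSimpleName(), brokerId);
    }
}
```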
[GitHub] [kafka] jeqo commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
jeqo commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1258867599 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. 
+ * Window TinyLfu (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + * This class is thread safe. + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; +public static final String DIR_NAME = "remote-log-index-cache"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps
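The (truncated) javadoc above describes a bounded Caffeine cache whose evicted entries are handed to a cleaner thread through an unbounded queue. The stripped-down sketch below shows that pattern only, assuming the standard Caffeine builder API; the types, sizes, and cleaner behaviour are purely illustrative and not the actual `RemoteIndexCache` implementation.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

import java.util.concurrent.LinkedBlockingQueue;

public class EvictionQueueSketch {
    // Evicted entries are handed to a background cleaner instead of being
    // deleted inline on the caller's thread.
    private final LinkedBlockingQueue<String> expired = new LinkedBlockingQueue<>();

    private final Cache<String, String> cache = Caffeine.newBuilder()
        .maximumSize(1024)  // bounded cache; Caffeine evicts using Window TinyLFU
        .removalListener((String key, String value, RemovalCause cause) -> {
            if (cause.wasEvicted()) {
                expired.add(key);  // remember which entry's files should be cleaned up
            }
        })
        .build();

    public Cache<String, String> cache() {
        return cache;
    }

    public void startCleaner() {
        Thread cleaner = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String key = expired.take();  // blocks until an eviction arrives
                    System.out.println("Would delete index files for " + key);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "index-cleaner-sketch");
        cleaner.setDaemon(true);
        cleaner.start();
    }
}
```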
[GitHub] [kafka] divijvaidya commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
divijvaidya commented on PR #13956: URL: https://github.com/apache/kafka/pull/13956#issuecomment-1629633140 @mimaison @showuon requesting your review. We have many cases of CI failing with hundreds of tests failing, and this thread leak is one of the reasons for it. Hence, the urgency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on PR #13284: URL: https://github.com/apache/kafka/pull/13284#issuecomment-1629620630 Thank you for your patience on this one @C0urante. Appreciate it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13982: KAFKA-15159: upgrade minor dependencies
divijvaidya commented on code in PR #13982: URL: https://github.com/apache/kafka/pull/13982#discussion_r1258807856 ## gradle/dependencies.gradle: ## @@ -142,8 +142,8 @@ versions += [ slf4j: "1.7.36", snappy: "1.1.10.1", spotbugs: "4.7.3", - swaggerAnnotations: "2.2.8", - swaggerJaxrs2: "2.2.8", + swaggerAnnotations: "2.2.14", + swaggerJaxrs2: "2.2.14", Review Comment: same. required jdk 11+ https://github.com/swagger-api/swagger-core/pull/4377 ## gradle/dependencies.gradle: ## @@ -142,8 +142,8 @@ versions += [ slf4j: "1.7.36", snappy: "1.1.10.1", spotbugs: "4.7.3", - swaggerAnnotations: "2.2.8", - swaggerJaxrs2: "2.2.8", + swaggerAnnotations: "2.2.14", Review Comment: can't update this. Versions after 2.2.8 require minimum JDK 11 - https://github.com/swagger-api/swagger-core/pull/4377 Please also add a comment about this here for future folks who may want to change this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13986: MINOR: Add upgrade docs for 3.5.1
divijvaidya commented on code in PR #13986: URL: https://github.com/apache/kafka/pull/13986#discussion_r1258779004 ## docs/upgrade.html: ## @@ -19,6 +19,22 @@
[jira] [Created] (KAFKA-15174) Ensure the correct thread is executing the callbacks
Philip Nee created KAFKA-15174: -- Summary: Ensure the correct thread is executing the callbacks Key: KAFKA-15174 URL: https://issues.apache.org/jira/browse/KAFKA-15174 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee We need to add assertion tests to ensure the correct thread is executing the offset commit callbacks and rebalance callback -- This message was sent by Atlassian Jira (v8.20.10#820010)
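A minimal sketch of the kind of assertion the ticket asks for, with hypothetical names; the idea is that the consumer captures its application thread and verifies it when invoking offset-commit and rebalance callbacks.

```java
public class CallbackThreadCheckSketch {
    private final Thread applicationThread;

    public CallbackThreadCheckSketch() {
        // Capture the thread that created the consumer; user callbacks must run here.
        this.applicationThread = Thread.currentThread();
    }

    public void invokeOffsetCommitCallback(Runnable userCallback) {
        if (Thread.currentThread() != applicationThread) {
            throw new IllegalStateException("Callback invoked from thread "
                + Thread.currentThread().getName()
                + " but expected application thread " + applicationThread.getName());
        }
        userCallback.run();
    }
}
```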
[jira] [Updated] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15173: --- Labels: kip-945 (was: ) > ApplicationEventQueue and BackgroundEventQueue should be bounded > > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: kip-945 > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is boundless, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues: Can this be a > configurable, memory-based bound? Or just an arbitrary number of events as > the bound. > # What should happen when the application event queue is filled up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue is filled up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to be filled. > # Is it necessary to introduce a public configuration for the queue? I think > initially we would select an arbitrary constant number and see the community > feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded
Philip Nee created KAFKA-15173: -- Summary: ApplicationEventQueue and BackgroundEventQueue should be bounded Key: KAFKA-15173 URL: https://issues.apache.org/jira/browse/KAFKA-15173 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee The async consumer uses ApplicationEventQueue and BackgroundEventQueue to facilitate message passing between the application thread and the background thread. The current implementation is boundless, which can potentially cause OOM and other performance-related issues. I think the queues need a finite bound, and we need to decide how to handle the situation when the bound is reached. In particular, I would like to answer these questions: # What should the upper limit be for both queues: Can this be a configurable, memory-based bound? Or just an arbitrary number of events as the bound. # What should happen when the application event queue is filled up? It seems like we should introduce a new exception type and notify the user that the consumer is full. # What should happen when the background event queue is filled up? This seems less likely to happen, but I imagine it could happen when the user stops polling the consumer, causing the queue to be filled. # Is it necessary to introduce a public configuration for the queue? I think initially we would select an arbitrary constant number and see the community feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
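As a rough illustration of questions 1 and 2 in the ticket, a bound can be enforced with a `LinkedBlockingQueue` capacity and a non-blocking `offer`. The capacity and exception type below are placeholders, not a proposed design.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedEventQueueSketch {
    // Arbitrary bound, as discussed in the ticket; a memory-based bound is the alternative.
    private final BlockingQueue<Object> applicationEventQueue = new LinkedBlockingQueue<>(1024);

    public void enqueue(Object event) {
        // offer() returns false instead of blocking when the queue is full; this is
        // where a dedicated "consumer is full" exception type could be surfaced.
        if (!applicationEventQueue.offer(event)) {
            throw new IllegalStateException("Application event queue is full");
        }
    }
}
```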
[GitHub] [kafka] Cerchie opened a new pull request, #13987: Kafka-15126: Range queries to accept null lower and upper bounds
Cerchie opened a new pull request, #13987: URL: https://github.com/apache/kafka/pull/13987 Change in response to [KIP-941](https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds). Changes line 57 in the RangeQuery class file from: ``` public static RangeQuery withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); } ``` to ``` public static RangeQuery withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper)); } ``` Testing strategy: Since null values can now be entered in `RangeQuery`s in order to receive full scans, I changed the logic defining `query` starting at [line 1085](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java#L1085) in `IQv2StoreIntegrationTest.java` from: ``` final RangeQuery query; if (lower.isPresent() && upper.isPresent()) { query = RangeQuery.withRange(lower.get(), upper.get()); } else if (lower.isPresent()) { query = RangeQuery.withLowerBound(lower.get()); } else if (upper.isPresent()) { query = RangeQuery.withUpperBound(upper.get()); } else { query = RangeQuery.withNoBounds(); } ``` to ``` query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null)); ``` because different combinations of `isPresent()` in the bounds is no longer necessary. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
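The behavioural difference the PR relies on is that `Optional.of` rejects null while `Optional.ofNullable` maps it to an empty `Optional`. A tiny standalone illustration (class and variable names are arbitrary):

```java
import java.util.Optional;

public class OptionalNullableSketch {
    public static void main(String[] args) {
        String lower = null;

        // Optional.of(null) throws NullPointerException, so a null bound used to fail
        // before the range query was even built:
        // Optional.of(lower);  // NPE

        // Optional.ofNullable(null) yields Optional.empty(), so a null bound now means
        // "unbounded", and withRange(null, null) behaves like a full scan.
        Optional<String> lowerBound = Optional.ofNullable(lower);
        System.out.println(lowerBound.isPresent());  // prints false
    }
}
```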
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258697658 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; + +public class MockCoordinatorTimer implements CoordinatorTimer { +public static class ScheduledTimeout { +public final String key; +public final long deadlineMs; +public final TimeoutOperation operation; + +ScheduledTimeout( +String key, +long deadlineMs, +TimeoutOperation operation +) { +this.key = key; +this.deadlineMs = deadlineMs; +this.operation = operation; +} +} + +public static class ExpiredTimeout { +public final String key; +public final List records; + +ExpiredTimeout( +String key, +List records +) { +this.key = key; +this.records = records; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; + +ExpiredTimeout that = (ExpiredTimeout) o; + +if (!Objects.equals(key, that.key)) return false; +return Objects.equals(records, that.records); +} + +@Override +public int hashCode() { +int result = key != null ? key.hashCode() : 0; +result = 31 * result + (records != null ? records.hashCode() : 0); +return result; +} +} + +private final Time time; + +private final Map> timeoutMap = new HashMap<>(); +private final PriorityQueue> timeoutQueue = new PriorityQueue<>( +Comparator.comparingLong(entry -> entry.deadlineMs) +); + +public MockCoordinatorTimer(Time time) { +this.time = time; +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +boolean retry, +TimeoutOperation operation +) { +cancel(key); + +long deadlineMs = time.milliseconds() + unit.toMillis(delay); +ScheduledTimeout timeout = new ScheduledTimeout<>(key, deadlineMs, operation); +timeoutQueue.add(timeout); +timeoutMap.put(key, timeout); +} + +@Override +public void cancel(String key) { +ScheduledTimeout timeout = timeoutMap.remove(key); +if (timeout != null) { +timeoutQueue.remove(timeout); +} +} + +public boolean contains(String key) { +return timeoutMap.containsKey(key); +} + +public ScheduledTimeout timeout(String key) { +return timeoutMap.get(key); +} + +public int size() { +return timeoutMap.size(); +} + +public List> poll() { Review Comment: So we don't expire timeouts until we call this poll? 
Also in general, can we add a few more comments about this class and its usage? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
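A hypothetical usage sketch of the mock timer quoted above, assuming `poll()` drains timeouts whose deadline has passed: nothing fires on `schedule()` alone; expirations are only collected once the mock clock has advanced and `poll()` is called. The generic parameters are stripped in the diff, so the `<String>` record type and the nested `ExpiredTimeout` signature below are assumptions, and this is not actual test code from the PR.

```java
import org.apache.kafka.common.utils.MockTime;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MockTimerUsageSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        // Record type <String> is assumed; the diff above has the generics stripped.
        MockCoordinatorTimer<String> timer = new MockCoordinatorTimer<>(time);

        // Scheduling alone does not fire anything.
        timer.schedule("session-timeout-key", 30_000, TimeUnit.MILLISECONDS, true,
            () -> Collections.singletonList("fence-member-record"));

        // Advance the mock clock past the deadline, then collect expirations.
        time.sleep(30_000);
        List<MockCoordinatorTimer.ExpiredTimeout<String>> expired = timer.poll();
        System.out.println(expired.size()); // expected: 1
    }
}
```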
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258688221 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch (UnknownMemberIdException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " + +"does not exist."); +} + +return Collections.emptyList(); +}); +} + +/** + * Cancels the session timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void cancelConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId)); +} + +/** + * Schedules a revocation timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param revocationTimeoutMs The revocation timeout. + * @param expectedMemberEpoch The expected member epoch. + */ +private void scheduleConsumerGroupRevocationTimeout( +String groupId, +String memberId, +long revocationTimeoutMs, +int expectedMemberEpoch +) { +String key = consumerGroupRevocationTimeoutKey(groupId, memberId); +timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +if (member.state() != ConsumerGroupMember.MemberState.REVOKING && +member.memberEpoch() != expectedMemberEpoch) { Review Comment: I just extended `testRevocationTimeoutLifecycle` to execute a stale timer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258683353 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch (UnknownMemberIdException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " + +"does not exist."); +} + +return Collections.emptyList(); +}); +} + +/** + * Cancels the session timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void cancelConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId)); +} + +/** + * Schedules a revocation timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param revocationTimeoutMs The revocation timeout. + * @param expectedMemberEpoch The expected member epoch. + */ +private void scheduleConsumerGroupRevocationTimeout( +String groupId, +String memberId, +long revocationTimeoutMs, +int expectedMemberEpoch +) { +String key = consumerGroupRevocationTimeoutKey(groupId, memberId); +timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +if (member.state() != ConsumerGroupMember.MemberState.REVOKING && +member.memberEpoch() != expectedMemberEpoch) { Review Comment: Do we have a test for this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258682428 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { Review Comment: That's correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258681834 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { Review Comment: Oh I see. We only catch the exceptions in the other PR if they are thrown (not caught) here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15172) Allow exact mirroring of ACLs between clusters
Mickael Maison created KAFKA-15172: -- Summary: Allow exact mirroring of ACLs between clusters Key: KAFKA-15172 URL: https://issues.apache.org/jira/browse/KAFKA-15172 Project: Kafka Issue Type: Task Components: mirrormaker Reporter: Mickael Maison When mirroring ACLs, MirrorMaker downgrades allow ALL ACLs to allow READ. The rationale is to prevent other clients from producing to remote topics. However, in disaster recovery scenarios where the target cluster is not used and is just a "hot standby", it would be preferable to have exactly the same ACLs on both clusters to speed up failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
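For readers unfamiliar with the behaviour described in this ticket, the following is a minimal, hypothetical sketch of the ALLOW ALL to ALLOW READ downgrade and of how an exact-mirroring mode could bypass it. It is not the actual MirrorMaker code, and the `exactAclSync` flag is an illustrative name only, not an existing configuration.

```java
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;

final class AclDowngradeSketch {
    // 'exactAclSync' is a hypothetical flag used only for illustration.
    static AclBinding targetBinding(AclBinding source, boolean exactAclSync) {
        AccessControlEntry entry = source.entry();
        if (!exactAclSync
                && entry.permissionType() == AclPermissionType.ALLOW
                && entry.operation() == AclOperation.ALL) {
            // Downgrade ALLOW/ALL to ALLOW/READ so clients cannot produce to the remote topic.
            entry = new AccessControlEntry(entry.principal(), entry.host(),
                    AclOperation.READ, AclPermissionType.ALLOW);
        }
        // With exactAclSync enabled, the binding is copied unchanged, which is what a
        // "hot standby" disaster-recovery setup would want.
        return new AclBinding(source.pattern(), entry);
    }
}
```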
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258669748 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch (UnknownMemberIdException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " + +"does not exist."); +} + +return Collections.emptyList(); +}); +} + +/** + * Cancels the session timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void cancelConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId)); +} + +/** + * Schedules a revocation timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param revocationTimeoutMs The revocation timeout. + * @param expectedMemberEpoch The expected member epoch. + */ +private void scheduleConsumerGroupRevocationTimeout( +String groupId, +String memberId, +long revocationTimeoutMs, +int expectedMemberEpoch +) { +String key = consumerGroupRevocationTimeoutKey(groupId, memberId); +timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +if (member.state() != ConsumerGroupMember.MemberState.REVOKING && +member.memberEpoch() != expectedMemberEpoch) { Review Comment: good catch. i should have used an OR in the condition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
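For clarity, this is how the corrected guard inside `scheduleConsumerGroupRevocationTimeout` would read with an OR, so that a fired timer is ignored when either the member is no longer revoking or its epoch has moved on (a fragment in the same context as the diff above, not a standalone class):

```java
if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
    member.memberEpoch() != expectedMemberEpoch) {
    // Stale timer: the member already completed revocation or advanced to a new epoch.
    log.debug("[GroupId " + groupId + "] Ignoring revocation timeout for " + memberId + " because the member " +
        "state does not match the expected state.");
    return Collections.emptyList();
}
```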
[GitHub] [kafka] eziosudo commented on pull request #13982: KAFKA-15159: upgrade minor dependencies
eziosudo commented on PR #13982: URL: https://github.com/apache/kafka/pull/13982#issuecomment-1629456542 > Thank you for the changes. We need to update the corresponding entries in LICENSE file too: > > https://github.com/apache/kafka/blob/d481163d55a1cea3206fa6e386b24ac84e17c3d7/LICENSE-binary#L244 Thanks for the review. Already updated the LICENSE file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258664011 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { Review Comment: It is actually the other way around. We catch those exceptions because we don't want to retry them. If the member does not exist or if the group does not exist, it means that the timer is stale. This should normally not happen because the timers should be cancelled when a member is removed and a group cannot be deleted if not empty. The API to delete a group is not implemented yet but will come soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1258640264 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); Review Comment: Should we tell users that this is because stdin will be used and we don't do any offset tracking in that case? ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. +for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset != null && !offset.containsKey(POSITION_FIELD)) { +throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); +} +} + +// Let the task validate the actual value for the offset position on startup Review Comment: Any reason not to do this preemptively? We could at least validate that the value for the "position" key is non-null, is a numeric type, and is non-negative. 
## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ## @@ -147,4 +151,59 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + +@Test +public void testAlterOffsetsStdin() { +sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); +Map, Map> offsets = Collections.singletonMap( +Collections.singletonMap(FILENAME_FIELD, FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +); +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsIncorrectPartitionKey() { +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +Collections.singletonMap("invalid_partition_key", FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +))); + +// null partitions are invalid +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +null, +Collections.singletonMap(POSITION_FIELD, 0) +))); +} + +@Test +public void testAlterOffsetsMultiplePartitions() { +Map, Map> offsets = new HashMap<>(); +offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0)); +offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); +connector.alterOffsets(sourceProperties, offsets); Review
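As an illustration of the preemptive validation suggested above, the loop body in `alterOffsets` could additionally check that the `position` value is a non-negative number. This is a sketch of the reviewer's suggestion rather than the code that was ultimately merged:

```java
Map<String, ?> offset = partitionOffset.getValue();
// null offsets are allowed and represent a deletion of offsets for a partition
if (offset != null) {
    if (!offset.containsKey(POSITION_FIELD)) {
        throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
    }
    Object position = offset.get(POSITION_FIELD);
    if (!(position instanceof Number)) {
        throw new ConnectException("The value for the '" + POSITION_FIELD + "' key should be a number");
    }
    if (((Number) position).longValue() < 0) {
        throw new ConnectException("The value for the '" + POSITION_FIELD + "' key should be non-negative");
    }
}
```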
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258653960 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch (UnknownMemberIdException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " + +"does not exist."); +} + +return Collections.emptyList(); +}); +} + +/** + * Cancels the session timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void cancelConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId)); +} + +/** + * Schedules a revocation timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param revocationTimeoutMs The revocation timeout. + * @param expectedMemberEpoch The expected member epoch. + */ +private void scheduleConsumerGroupRevocationTimeout( +String groupId, +String memberId, +long revocationTimeoutMs, +int expectedMemberEpoch +) { +String key = consumerGroupRevocationTimeoutKey(groupId, memberId); +timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +if (member.state() != ConsumerGroupMember.MemberState.REVOKING && +member.memberEpoch() != expectedMemberEpoch) { Review Comment: We continue to revoke even if the epoch is unexpected? Also do we want to return early here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258650248 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -700,17 +726,36 @@ private CoordinatorResult consumerGr String groupId, String memberId ) throws ApiException { -List records = new ArrayList<>(); - ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group."); +List records = consumerGroupFenceMember(group, member); +cancelConsumerGroupSessionTimeout(groupId, memberId); Review Comment: would we also want to cancel the revocation timeout? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1258644974 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +765,116 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { Review Comment: based on the previous pr, we retry these exceptions. I can imagine that some metadata was slow to update or something and eventually it could succeed. Do we have any path forward though if this request was issued and the group/member is no longer there? I guess this relying on canceling the task when we call group leave for the member? Do we have a method also to remove groups? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions
[ https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15145: -- Fix Version/s: 3.5.1 > AbstractWorkerSourceTask re-processes records filtered out by SMTs on > retriable exceptions > -- > > Key: KAFKA-15145 > URL: https://issues.apache.org/jira/browse/KAFKA-15145 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, > 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, > 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, > 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, > 3.3.2, 3.5.0, 3.4.1 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > Fix For: 3.6.0, 3.5.1 > > > If a RetriableException is thrown from an admin client or producer client > operation in > [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388], > the send operation is retried for the remaining records in the batch. There > is a bug in the logic for computing the remaining records in a batch which > causes records that are filtered out by the task's transformation chain to be > re-processed. This will also result in the SourceTask::commitRecord method > being called twice for the same record, which can cause certain types of > source connectors to fail. This bug seems to exist since when SMTs were first > introduced in 0.10.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
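To make the failure mode concrete, here is a small, self-contained sketch of the retry behaviour described above (it is not the Connect implementation). The key point is that records dropped by the transformation chain must be excluded from the batch that is retried after a retriable send failure; the reported bug amounts to the retry sublist being computed in a way that still includes records the transformation chain had already dropped.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

final class SendRecordsSketch {
    /**
     * Processes a batch, returning the records that still need to be (re)sent.
     * transform may return null to drop a record; trySend returns false on a
     * retriable send failure, which aborts the batch at that record.
     */
    static <R> List<R> sendBatch(List<R> toSend, Function<R, R> transform, Predicate<R> trySend) {
        int processed = 0;
        while (processed < toSend.size()) {
            R transformed = transform.apply(toSend.get(processed));
            if (transformed == null || trySend.test(transformed)) {
                processed++;                                      // dropped or sent: done with it
            } else {
                return toSend.subList(processed, toSend.size());  // retry only from the failed record
            }
        }
        return Collections.emptyList();
    }
}
```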
[GitHub] [kafka] C0urante merged pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask
C0urante merged PR #13955: URL: https://github.com/apache/kafka/pull/13955 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
C0urante commented on code in PR #13913: URL: https://github.com/apache/kafka/pull/13913#discussion_r1258611895 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map topicConfigs) { })); } -private void updateTopicAcls(List bindings) { -log.trace("Syncing {} topic ACL bindings.", bindings.size()); -targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { -if (e != null) { -log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); -} -})); +// Visible for testing +int updateTopicAcls(List bindings) { +Set addBindings = new HashSet<>(bindings); +addBindings.removeAll(knownTopicAclBindings); +int newBindCount = addBindings.size(); +if (!addBindings.isEmpty()) { +log.info("Syncing new found {} topic ACL bindings.", newBindCount); +targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { +if (e != null) { +log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); +} +})); +knownTopicAclBindings = new HashSet<>(bindings); Review Comment: Hmmm... won't this cause issues if the call above to `targetAdminClient::createAcls` fails? ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -683,4 +686,35 @@ private Optional validateProperty(String name, Map assertNotNull(result, "Connector should not have record null config value for '" + name + "' property"); return Optional.of(result); } + +@Test +public void testUpdateIncrementTopicAcls() { +Admin sourceAdmin = mock(Admin.class); +Admin targetAdmin = mock(Admin.class); +MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin); + +List filteredBindings = new ArrayList<>(); +AclBinding binding1 = mock(AclBinding.class); +AclBinding binding2 = mock(AclBinding.class); +filteredBindings.add(binding1); +filteredBindings.add(binding2); + doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet()); + +// First topic acl info update when starting `syncTopicAcls` thread +int newAddCount = connector.updateTopicAcls(filteredBindings); +assertEquals(connector.knownTopicAclBindings(), new HashSet<>(filteredBindings)); +assertTrue(newAddCount == filteredBindings.size()); + +List newAddBindings = new ArrayList<>(); +AclBinding binding3 = mock(AclBinding.class); +AclBinding binding4 = mock(AclBinding.class); +newAddBindings.add(binding3); +newAddBindings.add(binding4); +filteredBindings.addAll(newAddBindings); + +// The next increment topic acl info update +newAddCount = connector.updateTopicAcls(filteredBindings); +assertEquals(connector.knownTopicAclBindings(), new HashSet<>(filteredBindings)); +assertTrue(newAddCount == newAddBindings.size()); Review Comment: ```suggestion assertEquals(newAddBindings.size(), newAddCount); ``` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ## @@ -683,4 +686,35 @@ private Optional validateProperty(String name, Map assertNotNull(result, "Connector should not have record null config value for '" + name + "' property"); return Optional.of(result); } + +@Test +public void testUpdateIncrementTopicAcls() { +Admin sourceAdmin = mock(Admin.class); +Admin targetAdmin = mock(Admin.class); +MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin); + +List filteredBindings = new ArrayList<>(); +AclBinding binding1 = mock(AclBinding.class); +AclBinding binding2 = mock(AclBinding.class); 
+filteredBindings.add(binding1); +filteredBindings.add(binding2); + doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet()); + +// First topic acl info update when starting `syncTopicAcls` thread +int newAddCount = connector.updateTopicAcls(filteredBindings); +assertEquals(connector.knownTopicAclBindings(), new HashSet<>(filteredBindings)); +assertTrue(newAddCount == filteredBindings.size()); Review Comment: ```suggestion assertEquals(filteredBindings.size(), newAddCount); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this
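One way to address the concern above, sketched here only for illustration (the PR may resolve it differently), is to record a binding as known only after the admin client confirms it was created, so that a failed `createAcls` call is retried on a later sync pass. The known-bindings set would then need to tolerate updates from the admin client's completion thread (for example, a concurrent set):

```java
Set<AclBinding> addBindings = new HashSet<>(bindings);
addBindings.removeAll(knownTopicAclBindings);
if (!addBindings.isEmpty()) {
    log.info("Syncing {} newly found topic ACL bindings.", addBindings.size());
    targetAdminClient.createAcls(addBindings).values().forEach((binding, future) ->
        future.whenComplete((ignored, error) -> {
            if (error != null) {
                log.warn("Could not sync ACL of topic {}.", binding.pattern().name(), error);
            } else {
                // Only remember bindings that were actually created on the target cluster.
                knownTopicAclBindings.add(binding);
            }
        }));
}
```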
[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1629387962 > Thanks @vamossagar12, this is looking much better now! Thanks Yash. I addressed your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1258617166 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); +FutureCallback secondaryWriteFuture = new FutureCallback<>(); +secondaryStore.set(values, secondaryWriteFuture); +try { +// For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for +// commits. We still need to wait because we want to fail the offset commit for cases when Review Comment: Well the rationale it the same that EOS waits indefinitely for the commits to happen. In this case, the tombstone containing offset writes happen to fall in the critical path and hence the commit transaction needs to wait for this as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1258614182 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -233,4 +496,10 @@ private void expectStore(Map key, byte[] keySerialized, }); } +private void extractKeyValue(Map key, byte[] keySerialized, Map value, byte[] valueSerialized) { Review Comment: hmm I updated it. As such the effect on the overall code-flow of the method was extracting keys and values and hence the name. Anyways, made the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1258613193 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); +FutureCallback secondaryWriteFuture = new FutureCallback<>(); +secondaryStore.set(values, secondaryWriteFuture); +try { +// For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for +// commits. We still need to wait because we want to fail the offset commit for cases when +// tombstone records fail to be written to the secondary store. Note that while commitTransaction +// already waits for all records to be sent and ack'ed, in this case we do need to add an explicit +// blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms` +// and throw that exception which would allow the offset commit to fail. +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (InterruptedException e) { +log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this); Review Comment: I changed the only occurrences of `offsets with tombstones` to `tombstone(s)-containing offsets` and `secondary storage` to `secondary store`. Regarding `primary` and `secondary` store writes, that nomenclature was already being used [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L285-L301) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1258609819 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); Review Comment: It doesn't need to be, but it's cleaner that way. Otherwise we will need to make explicit null checks. This pattern is used at other places as well like [here](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484). I have retained the `AtomicReference` usage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
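For readers following this thread, a minimal, self-contained illustration of the `AtomicReference` pattern being discussed: the callback records the first failure it sees, and the calling thread rethrows it afterwards without extra null bookkeeping. This is a generic sketch, not the Connect code.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CallbackErrorCapture {
    public static void main(String[] args) {
        AtomicReference<Throwable> error = new AtomicReference<>();

        Runnable callback = () -> {
            try {
                // Work that may fail, e.g. writing offsets to a backing store.
                throw new IllegalStateException("write to secondary store failed");
            } catch (Throwable t) {
                error.compareAndSet(null, t); // keep only the first failure
            }
        };
        callback.run();

        if (error.get() != null) {
            throw new RuntimeException("Offset flush failed", error.get());
        }
    }
}
```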
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1258608091 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and Review Comment: Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask
yashmayya commented on PR #13955: URL: https://github.com/apache/kafka/pull/13955#issuecomment-1629367616 Thanks Chris, I agree with the point you brought up regarding potential poison pill messages. This PR can be merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13946: KAFKA-15139:Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`
C0urante commented on PR #13946: URL: https://github.com/apache/kafka/pull/13946#issuecomment-1629366713 @hudeqi I think @gharris1727 can merge this now :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask
yashmayya commented on code in PR #13955: URL: https://github.com/apache/kafka/pull/13955#discussion_r1258603343 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { verifyTopicCreation(); } +@Test +public void testSendRecordsRetriableException() { +createWorkerTask(); + +SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + +expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC); +expectTaskGetTopic(); + +when(transformationChain.apply(eq(record1))).thenReturn(null); +when(transformationChain.apply(eq(record2))).thenReturn(null); +when(transformationChain.apply(eq(record3))).thenReturn(record3); + +TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList()); +TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo)); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc)); + +when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null); + +workerTask.toSend = Arrays.asList(record1, record2, record3); + +// The producer throws a RetriableException the first time we try to send the third record +assertFalse(workerTask.sendRecords()); + +// The next attempt to send the third record should succeed +assertTrue(workerTask.sendRecords()); + +// Ensure that the first two records that were filtered out by the transformation chain +// aren't re-processed when we retry the call to sendRecords() +verify(transformationChain, times(4)).apply(any(SourceRecord.class)); Review Comment: Ah, that's a good point, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14059) Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14059: -- Fix Version/s: 3.6.0 > Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest > --- > > Key: KAFKA-14059 > URL: https://issues.apache.org/jira/browse/KAFKA-14059 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14059) Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-14059.
-----------------------------------
    Resolution: Done

> Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest
> -------------------------------------------------------------------
>
>                 Key: KAFKA-14059
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14059
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Hector Geraldino
>            Priority: Minor
>             Fix For: 3.6.0
>
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante merged pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante merged PR #13383: URL: https://github.com/apache/kafka/pull/13383 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
C0urante merged PR #13284: URL: https://github.com/apache/kafka/pull/13284 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #13661: Doc fixes: Fix format and other small errors in config documentation
bbejeck commented on PR #13661: URL: https://github.com/apache/kafka/pull/13661#issuecomment-1629346978 Merged #13661 into trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck merged pull request #13661: Doc fixes: Fix format and other small errors in config documentation
bbejeck merged PR #13661: URL: https://github.com/apache/kafka/pull/13661 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13941: KAFKA-15123: Add tests for ChunkedBytesStream
divijvaidya commented on code in PR #13941: URL: https://github.com/apache/kafka/pull/13941#discussion_r1258587583

## clients/src/test/java/org/apache/kafka/common/utils/ChunkedBytesStreamTest.java:

@@ -149,6 +160,38 @@ public void testCorrectnessForMethodSkip(int bytesToPreRead, ByteBuffer inputBuf
 }
 }
+@Test
+public void testEdgeCaseInputForMethodSkip() throws IOException {
+int bufferLength = 16;
+ByteBuffer inputBuf = ByteBuffer.allocate(bufferLength);
+RANDOM.nextBytes(inputBuf.array());
+inputBuf.position(inputBuf.capacity());
+inputBuf.flip();
+
+try (InputStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10, true)) {

Review Comment:
you can use `@ParameterizedTest` for true/false here instead of duplicating code.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
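A hedged sketch of the reviewer's suggestion: drive the true/false constructor argument through a JUnit 5 @ParameterizedTest with a boolean @ValueSource instead of duplicating the test body. The ChunkedBytesStream, ByteBufferInputStream, and supplier names come from the diff above and are left commented out so the snippet stands alone; the boolean parameter name is only illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Random;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// Sketch only: JUnit 5 runs this body once with true and once with false, so the
// boolean flag no longer requires two copies of the same test method.
public class ChunkedBytesStreamSkipSketchTest {
    private static final Random RANDOM = new Random();

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testEdgeCaseInputForMethodSkip(boolean delegateSkipToSourceStream) throws Exception {
        ByteBuffer inputBuf = ByteBuffer.allocate(16);
        RANDOM.nextBytes(inputBuf.array());
        inputBuf.position(inputBuf.capacity());
        inputBuf.flip();

        // In the real test, the flag would be forwarded to the constructor shown in the diff above:
        // try (InputStream is = new ChunkedBytesStream(
        //         new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10, delegateSkipToSourceStream)) {
        //     ... the skip() assertions from the original test ...
        // }
    }
}
```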
[GitHub] [kafka] bbejeck commented on pull request #13661: Doc fixes: Fix format and other small errors in config documentation
bbejeck commented on PR #13661: URL: https://github.com/apache/kafka/pull/13661#issuecomment-1629339267

Failures unrelated
```
JDK 8 and Scala 2.12 / testUnregisterBroker() – org.apache.kafka.controller.QuorumControllerTest
JDK 8 and Scala 2.12 / testConfigurationOperations() – org.apache.kafka.controller.QuorumControllerTest
JDK 17 and Scala 2.13 / testMultiNodeCluster() – org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest
JDK 17 and Scala 2.13 / testConnectorBoundary – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
JDK 11 and Scala 2.13 / testOutdatedCoordinatorAssignment() – org.apache.kafka.clients.consumer.internals.CooperativeConsumerCoordinatorTest
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org