[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259142145


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +764,121 @@ private CoordinatorResult consumerGr
         );
 
         if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-            log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+            log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: "
                 + subscriptionMetadata + ".");
-            records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+            records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
         }
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newGroupEpochRecord(groupId, groupEpoch));
+        records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-        return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
-            .setMemberId(memberId)
-            .setMemberEpoch(-1)
-        );
+        // Cancel all the timers of the member.
+        cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
+        cancelConsumerGroupRevocationTimeout(group.groupId(), member.memberId());
+
+        return records;
+    }
+
+    /**
+     * Schedules (or reschedules) the session timeout for the member.
+     *
+     * @param groupId   The group id.
+     * @param memberId  The member id.
+     */
+    private void scheduleConsumerGroupSessionTimeout(
+        String groupId,
+        String memberId
+    ) {
+        String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+        timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+            try {
+                ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+                ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+                log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " +
+                    "its session expired.");
+
+                return consumerGroupFenceMember(group, member);
+            } catch (GroupIdNotFoundException ex) {
+                log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " +
+                    "does not exist.");
+            } catch (UnknownMemberIdException ex) {
+                log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " +
+                    "does not exist.");
+            }
+
+            return Collections.emptyList();
+        });
+    }
+
+    /**
+     * Cancels the session timeout of the member.
+     *
+     * @param groupId   The group id.
+     * @param memberId  The member id.
+     */
+    private void cancelConsumerGroupSessionTimeout(
+        String groupId,
+        String memberId
+    ) {
+        timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+    }
+
+    /**
+     * Schedules a revocation timeout for the member.
+     *
+     * @param groupId               The group id.
+     * @param memberId              The member id.
+     * @param revocationTimeoutMs   The revocation timeout.
+     * @param expectedMemberEpoch   The expected member epoch.
+     */
+    private void scheduleConsumerGroupRevocationTimeout(
+        String groupId,
+        String memberId,
+        long revocationTimeoutMs,
+        int expectedMemberEpoch
+    ) {
+        String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+        timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+            try {
+                ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+                ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+                if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
+                    member.memberEpoch() != expectedMemberEpoch) {
+                    log.debug("[GroupId " + groupId + "] Ignoring revocation timeout for " + memberId + " because the member " +
+                        "state does not match the expected state.");
+                    return Collections.emptyList();
+                }
+
+                log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " +
+                    "it failed to revoke partitions within {}ms.", revocationTimeoutMs);
+
+                return consumerGroupFenceMember(group, member);
+            } catch (GroupIdNotFoundException ex) {
+                log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " +
+                    "does not exist.");
+            }

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-10 Thread via GitHub


rreddy-22 commented on code in PR #13920:
URL: https://github.com/apache/kafka/pull/13920#discussion_r1259195095


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -194,6 +193,7 @@ private boolean allSubscriptionsEqual(Set allTopics,
 otherConsumer, otherMemberGeneration,
 tp,
 otherMemberGeneration);
+allPreviousPartitionsToOwner.put(tp, 
otherConsumer);

Review Comment:
   So in line 161 we directly add the current consumer to the map `allPreviousPartitionsToOwner`. If we choose to keep that line, we should remove the `put` calls in the first two conditions on lines 175 and 180, and then we can keep the change you made. However, I feel like this would be redundant, since we would be replacing the otherMember and adding it back whenever the conditions aren't favorable. My suggestion would be to replace the `put` on line 161 with a `get` and keep the rest of the code as it is.
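For readers less familiar with the `Map` semantics being debated here, a minimal standalone sketch of the difference (this is not the assignor code; the partition and consumer names are invented):

import java.util.HashMap;
import java.util.Map;

public class OwnershipMapSketch {
    public static void main(String[] args) {
        Map<String, String> allPreviousPartitionsToOwner = new HashMap<>();
        allPreviousPartitionsToOwner.put("topic-0", "consumerA");

        // put(...) overwrites the recorded owner and returns the value it replaced,
        // which is why the original owner has to be "added back" when the
        // reassignment conditions are not met.
        String replaced = allPreviousPartitionsToOwner.put("topic-0", "consumerB");
        System.out.println("replaced owner: " + replaced);      // consumerA

        // get(...) only reads the current owner, so nothing needs to be restored afterwards.
        String currentOwner = allPreviousPartitionsToOwner.get("topic-0");
        System.out.println("current owner: " + currentOwner);   // consumerB
    }
}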



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259168414


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment:
   > Well, the rationale is the same: EOS waits indefinitely for the commits to happen
   
   Prior to this change, we just waited for the producer transaction commit to complete, since we didn't care about the writes to the secondary store (and the transaction would contain all the data records as well as the offset records written to the connector's primary store). However, that would be bounded by the producer's `transaction.timeout.ms`, right?
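For context on the configuration being referenced: `transaction.timeout.ms` is the bound after which the broker-side coordinator aborts an open transaction, so `commitTransaction()` cannot block forever. A standalone sketch of a transactional producer with that setting (the values are illustrative, not Connect's actual configuration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-offsets-tx");  // placeholder id
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60_000);             // the bound discussed above
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // ... send data records and offset records here ...
            // If the transaction stays open longer than transaction.timeout.ms,
            // the coordinator aborts it, so the commit below cannot hang indefinitely.
            producer.commitTransaction();
        }
    }
}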






[GitHub] [kafka] eziosudo commented on pull request #13982: KAFKA-15159: upgrade minor dependencies

2023-07-10 Thread via GitHub


eziosudo commented on PR #13982:
URL: https://github.com/apache/kafka/pull/13982#issuecomment-1630095390

   > scalaCollectionCompat has a new version `2.11`, did we consider that?
   
   Yes, just found the release notes here, seems harmless.
   https://github.com/scala/scala-collection-compat/releases/tag/v2.11.0
   
   By the way, scalaLogging also has a new version `3.9.5`, but its release notes haven't been updated yet. Do you think we can upgrade it as well?
   https://github.com/lightbend-labs/scala-logging/releases
   https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging
   https://github.com/apache/kafka/assets/54128896/c6623dd2-ac33-477c-b203-522295f05213
   





[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259161121


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this);

Review Comment:
   Thanks, that makes sense  






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259160336


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   Aren't we still making explicit null checks here - https://github.com/apache/kafka/pull/13801/files#diff-0014cfa4c32bfde215cb10bee987e997d5182815fcf1a1245539375519f83f3dR325-R326?
   
   I'm fine with retaining it though, this isn't a blocking comment.
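For readers following the thread, the pattern under discussion is roughly the following (a generic sketch with invented names, not the Connect code): a callback records its failure in an `AtomicReference`, and the caller null-checks that reference afterwards.

import java.util.concurrent.atomic.AtomicReference;

public class CallbackErrorSketch {
    public static void main(String[] args) {
        // The callback stores a failure here instead of throwing on its own thread.
        AtomicReference<Throwable> writeError = new AtomicReference<>();

        Runnable failingCallback = () ->
            writeError.compareAndSet(null, new RuntimeException("secondary write failed"));
        failingCallback.run();

        // This is the kind of explicit null check referred to above.
        if (writeError.get() != null) {
            System.out.println("propagating: " + writeError.get().getMessage());
        }
    }
}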






[jira] [Resolved] (KAFKA-15139) Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`

2023-07-10 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi resolved KAFKA-15139.

Resolution: Fixed

> Optimize the performance of `Set.removeAll(List)` in 
> `MirrorCheckpointConnector`
> 
>
> Key: KAFKA-15139
> URL: https://issues.apache.org/jira/browse/KAFKA-15139
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> This is the hint from the `removeAll` method documentation in `Set`:
> _This implementation determines which is the smaller of this set and the specified collection, by invoking the size method on each. If this set has fewer elements, then the implementation iterates over this set, checking each element returned by the iterator in turn to see if it is contained in the specified collection. If it is so contained, it is removed from this set with the iterator's remove method. If the specified collection has fewer elements, then the implementation iterates over the specified collection, removing from this set each element returned by the iterator, using this set's remove method._
> That said, assume _M_ is the number of elements in the set and _N_ is the number of elements in the List. If the specified collection is a `List` and {_}M<=N{_}, the time complexity of `removeAll` is _O(MN)_ (because searching a List is {_}O(N){_}); on the contrary, if {_}N<M{_}, the time complexity is _O(N)_.
> In `MirrorCheckpointConnector`, the `refreshConsumerGroups` method is called repeatedly from a daemon thread, and it contains two `removeAll` calls. Logically, whenever the number of groups in the source cluster simply increases or decreases in a given round, both `removeAll` calls hit the _O(MN)_ case described above. Therefore, it is better to change the variables involved to the `Set` type to avoid this poor performance.
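To make the complexity argument concrete, a small self-contained sketch (the group names are invented; this is not the connector code):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAllSketch {
    public static void main(String[] args) {
        Set<String> knownGroups = new HashSet<>(List.of("group-a", "group-b", "group-c"));
        List<String> refreshedGroups = new ArrayList<>(List.of("group-b", "group-c", "group-d", "group-e"));

        // The receiver set is the smaller side, so AbstractSet.removeAll iterates the set and
        // calls refreshedGroups.contains(...) for each element: O(N) per lookup, O(M*N) overall.
        Set<String> slowCopy = new HashSet<>(knownGroups);
        slowCopy.removeAll(refreshedGroups);

        // Passing a Set instead (or keeping the variables as Sets from the start, as the ticket
        // suggests) makes every membership check O(1).
        Set<String> fastCopy = new HashSet<>(knownGroups);
        fastCopy.removeAll(new HashSet<>(refreshedGroups));

        System.out.println(slowCopy + " " + fastCopy);   // both print [group-a]
    }
}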



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-07-10 Thread via GitHub


hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1630082775

   I have added the unit test for the related "createAcl failure" case, thanks for the review! @C0urante





[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


CalvinConfluent commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259133929


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
         assertEquals(image, context.groupMetadataManager.image());
     }
 
+    @Test
+    public void testSessionTimeoutLifecycle() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(9)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Session timer is rescheduled on second heartbeat.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().memberEpoch()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Session timer is cancelled on leave.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(-1));
+        assertEquals(-1, result.response().memberEpoch());
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+        context.assertNoRevocationTimeout(groupId, memberId);
+    }
+
+    @Test
+    public void testSessionTimeoutExpiration() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(9)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-10 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1259130578


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine, i.e.
+ * Window TinyLfu (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+public static final String DIR_NAME = "remote-log-index-cache";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps 
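As an aside for readers unfamiliar with Caffeine: the javadoc above describes a size-bounded cache (Window TinyLfu is Caffeine's default eviction policy) whose removal listener hands evicted entries to a queue that a cleaner thread drains. A rough, self-contained sketch of that shape (the key/value types, names, and the bound are placeholders, not what RemoteIndexCache actually uses):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.util.concurrent.LinkedBlockingQueue;

public class CaffeineEvictionSketch {
    public static void main(String[] args) {
        // A background cleaner thread would drain this queue and delete the evicted files.
        LinkedBlockingQueue<String> expired = new LinkedBlockingQueue<>();

        Cache<String, String> cache = Caffeine.newBuilder()
            .maximumSize(1024)    // placeholder bound
            .removalListener((String key, String value, RemovalCause cause) -> expired.offer(key))
            .build();

        cache.put("segment-1", "offset-index-entry");
        cache.invalidate("segment-1");   // the removal listener runs asynchronously after this
    }
}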

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-10 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1259130451


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine, i.e.
+ * Window TinyLfu (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+public static final String DIR_NAME = "remote-log-index-cache";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps 

[GitHub] [kafka] satishd commented on pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-10 Thread via GitHub


satishd commented on PR #13275:
URL: https://github.com/apache/kafka/pull/13275#issuecomment-1630042350

   > Just a few comments related to TxnIndex potentially being optional.
   
   @jeqo There is already [KAFKA-14993](https://issues.apache.org/jira/browse/KAFKA-14993) to address that. @kamalcph was working on that. I did not want to add those changes to this PR as it is tracked separately.
   
   





[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-10 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1259126161


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine, i.e.
+ * Window TinyLfu (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+public static final String DIR_NAME = "remote-log-index-cache";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps 

[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-10 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1259125780


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine, i.e.
+ * Window TinyLfu (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+public static final String DIR_NAME = "remote-log-index-cache";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259115771


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -367,6 +446,155 @@ public CoordinatorResult consumerGro
         return result;
     }
 
+    public CompletableFuture sendGenericGroupJoin(
+        JoinGroupRequestData request
+    ) {
+        return sendGenericGroupJoin(request, false);
+    }
+
+    public CompletableFuture sendGenericGroupJoin(
+        JoinGroupRequestData request,
+        boolean requireKnownMemberId
+    ) {
+        return sendGenericGroupJoin(request, requireKnownMemberId, false, null);
+    }
+
+    public CompletableFuture sendGenericGroupJoin(
+        JoinGroupRequestData request,
+        boolean requireKnownMemberId,
+        boolean supportSkippingAssignment,
+        ExpectedGenericGroupResult expectedResult
+    ) {
+        // requireKnownMemberId is true: version >= 4
+        // supportSkippingAssignment is true: version >= 9
+        short joinGroupVersion = 3;
+
+        if (requireKnownMemberId) {
+            joinGroupVersion = 4;
+            if (supportSkippingAssignment) {
+                joinGroupVersion = ApiKeys.JOIN_GROUP.latestVersion();
+            }
+        }
+
+        CompletableFuture responseFuture = new CompletableFuture<>();
+
+        RequestContext context = new RequestContext(
+            new RequestHeader(
+                ApiKeys.JOIN_GROUP,
+                joinGroupVersion,
+                "client",
+                0
+            ),
+            "1",
+            InetAddress.getLoopbackAddress(),
+            KafkaPrincipal.ANONYMOUS,
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+            SecurityProtocol.PLAINTEXT,
+            ClientInformation.EMPTY,
+            false
+        );
+
+        CoordinatorResult result = groupMetadataManager.genericGroupJoin(
+            context,
+            request,
+            responseFuture
+        );
+
+        if (expectedResult != null) {
+            GenericGroup group = groupMetadataManager.getOrMaybeCreateGenericGroup(
+                request.groupId(),
+                false
+            );
+
+            Record groupMetadataRecord;
+            if (expectedResult.isNewGroup) {
+                groupMetadataRecord = newEmptyGroupMetadataRecord(group, MetadataVersion.latest());
+            } else {
+                groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(group, MetadataVersion.latest());
+            }
+
+            expectedResult.records = Collections.singletonList(groupMetadataRecord);

Review Comment:
   I'll think a bit more on this as it will require a large change in this class.
   
   One of the reasons I had it like this is that we mostly care about the responseFuture in the tests and wanted to hide the record/append future validations. The timer could also produce records, which requires setting things up in advance.
   
   I agree it is unclean; I'll address this in the next commit.






[jira] [Commented] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer

2023-07-10 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741819#comment-17741819
 ] 

Ismael Juma commented on KAFKA-15175:
-

nio2 is not necessarily faster than nio - it's a different model.

> Assess the use of nio2 asynchronous channel for KafkaConsumer
> -
>
> Key: KAFKA-15175
> URL: https://issues.apache.org/jira/browse/KAFKA-15175
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Philip Nee
>Priority: Major
>
> We should assess whether NIO2 is an appropriate, more performant replacement for the current nio library.
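To illustrate the "different model" point, a hand-written sketch of an NIO.2 asynchronous read (not KafkaConsumer code; the address is a placeholder): classic NIO drives readiness through a Selector loop, whereas NIO.2 delivers completions to callbacks or futures.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Nio2ReadSketch {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress("localhost", 9092)).get();   // Future-style completion

        ByteBuffer buffer = ByteBuffer.allocate(4096);
        channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer attachment) {
                // Invoked on a channel-group thread when data arrives; there is no select() loop to drive.
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                // Error handling path.
            }
        });
    }
}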



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15152) Fix incorrect format specifiers when formatting string

2023-07-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741814#comment-17741814
 ] 

Phuc Hong Tran edited comment on KAFKA-15152 at 7/11/23 2:30 AM:
-

Hi [~divijvaidya], can I have this one assigned to me? Thanks

P.S: I just assigned it to myself


was (Author: JIRAUSER301295):
Hi [~divijvaidya], can I have this one assign to me? Thanks

> Fix incorrect format specifiers when formatting string
> --
>
> Key: KAFKA-15152
> URL: https://issues.apache.org/jira/browse/KAFKA-15152
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: newbiw
>
> *This is a good Jira to be picked up by first time contributors to Kafka code 
> base.*
> The objective of this Jira is to fix incorrect formatting of string at 
> multiple places in the code which can cause incorrect print when used in a 
> different locale.
> *1. FollowerState.java Line: 121*
> This code uses '%s' to format long: updatedHighWatermark (declared at line 
> 117). This is a potential locale-sensitive handling issue. It might cause 
> errors in the handling and processing of the statement at line: 121. Consider 
> formatting this data with '%d' instead.
> *2. MemoryRecordsBuilder.java Line: 441*
> This code uses '%s' to format long: offset (declared at line 434), 
> lastOffset. This is a potential locale-sensitive handling issue. It might 
> cause errors in the handling and processing of the statement at line: 441. 
> Consider formatting this data with '%d' instead.
> *3. HttpAccessTokenRetriever.java Line: 340*
> This code uses '%s' to format int: MAX_RESPONSE_BODY_LENGTH, actualLength 
> (declared at line 338). This is a potential locale-sensitive handling issue. 
> It might cause errors in the handling and processing of the statement at 
> line: 340. Consider formatting this data with '%d' instead.
> 4. *KafkaAdminClient.java Line: 1256*
> This code uses '%s' to format int: correlationId (declared at line 1226). 
> This is a potential locale-sensitive handling issue. It might cause errors in 
> the handling and processing of the statement at line: 1256. Consider 
> formatting this data with '%d' instead.
> *5 . Batch.java Line: 170*
> This code uses '%s' to format int: epoch (declared at line 163). This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 170. Consider formatting 
> this data with '%d' instead.
> *6. Batch.java Line: 207*
> This code uses '%s' to format int: epoch (declared at line 200). This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 207. Consider formatting 
> this data with '%d' instead.
> *7. BatchBuilder.java Line: 330*
> This code uses '%s' to format int: 'size' expression. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 330. Consider formatting this data with 
> '%d' instead.
> *8. CopartitionedTopicsEnforcer.java Line: 104*
> This code uses '%s' to format int: numberOfPartitionsOfInternalTopic 
> (declared at line 99), numPartitionsToUseForRepartitionTopics (defined at 
> line 81). This is a potential locale-sensitive handling issue. It might cause 
> errors in the handling and processing of the statement at line: 104. Consider 
> formatting this data with '%d' instead.
>  
> *9. RefreshingHttpsJwks.java Line: 337*
> This code uses '%s' to format int: MISSING_KEY_ID_MAX_KEY_LENGTH, 
> actualLength (declared at line 335). This is a potential locale-sensitive 
> handling issue. It might cause errors in the handling and processing of the 
> statement at line: 337. Consider formatting this data with '%d' instead.
> *10. SerializedJwt.java Line: 47*
> This code uses '%s' to format int: splits.length. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 47. Consider formatting this data with 
> '%d' instead.
> *11. ControllerResultAndOffset.java Line: 57*
> This code uses '%s' to format long: offset. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 57. Consider formatting this data with 
> '%d' instead.
> *12. ReplicatedLog.java Line: 101*
> This code uses '%s' to format long: 'startOffset' expression. This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 101. Consider formatting 
> this data with '%d' instead.
> *13. HttpAccessTokenRetriever.java Line: 275*
> This code uses '%s' to format int: responseCode (declared at line 240). This 
> is a potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 275. Consider formatting 
> this data with '%d' instead.
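
The fix the issue asks for is mechanical: wherever an int or long is passed to a format string through '%s', the conversion becomes '%d'. A minimal before/after sketch, using an illustrative variable name rather than the actual Kafka code, is:

    public class FormatSpecifierExample {
        public static void main(String[] args) {
            long updatedHighWatermark = 1234567L;  // illustrative value, not the real FollowerState field

            // Before: the numeric argument goes through the generic '%s' conversion.
            String before = String.format("High watermark updated to %s", updatedHighWatermark);

            // After: '%d' states the integral intent explicitly, as the issue recommends.
            String after = String.format("High watermark updated to %d", updatedHighWatermark);

            System.out.println(before);
            System.out.println(after);
        }
    }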

[jira] [Assigned] (KAFKA-15152) Fix incorrect format specifiers when formatting string

2023-07-10 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-15152:
--

Assignee: Phuc Hong Tran

> Fix incorrect format specifiers when formatting string
> --
>
> Key: KAFKA-15152
> URL: https://issues.apache.org/jira/browse/KAFKA-15152
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: newbie
>
> *This is a good Jira to be picked up by first time contributors to Kafka code 
> base.*
> The objective of this Jira is to fix incorrect formatting of string at 
> multiple places in the code which can cause incorrect print when used in a 
> different locale.
> *1. FollowerState.java Line: 121*
> This code uses '%s' to format long: updatedHighWatermark (declared at line 
> 117). This is a potential locale-sensitive handling issue. It might cause 
> errors in the handling and processing of the statement at line: 121. Consider 
> formatting this data with '%d' instead.
> *2. MemoryRecordsBuilder.java Line: 441*
> This code uses '%s' to format long: offset (declared at line 434), 
> lastOffset. This is a potential locale-sensitive handling issue. It might 
> cause errors in the handling and processing of the statement at line: 441. 
> Consider formatting this data with '%d' instead.
> *3. HttpAccessTokenRetriever.java Line: 340*
> This code uses '%s' to format int: MAX_RESPONSE_BODY_LENGTH, actualLength 
> (declared at line 338). This is a potential locale-sensitive handling issue. 
> It might cause errors in the handling and processing of the statement at 
> line: 340. Consider formatting this data with '%d' instead.
> *4. KafkaAdminClient.java Line: 1256*
> This code uses '%s' to format int: correlationId (declared at line 1226). 
> This is a potential locale-sensitive handling issue. It might cause errors in 
> the handling and processing of the statement at line: 1256. Consider 
> formatting this data with '%d' instead.
> *5. Batch.java Line: 170*
> This code uses '%s' to format int: epoch (declared at line 163). This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 170. Consider formatting 
> this data with '%d' instead.
> *6. Batch.java Line: 207*
> This code uses '%s' to format int: epoch (declared at line 200). This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 207. Consider formatting 
> this data with '%d' instead.
> *7. BatchBuilder.java Line: 330*
> This code uses '%s' to format int: 'size' expression. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 330. Consider formatting this data with 
> '%d' instead.
> *8. CopartitionedTopicsEnforcer.java Line: 104*
> This code uses '%s' to format int: numberOfPartitionsOfInternalTopic 
> (declared at line 99), numPartitionsToUseForRepartitionTopics (defined at 
> line 81). This is a potential locale-sensitive handling issue. It might cause 
> errors in the handling and processing of the statement at line: 104. Consider 
> formatting this data with '%d' instead.
>  
> *9. RefreshingHttpsJwks.java Line: 337*
> This code uses '%s' to format int: MISSING_KEY_ID_MAX_KEY_LENGTH, 
> actualLength (declared at line 335). This is a potential locale-sensitive 
> handling issue. It might cause errors in the handling and processing of the 
> statement at line: 337. Consider formatting this data with '%d' instead.
> *10. SerializedJwt.java Line: 47*
> This code uses '%s' to format int: splits.length. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 47. Consider formatting this data with 
> '%d' instead.
> *11. ControllerResultAndOffset.java Line: 57*
> This code uses '%s' to format long: offset. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 57. Consider formatting this data with 
> '%d' instead.
> *12. ReplicatedLog.java Line: 101*
> This code uses '%s' to format long: 'startOffset' expression. This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 101. Consider formatting 
> this data with '%d' instead.
> *13. HttpAccessTokenRetriever.java Line: 275*
> This code uses '%s' to format int: responseCode (declared at line 240). This 
> is a potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 275. Consider formatting 
> this data with '%d' instead.

[jira] [Commented] (KAFKA-15152) Fix incorrect format specifiers when formatting string

2023-07-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17741814#comment-17741814
 ] 

Phuc Hong Tran commented on KAFKA-15152:


Hi [~divijvaidya], can I have this one assigned to me? Thanks

> Fix incorrect format specifiers when formatting string
> --
>
> Key: KAFKA-15152
> URL: https://issues.apache.org/jira/browse/KAFKA-15152
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
>
> *This is a good Jira to be picked up by first time contributors to Kafka code 
> base.*
> The objective of this Jira is to fix incorrect formatting of string at 
> multiple places in the code which can cause incorrect print when used in a 
> different locale.
> *1. FollowerState.java Line: 121*
> This code uses '%s' to format long: updatedHighWatermark (declared at line 
> 117). This is a potential locale-sensitive handling issue. It might cause 
> errors in the handling and processing of the statement at line: 121. Consider 
> formatting this data with '%d' instead.
> *2. MemoryRecordsBuilder.java Line: 441*
> This code uses '%s' to format long: offset (declared at line 434), 
> lastOffset. This is a potential locale-sensitive handling issue. It might 
> cause errors in the handling and processing of the statement at line: 441. 
> Consider formatting this data with '%d' instead.
> *3. HttpAccessTokenRetriever.java Line: 340*
> This code uses '%s' to format int: MAX_RESPONSE_BODY_LENGTH, actualLength 
> (declared at line 338). This is a potential locale-sensitive handling issue. 
> It might cause errors in the handling and processing of the statement at 
> line: 340. Consider formatting this data with '%d' instead.
> *4. KafkaAdminClient.java Line: 1256*
> This code uses '%s' to format int: correlationId (declared at line 1226). 
> This is a potential locale-sensitive handling issue. It might cause errors in 
> the handling and processing of the statement at line: 1256. Consider 
> formatting this data with '%d' instead.
> *5. Batch.java Line: 170*
> This code uses '%s' to format int: epoch (declared at line 163). This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 170. Consider formatting 
> this data with '%d' instead.
> *6. Batch.java Line: 207*
> This code uses '%s' to format int: epoch (declared at line 200). This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 207. Consider formatting 
> this data with '%d' instead.
> *7. BatchBuilder.java Line: 330*
> This code uses '%s' to format int: 'size' expression. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 330. Consider formatting this data with 
> '%d' instead.
> *8. CopartitionedTopicsEnforcer.java Line: 104*
> This code uses '%s' to format int: numberOfPartitionsOfInternalTopic 
> (declared at line 99), numPartitionsToUseForRepartitionTopics (defined at 
> line 81). This is a potential locale-sensitive handling issue. It might cause 
> errors in the handling and processing of the statement at line: 104. Consider 
> formatting this data with '%d' instead.
>  
> *9. RefreshingHttpsJwks.java Line: 337*
> This code uses '%s' to format int: MISSING_KEY_ID_MAX_KEY_LENGTH, 
> actualLength (declared at line 335). This is a potential locale-sensitive 
> handling issue. It might cause errors in the handling and processing of the 
> statement at line: 337. Consider formatting this data with '%d' instead.
> *10. SerializedJwt.java Line: 47*
> This code uses '%s' to format int: splits.length. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 47. Consider formatting this data with 
> '%d' instead.
> *11. ControllerResultAndOffset.java Line: 57*
> This code uses '%s' to format long: offset. This is a potential 
> locale-sensitive handling issue. It might cause errors in the handling and 
> processing of the statement at line: 57. Consider formatting this data with 
> '%d' instead.
> *12. ReplicatedLog.java Line: 101*
> This code uses '%s' to format long: 'startOffset' expression. This is a 
> potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 101. Consider formatting 
> this data with '%d' instead.
> *13. HttpAccessTokenRetriever.java Line: 275*
> This code uses '%s' to format int: responseCode (declared at line 240). This 
> is a potential locale-sensitive handling issue. It might cause errors in the 
> handling and processing of the statement at line: 275. Consider formatting 
> this data with '%d' instead.

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259098972


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259098122


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259097546


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259096806


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259095013


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] hudeqi commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-07-10 Thread via GitHub


hudeqi commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1259093047


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map 
topicConfigs) {
 }));
 }
 
-private void updateTopicAcls(List bindings) {
-log.trace("Syncing {} topic ACL bindings.", bindings.size());
-targetAdminClient.createAcls(bindings).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {
-if (e != null) {
-log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
-}
-}));
+// Visible for testing
+int updateTopicAcls(List bindings) {
+Set addBindings = new HashSet<>(bindings);
+addBindings.removeAll(knownTopicAclBindings);
+int newBindCount = addBindings.size();
+if (!addBindings.isEmpty()) {
+log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+targetAdminClient.createAcls(addBindings).values().forEach((k, v) 
-> v.whenComplete((x, e) -> {
+if (e != null) {
+log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
+}
+}));
+knownTopicAclBindings = new HashSet<>(bindings);

Review Comment:
   nice catch !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
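
The diff above replaces the unconditional createAcls call with an incremental sync: take the set difference between the bindings found in this run and the bindings remembered from the previous run, create only the new ones, and then remember the full current set. A minimal self-contained sketch of that idea, using plain strings and illustrative names rather than the actual MirrorSourceConnector types, is:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class IncrementalAclSyncSketch {
        // Bindings already created on the target cluster in earlier rounds.
        private Set<String> knownBindings = new HashSet<>();

        // Creates only the bindings not seen before and returns how many were new.
        public int sync(List<String> currentBindings) {
            Set<String> toCreate = new HashSet<>(currentBindings);
            toCreate.removeAll(knownBindings);            // drop everything already synced
            if (!toCreate.isEmpty()) {
                createAcls(toCreate);                     // stand-in for the target admin client call
                knownBindings = new HashSet<>(currentBindings);
            }
            return toCreate.size();
        }

        private void createAcls(Set<String> bindings) {
            // Elided here; the real connector submits these to the target cluster.
        }
    }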



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259062674


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259059291


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259054171


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259049772


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259047855


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
 // Verify the groups.
 Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 Arrays.asList("group5").forEach(groupId -> {
-ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ConsumerGroup group = context.groupMetadataManager
+.getOrMaybeCreateConsumerGroup(groupId, false);
 assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
 });
 
 // Verify image.
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259040832


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -228,14 +261,21 @@ public List build(TopicsImage topicsImage) {
 
 static class GroupMetadataManagerTestContext {
 static class Builder {
-final private Time time = new MockTime();
 final private LogContext logContext = new LogContext();
 final private SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
 private MetadataImage metadataImage;
-private List assignors;

Review Comment:
   We get an IllegalStateException if it's not initialized, and since it doesn't 
affect the old protocol I thought it best to keep it clean.
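   In other words, the builder fails fast when the new-protocol assignors are 
missing, so old-protocol tests simply never set them. A minimal illustrative 
sketch of that pattern (the class and field names below are hypothetical, not 
the actual test builder):

   ```java
// Illustrative sketch only: a builder that fails fast when the new-protocol
// assignors were never supplied.
import java.util.List;

final class ContextBuilderSketch {
    private List<String> assignors; // stand-in for the assignor list

    ContextBuilderSketch withAssignors(List<String> assignors) {
        this.assignors = assignors;
        return this;
    }

    Object build() {
        if (assignors == null) {
            throw new IllegalStateException("Assignors must be set for the new consumer protocol.");
        }
        return new Object(); // the real builder would construct the test context here
    }
}
   ```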



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259030830


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -271,4 +289,110 @@ public void testOnResignation() {
 10
 );
 }
+
+@Test
+public void testJoinGroup() {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+
+JoinGroupRequestData request = new JoinGroupRequestData()
+.setGroupId("foo");
+
+service.startup(() -> 1);
+
+when(runtime.scheduleWriteOperation(
+ArgumentMatchers.eq("generic-group-join"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.any()
+)).thenReturn(CompletableFuture.completedFuture(
+new JoinGroupResponseData()
+));
+
+CompletableFuture responseFuture = 
service.joinGroup(
+requestContext(ApiKeys.JOIN_GROUP),
+request,
+BufferSupplier.NO_CACHING
+);
+
+assertFalse(responseFuture.isDone());

Review Comment:
   A successful join group request will store the response future in the member's 
`awaitingJoinFuture` (the future could also be completed immediately if the join 
phase completes).
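   A minimal sketch of that flow (illustrative only; apart from 
`awaitingJoinFuture`, the names below are hypothetical and String stands in for 
the response type):

   ```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: the join handler parks the caller's future on the member,
// and the group completes it later when the join phase finishes.
final class MemberJoinFutureSketch {
    private CompletableFuture<String> awaitingJoinFuture;

    // A successful join parks the caller's response future on the member.
    void setAwaitingJoinFuture(CompletableFuture<String> future) {
        this.awaitingJoinFuture = future;
    }

    // When the join phase completes, the parked future is completed with the response.
    void completeJoin(String response) {
        if (awaitingJoinFuture != null) {
            awaitingJoinFuture.complete(response);
            awaitingJoinFuture = null;
        }
    }
}
   ```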



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue closed pull request #13989: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-10 Thread via GitHub


kirktrue closed pull request #13989: KAFKA-14937: Refactoring for client code 
to reduce boilerplate
URL: https://github.com/apache/kafka/pull/13989


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259029305


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1043,4 +1221,1331 @@ public void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {
 }
 });
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+groups.remove(groupId);
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+CoordinatorResult newGroupResult = EMPTY_RESULT;
+if (isNewGroup) {
+// If a group was newly created, we need to append records to 

[GitHub] [kafka] kirktrue opened a new pull request, #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-10 Thread via GitHub


kirktrue opened a new pull request, #13990:
URL: https://github.com/apache/kafka/pull/13990

   There are a number of places in the client code where the same basic calls 
are made by more than one client implementation. Minor refactoring will reduce 
the amount of boilerplate code necessary for the client to construct its 
internal state.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue opened a new pull request, #13989: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-10 Thread via GitHub


kirktrue opened a new pull request, #13989:
URL: https://github.com/apache/kafka/pull/13989

   There are a number of places in the client code where the same basic calls 
are made by more than one client implementation. Minor refactoring will reduce 
the amount of boilerplate code necessary for the client to construct its 
internal state.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1259002211


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1043,4 +1221,1331 @@ public void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {
 }
 });
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+groups.remove(groupId);
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+CoordinatorResult newGroupResult = EMPTY_RESULT;
+if (isNewGroup) {
+// If a group was newly created, we need to append records to 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1258999406


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1043,4 +1221,1331 @@ public void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {
 }
 });
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+groups.remove(groupId);
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",

Review Comment:
   Is your suggestion to iterate through all groups & members and log each 
member after the partition has finished loading?
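   For reference, a purely illustrative sketch of that alternative, deferring the 
per-member logging until the partition has been replayed (all names below are 
hypothetical, not code from this PR):

   ```java
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;

// Illustrative only: walk the loaded groups once, after replay has finished,
// instead of logging each member while records are being replayed.
final class LoadLoggingSketch {
    static void logLoadedMembers(Map<String, List<String>> memberIdsByGroupId, Logger log) {
        memberIdsByGroupId.forEach((groupId, memberIds) ->
            memberIds.forEach(memberId ->
                log.info("Loaded member {} in group {}.", memberId, groupId)));
    }
}
   ```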



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] eziosudo commented on a diff in pull request #13982: KAFKA-15159: upgrade minor dependencies

2023-07-10 Thread via GitHub


eziosudo commented on code in PR #13982:
URL: https://github.com/apache/kafka/pull/13982#discussion_r1258959682


##
gradle/dependencies.gradle:
##
@@ -142,8 +142,8 @@ versions += [
   slf4j: "1.7.36",
   snappy: "1.1.10.1",
   spotbugs: "4.7.3",
-  swaggerAnnotations: "2.2.8",
-  swaggerJaxrs2: "2.2.8",
+  swaggerAnnotations: "2.2.14",

Review Comment:
   Thanks for the review! I should have taken it more carefully.
   Will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #13946: KAFKA-15139:Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`

2023-07-10 Thread via GitHub


gharris1727 merged PR #13946:
URL: https://github.com/apache/kafka/pull/13946


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-10 Thread via GitHub


jeqo commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1258934931


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
 BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
 BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
 BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+BrokerTopicStats.RemoteCopyBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
+BrokerTopicStats.RemoteFetchBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
+BrokerTopicStats.RemoteReadRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+BrokerTopicStats.RemoteWriteRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+BrokerTopicStats.FailedRemoteReadRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),

Review Comment:
   few more naming suggestions:
   
   ```suggestion
   BrokerTopicStats.RemoteFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
   BrokerTopicStats.RemoteCopyRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
   BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
   BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1258937169


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -579,4 +618,32 @@ public void shutdown() {
 Utils.closeQuietly(runtime, "coordinator runtime");
 log.info("Shutdown complete.");
 }
+
+private static boolean isGroupIdNotEmpty(String groupId) {
+return groupId != null && !groupId.isEmpty();
+}
+
+private static Errors toResponseError(Errors appendError) {

Review Comment:
   Confirmed that `storeGroup` and `storeOffsets` have different handling.
   
   This will still be shared amongst join/sync/leave group, so I'll rename this 
to `appendGroupMetadataErrorToResponseError`, wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13946: KAFKA-15139:Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`

2023-07-10 Thread via GitHub


gharris1727 commented on PR #13946:
URL: https://github.com/apache/kafka/pull/13946#issuecomment-1629762524

   Flaky test failures in the Mirror suite appear unrelated, and the tests pass 
locally. Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1258934915


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -579,4 +618,32 @@ public void shutdown() {
 Utils.closeQuietly(runtime, "coordinator runtime");
 log.info("Shutdown complete.");
 }
+
+private static boolean isGroupIdNotEmpty(String groupId) {
+return groupId != null && !groupId.isEmpty();
+}
+
+private static Errors toResponseError(Errors appendError) {
+switch (appendError) {
+case UNKNOWN_TOPIC_OR_PARTITION:
+case NOT_ENOUGH_REPLICAS:
+case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
+return COORDINATOR_NOT_AVAILABLE;
+
+case NOT_LEADER_OR_FOLLOWER:
+case KAFKA_STORAGE_ERROR:
+return NOT_COORDINATOR;
+
+case REQUEST_TIMED_OUT:

Review Comment:
   Where should I look to confirm/learn this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-10 Thread via GitHub


jeqo commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1258880361


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.OFFLOAD_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.openFileset;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDirectory.openTopicPartitionDirectory;
+
+/**
+ * An implementation of {@link RemoteStorageManager} which relies on the local 
file system to store
+ * offloaded log segments and associated data.
+ * 
+ * Due to the consistency semantic of POSIX-compliant file systems, this 
remote storage provides strong
+ * read-after-write consistency and a segment's data can be accessed once the 
copy to the storage succeeded.
+ * 
+ * 
+ * In order to guarantee isolation, independence, reproducibility and 
consistency of unit and integration
+ * tests, the scope of a storage implemented by this class, and identified via 
the storage ID 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-10 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1258924230


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +282,32 @@ public CompletableFuture joinGroup(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+
+if (!isGroupIdNotEmpty(request.groupId())) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleWriteOperation("generic-group-join",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.genericGroupJoin(context, request, 
responseFuture)
+).exceptionally(exception -> {
+log.error("Request {} hit an unexpected exception: {}",
+request, exception.getMessage());

Review Comment:
   This would log all errors hit while appending/committing, as well as any 
unexpected exception thrown by `generateRecordsAndResponse`. Shouldn't we log 
them? 
   
   It doesn't seem like we do for `consumerGroupHeartbeat()` -- maybe just 
filter out the coordinator-not-available / not-coordinator error codes?
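   For illustration, a hedged sketch of what that filtering could look like (not 
the merged code; the helper name and wiring are hypothetical):

   ```java
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;

// Hypothetical sketch: routine coordinator-movement errors are mapped into the
// response quietly, anything else is logged as unexpected.
final class JoinErrorLoggingSketch {
    static JoinGroupResponseData handleAppendFailure(
        JoinGroupRequestData request,
        Throwable exception,
        Logger log
    ) {
        Errors error = Errors.forException(exception);
        if (error != Errors.COORDINATOR_NOT_AVAILABLE && error != Errors.NOT_COORDINATOR) {
            log.error("JoinGroup request {} hit an unexpected exception: {}",
                request, exception.getMessage());
        }
        return new JoinGroupResponseData()
            .setMemberId(request.memberId())
            .setErrorCode(error.code());
    }
}
   ```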



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-07-10 Thread via GitHub


hachikuji commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1258915597


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or 
continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-leaderLogIfLocal match {
+log match {

Review Comment:
   Returning an error seems straightforward. No harm done if the broker becomes 
leader just after since the client will retry. Given how hard the scenario was 
to reproduce, I'm not too concerned about impact from retries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer

2023-07-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15175:
--

 Summary: Assess the use of nio2 asynchronous channel for 
KafkaConsumer
 Key: KAFKA-15175
 URL: https://issues.apache.org/jira/browse/KAFKA-15175
 Project: Kafka
  Issue Type: Wish
  Components: consumer
Reporter: Philip Nee


We should assess whether NIO2 is appropriate to replace the current NIO library 
and whether it would improve performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ahuang98 commented on pull request #13988: [KAFKA-15137] Log request name only in KRaftControllerChannelManager

2023-07-10 Thread via GitHub


ahuang98 commented on PR #13988:
URL: https://github.com/apache/kafka/pull/13988#issuecomment-1629679221

   There's probably some value in leaving in the `controllerId`, 
`controllerEpoch`, `brokerEpoch` as well. It won't elongate the log by much


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahuang98 opened a new pull request, #13988: [KAFKA-15137] Log request name only in KRaftControllerChannelManager

2023-07-10 Thread via GitHub


ahuang98 opened a new pull request, #13988:
URL: https://github.com/apache/kafka/pull/13988

   Logging the following
   ```
   [2023-07-10 13:15:22,703] WARN [Channel manager on controller 3000]: Not 
sending request UpdateMetadata to broker 0, since it is offline. 
(kafka.controller.ControllerChannelManager:70)
   [2023-07-10 13:15:22,705] WARN [Channel manager on controller 3000]: Not 
sending request LeaderAndIsr to broker 0, since it is offline. 
(kafka.controller.ControllerChannelManager:70)
   ```
   vs the entire request
   ```
   [2023-07-10 13:09:53,928] WARN [Channel manager on controller 3000]: Not 
sending request (type: UpdateMetadataRequest=, controllerId=3000, 
controllerEpoch=3, brokerEpoch=3, partitionStates=[], 
liveBrokers=UpdateMetadataBroker(id=0, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=64622, host='localhost', 
listener='EXTERNAL', securityProtocol=0), UpdateMetadataEndpoint(port=64621, 
host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null, 
tags=[])) to broker 0, since it is offline. 
(kafka.controller.KRaftControllerChannelManager:70)
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-10 Thread via GitHub


jeqo commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1258867599


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in 
`$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation 
of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window 
TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(RemoteIndexCache.class);
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = 
"remote-log-index-cleaner";
+public static final String DIR_NAME = "remote-log-index-cache";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an 
irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new 
AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are 
waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new 
LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures 
that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps 

[GitHub] [kafka] divijvaidya commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-10 Thread via GitHub


divijvaidya commented on PR #13956:
URL: https://github.com/apache/kafka/pull/13956#issuecomment-1629633140

   @mimaison @showuon requesting your review. We have many cases of CI runs 
failing with hundreds of tests failing, and this thread leak is one of the 
reasons for it. Hence the urgency.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-07-10 Thread via GitHub


divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1629620630

   Thank you for your patience on this one @C0urante. Appreciate it!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13982: KAFKA-15159: upgrade minor dependencies

2023-07-10 Thread via GitHub


divijvaidya commented on code in PR #13982:
URL: https://github.com/apache/kafka/pull/13982#discussion_r1258807856


##
gradle/dependencies.gradle:
##
@@ -142,8 +142,8 @@ versions += [
   slf4j: "1.7.36",
   snappy: "1.1.10.1",
   spotbugs: "4.7.3",
-  swaggerAnnotations: "2.2.8",
-  swaggerJaxrs2: "2.2.8",
+  swaggerAnnotations: "2.2.14",
+  swaggerJaxrs2: "2.2.14",

Review Comment:
   same. required jdk 11+ https://github.com/swagger-api/swagger-core/pull/4377



##
gradle/dependencies.gradle:
##
@@ -142,8 +142,8 @@ versions += [
   slf4j: "1.7.36",
   snappy: "1.1.10.1",
   spotbugs: "4.7.3",
-  swaggerAnnotations: "2.2.8",
-  swaggerJaxrs2: "2.2.8",
+  swaggerAnnotations: "2.2.14",

Review Comment:
   can't update this. Versions after 2.2.8 require minimum JDK 11 - 
https://github.com/swagger-api/swagger-core/pull/4377
   
   Please also add a comment about this here for future folks who may want to 
change this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13986: MINOR: Add upgrade docs for 3.5.1

2023-07-10 Thread via GitHub


divijvaidya commented on code in PR #13986:
URL: https://github.com/apache/kafka/pull/13986#discussion_r1258779004


##
docs/upgrade.html:
##
@@ -19,6 +19,22 @@
 
 

[jira] [Created] (KAFKA-15174) Ensure the correct thread is executing the callbacks

2023-07-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15174:
--

 Summary: Ensure the correct thread is executing the callbacks
 Key: KAFKA-15174
 URL: https://issues.apache.org/jira/browse/KAFKA-15174
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


We need to add assertion tests to ensure the correct thread is executing the 
offset commit callbacks and rebalance callbacks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded

2023-07-10 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15173:
---
Labels: kip-945  (was: )

> ApplicationEventQueue and BackgroundEventQueue should be bounded
> 
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-945
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread.  The current implementation is boundless, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues: Can this be a 
> configurable, memory-based bound? Or just an arbitrary number of events as 
> the bound.
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant number and see the community 
> feedback to make a forward plan accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded

2023-07-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15173:
--

 Summary: ApplicationEventQueue and BackgroundEventQueue should be 
bounded
 Key: KAFKA-15173
 URL: https://issues.apache.org/jira/browse/KAFKA-15173
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
facilitate message passing between the application thread and the background 
thread.  The current implementation is boundless, which can potentially cause 
OOM and other performance-related issues.

I think the queues need a finite bound, and we need to decide how to handle the 
situation when the bound is reached.  In particular, I would like to answer 
these questions:

 
 # What should the upper limit be for both queues: Can this be a configurable, 
memory-based bound? Or just an arbitrary number of events as the bound.
 # What should happen when the application event queue is filled up?  It seems 
like we should introduce a new exception type and notify the user that the 
consumer is full.
 # What should happen when the background event queue is filled up? This seems 
less likely to happen, but I imagine it could happen when the user stops 
polling the consumer, causing the queue to be filled.
 # Is it necessary to introduce a public configuration for the queue? I think 
initially we would select an arbitrary constant number and see the community 
feedback to make a forward plan accordingly.
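
For illustration only, one possible shape for the bound discussed above: a 
fixed-capacity queue that rejects new events once full. The capacity and the 
exception type below are hypothetical, not decisions made on this ticket.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch only: a capacity-bounded event queue that rejects
// enqueues once the bound is reached.
final class BoundedEventQueueSketch<T> {
    private final BlockingQueue<T> queue;

    BoundedEventQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void add(T event) {
        if (!queue.offer(event)) {
            throw new IllegalStateException("Event queue is full; the consumer cannot accept more events.");
        }
    }

    T poll() {
        return queue.poll();
    }
}
```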



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Cerchie opened a new pull request, #13987: Kafka-15126: Range queries to accept null lower and upper bounds

2023-07-10 Thread via GitHub


Cerchie opened a new pull request, #13987:
URL: https://github.com/apache/kafka/pull/13987

   Change in response to 
[KIP-941](https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds).
 
   
   Changes line 57 in the RangeQuery class file from:
   
   ```
   public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
       return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
   }
   ```
   to
   ```
   public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
       return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
   }
   ```
   
   Testing strategy: 
   
   Since null values can now be entered in `RangeQuery`s in order to receive 
full scans, I changed the logic defining `query` starting at [line 
1085](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java#L1085)
 in `IQv2StoreIntegrationTest.java` from:
   
   ```
   final RangeQuery query;
   if (lower.isPresent() && upper.isPresent()) {
   query = RangeQuery.withRange(lower.get(), upper.get());
   } else if (lower.isPresent()) {
   query = RangeQuery.withLowerBound(lower.get());
   } else if (upper.isPresent()) {
   query = RangeQuery.withUpperBound(upper.get());
   } else {
   query = RangeQuery.withNoBounds();
   }
   ```
   to
   ```
   query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
   ```
   because handling different combinations of `isPresent()` on the bounds is no 
longer necessary.
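   
   For illustration, how the relaxed `withRange` can be used once null bounds are 
accepted (per the change described above; the key/value types are arbitrary):
   
   ```java
import org.apache.kafka.streams.query.RangeQuery;

// Usage sketch: with Optional.ofNullable, a null bound means "unbounded" on that side.
RangeQuery<String, Integer> fullScan  = RangeQuery.withRange(null, null);    // both bounds open: full scan
RangeQuery<String, Integer> upperOnly = RangeQuery.withRange(null, "key9");  // only an upper bound
RangeQuery<String, Integer> lowerOnly = RangeQuery.withRange("key1", null);  // only a lower bound
   ```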
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258697658


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+
+public class MockCoordinatorTimer implements CoordinatorTimer {
+public static class ScheduledTimeout {
+public final String key;
+public final long deadlineMs;
+public final TimeoutOperation operation;
+
+ScheduledTimeout(
+String key,
+long deadlineMs,
+TimeoutOperation operation
+) {
+this.key = key;
+this.deadlineMs = deadlineMs;
+this.operation = operation;
+}
+}
+
+public static class ExpiredTimeout {
+public final String key;
+public final List records;
+
+ExpiredTimeout(
+String key,
+List records
+) {
+this.key = key;
+this.records = records;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+ExpiredTimeout that = (ExpiredTimeout) o;
+
+if (!Objects.equals(key, that.key)) return false;
+return Objects.equals(records, that.records);
+}
+
+@Override
+public int hashCode() {
+int result = key != null ? key.hashCode() : 0;
+result = 31 * result + (records != null ? records.hashCode() : 0);
+return result;
+}
+}
+
+private final Time time;
+
+private final Map> timeoutMap = new 
HashMap<>();
+private final PriorityQueue> timeoutQueue = new 
PriorityQueue<>(
+Comparator.comparingLong(entry -> entry.deadlineMs)
+);
+
+public MockCoordinatorTimer(Time time) {
+this.time = time;
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+boolean retry,
+TimeoutOperation operation
+) {
+cancel(key);
+
+long deadlineMs = time.milliseconds() + unit.toMillis(delay);
+ScheduledTimeout timeout = new ScheduledTimeout<>(key, deadlineMs, 
operation);
+timeoutQueue.add(timeout);
+timeoutMap.put(key, timeout);
+}
+
+@Override
+public void cancel(String key) {
+ScheduledTimeout timeout = timeoutMap.remove(key);
+if (timeout != null) {
+timeoutQueue.remove(timeout);
+}
+}
+
+public boolean contains(String key) {
+return timeoutMap.containsKey(key);
+}
+
+public ScheduledTimeout<T> timeout(String key) {
+return timeoutMap.get(key);
+}
+
+public int size() {
+return timeoutMap.size();
+}
+
+public List<ExpiredTimeout<T>> poll() {

Review Comment:
   So we don't expire timeouts until we call this poll? Also, in general, can we 
add a few more comments about this class and its usage?
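
   For illustration, here is a minimal sketch (not code from the PR) of how a test 
could drive this class: nothing expires on its own; the test advances the mock 
clock and then drains the expired timeouts via poll(). It assumes the timer is 
generic over the record type and uses the standard MockTime utility.

   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertTrue;

   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.TimeUnit;

   import org.apache.kafka.common.utils.MockTime;
   import org.junit.jupiter.api.Test;

   public class MockCoordinatorTimerUsageExample {
       @Test
       public void expiresTimeoutsOnlyWhenPolled() {
           MockTime time = new MockTime();
           MockCoordinatorTimer<String> timer = new MockCoordinatorTimer<>(time);

           // Scheduling only records the deadline; the operation does not run here.
           timer.schedule("session-key", 30_000L, TimeUnit.MILLISECONDS, true,
               () -> Collections.singletonList("fence-member-record"));

           assertTrue(timer.contains("session-key"));
           assertEquals(Collections.emptyList(), timer.poll()); // deadline not reached yet

           time.sleep(30_000L); // advance the mock clock past the deadline
           List<MockCoordinatorTimer.ExpiredTimeout<String>> expired = timer.poll();
           assertEquals(1, expired.size());
           assertEquals("session-key", expired.get(0).key);
       }
   }
   ```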



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258697658


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+
+public class MockCoordinatorTimer<T> implements CoordinatorTimer<T> {
+public static class ScheduledTimeout<T> {
+public final String key;
+public final long deadlineMs;
+public final TimeoutOperation<T> operation;
+
+ScheduledTimeout(
+String key,
+long deadlineMs,
+TimeoutOperation<T> operation
+) {
+this.key = key;
+this.deadlineMs = deadlineMs;
+this.operation = operation;
+}
+}
+
+public static class ExpiredTimeout<T> {
+public final String key;
+public final List<T> records;
+
+ExpiredTimeout(
+String key,
+List<T> records
+) {
+this.key = key;
+this.records = records;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+ExpiredTimeout that = (ExpiredTimeout) o;
+
+if (!Objects.equals(key, that.key)) return false;
+return Objects.equals(records, that.records);
+}
+
+@Override
+public int hashCode() {
+int result = key != null ? key.hashCode() : 0;
+result = 31 * result + (records != null ? records.hashCode() : 0);
+return result;
+}
+}
+
+private final Time time;
+
+private final Map<String, ScheduledTimeout<T>> timeoutMap = new HashMap<>();
+private final PriorityQueue<ScheduledTimeout<T>> timeoutQueue = new PriorityQueue<>(
+Comparator.comparingLong(entry -> entry.deadlineMs)
+);
+
+public MockCoordinatorTimer(Time time) {
+this.time = time;
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+boolean retry,
+TimeoutOperation<T> operation
+) {
+cancel(key);
+
+long deadlineMs = time.milliseconds() + unit.toMillis(delay);
+ScheduledTimeout<T> timeout = new ScheduledTimeout<>(key, deadlineMs, operation);
+timeoutQueue.add(timeout);
+timeoutMap.put(key, timeout);
+}
+
+@Override
+public void cancel(String key) {
+ScheduledTimeout<T> timeout = timeoutMap.remove(key);
+if (timeout != null) {
+timeoutQueue.remove(timeout);
+}
+}
+
+public boolean contains(String key) {
+return timeoutMap.containsKey(key);
+}
+
+public ScheduledTimeout<T> timeout(String key) {
+return timeoutMap.get(key);
+}
+
+public int size() {
+return timeoutMap.size();
+}
+
+public List<ExpiredTimeout<T>> poll() {

Review Comment:
   So we don't expire timeouts until we call this poll?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258688221


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
&&
+member.memberEpoch() != expectedMemberEpoch) {

Review Comment:
   I just extended `testRevocationTimeoutLifecycle` to execute a stale timer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258683353


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
&&
+member.memberEpoch() != expectedMemberEpoch) {

Review Comment:
   Do we have a test for this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258682428


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {

Review Comment:
   That's correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258681834


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {

Review Comment:
   Oh I see. We only catch the exceptions in the other PR if they are thrown 
(not caught) here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15172) Allow exact mirroring of ACLs between clusters

2023-07-10 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-15172:
--

 Summary: Allow exact mirroring of ACLs between clusters
 Key: KAFKA-15172
 URL: https://issues.apache.org/jira/browse/KAFKA-15172
 Project: Kafka
  Issue Type: Task
  Components: mirrormaker
Reporter: Mickael Maison


When mirroring ACLs, MirrorMaker downgrades allow ALL ACLs to allow READ. The 
rationale is to prevent other clients from producing to remote topics. 

However, in disaster recovery scenarios, where the target cluster is not actively 
used and is just a "hot standby", it would be preferable to have exactly the same 
ACLs on both clusters to speed up failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258669748


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
&&
+member.memberEpoch() != expectedMemberEpoch) {

Review Comment:
   Good catch. I should have used an OR in the condition.
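
   For reference, the corrected guard would look roughly like this (sketch only, 
based on this discussion, including the early return raised earlier in the 
thread; not the final PR code):

   ```java
   // Skip the revocation timeout if the timer is stale: the member is no
   // longer revoking OR its epoch has already moved past the expected one.
   if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
           member.memberEpoch() != expectedMemberEpoch) {
       log.debug("[GroupId " + groupId + "] Ignoring revocation timeout for member "
           + memberId + " because the timer is stale.");
       return Collections.emptyList();
   }
   ```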



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] eziosudo commented on pull request #13982: KAFKA-15159: upgrade minor dependencies

2023-07-10 Thread via GitHub


eziosudo commented on PR #13982:
URL: https://github.com/apache/kafka/pull/13982#issuecomment-1629456542

   > Thank you for the changes. We need to update the corresponding entries in 
LICENSE file too:
   > 
   > 
https://github.com/apache/kafka/blob/d481163d55a1cea3206fa6e386b24ac84e17c3d7/LICENSE-binary#L244
   
   Thanks for the review. Already updated the LICENSE.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258664011


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {

Review Comment:
   It is actually the other way around. We catch those exceptions because we 
don't want to retry them. If the member does not exist or if the group does not 
exist, it means that the timer is stale. This should normally not happen 
because the timers should be cancelled when a member is removed and a group 
cannot be deleted if not empty. The API to delete a group is not implemented 
yet but will come soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-10 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1258640264


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Should we tell users that this is because stdin will be used and we don't do 
any offset tracking in that case?
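
   Something along these lines, for example (illustrative wording only, not the 
PR code):

   ```java
   throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG
       + "' configuration is unspecified, since the connector reads from stdin in "
       + "that case and does not track any offsets");
   ```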



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+Map<String, ?> partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map<String, ?> offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+}
+
+// Let the task validate the actual value for the offset position on 
startup

Review Comment:
   Any reason not to do this preemptively? We could at least validate that the 
value for the "position" key is non-null, is a numeric type, and is 
non-negative.
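
   Something like the following, for example (illustrative only, not code from 
the PR):

   ```java
   // Preemptive validation of the offset value stored under the "position" key.
   if (offset != null) {
       Object position = offset.get(POSITION_FIELD);
       if (!(position instanceof Number)) {
           throw new ConnectException("The value for the '" + POSITION_FIELD
               + "' key should be a non-null number");
       }
       if (((Number) position).longValue() < 0) {
           throw new ConnectException("The value for the '" + POSITION_FIELD
               + "' key should not be negative");
       }
   }
   ```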



##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("invalid_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+connector.alterOffsets(sourceProperties, offsets);

Review 

[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258653960


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
&&
+member.memberEpoch() != expectedMemberEpoch) {

Review Comment:
   We continue to revoke even if the epoch is unexpected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258653960


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
&&
+member.memberEpoch() != expectedMemberEpoch) {

Review Comment:
   We continue to revoke even if the epoch is unexpected? Also do we want to 
return early here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258650248


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -700,17 +726,36 @@ private 
CoordinatorResult consumerGr
 String groupId,
 String memberId
 ) throws ApiException {
-List records = new ArrayList<>();
-
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
 ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
 
 log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
 
+List records = consumerGroupFenceMember(group, member);
+cancelConsumerGroupSessionTimeout(groupId, memberId);

Review Comment:
   would we also want to cancel the revocation timeout?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258644974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {

Review Comment:
   Based on the previous PR, we retry these exceptions. I can imagine that some 
metadata was slow to update or something and eventually it could succeed. Do we 
have any path forward, though, if this request was issued and the group/member is 
no longer there? I guess this relies on canceling the task when we call group 
leave for the member? Do we also have a method to remove groups?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-10 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1258644974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +765,116 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {

Review Comment:
   Based on the previous PR, we retry these exceptions. I can imagine that some 
metadata was slow to update or something and eventually it could succeed. Do we 
have any path forward, though, if this request was issued and the group/member is 
no longer there? I guess this relies on canceling the task when we call group 
leave for the member?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15145:
--
Fix Version/s: 3.5.1

> AbstractWorkerSourceTask re-processes records filtered out by SMTs on 
> retriable exceptions
> --
>
> Key: KAFKA-15145
> URL: https://issues.apache.org/jira/browse/KAFKA-15145
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2, 3.5.0, 3.4.1
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
> Fix For: 3.6.0, 3.5.1
>
>
> If a RetriableException is thrown from an admin client or producer client 
> operation in 
> [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
>  the send operation is retried for the remaining records in the batch. There 
> is a bug in the logic for computing the remaining records in a batch which 
> causes records that are filtered out by the task's transformation chain to be 
> re-processed. This will also result in the SourceTask::commitRecord method 
> being called twice for the same record, which can cause certain types of 
> source connectors to fail. This bug seems to exist since when SMTs were first 
> introduced in 0.10.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

2023-07-10 Thread via GitHub


C0urante merged PR #13955:
URL: https://github.com/apache/kafka/pull/13955


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-07-10 Thread via GitHub


C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1258611895


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map 
topicConfigs) {
 }));
 }
 
-private void updateTopicAcls(List<AclBinding> bindings) {
-log.trace("Syncing {} topic ACL bindings.", bindings.size());
-targetAdminClient.createAcls(bindings).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {
-if (e != null) {
-log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
-}
-}));
+// Visible for testing
+int updateTopicAcls(List<AclBinding> bindings) {
+Set<AclBinding> addBindings = new HashSet<>(bindings);
+addBindings.removeAll(knownTopicAclBindings);
+int newBindCount = addBindings.size();
+if (!addBindings.isEmpty()) {
+log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+targetAdminClient.createAcls(addBindings).values().forEach((k, v) 
-> v.whenComplete((x, e) -> {
+if (e != null) {
+log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
+}
+}));
+knownTopicAclBindings = new HashSet<>(bindings);

Review Comment:
   Hmmm... won't this cause issues if the call above to 
`targetAdminClient::createAcls` fails?
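
   One way to avoid that (illustrative sketch only, not a suggestion block) would 
be to only remember a binding once its creation future completes successfully, so 
that failed creations are retried on the next sync:

   ```java
   targetAdminClient.createAcls(addBindings).values().forEach((binding, future) ->
       future.whenComplete((ignored, error) -> {
           if (error != null) {
               log.warn("Could not sync ACL of topic {}.", binding.pattern().name(), error);
           } else {
               // Only mark the binding as known once the target cluster confirms it.
               // knownTopicAclBindings would need to be a thread-safe set here.
               knownTopicAclBindings.add(binding);
           }
       }));
   ```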



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -683,4 +686,35 @@ private Optional validateProperty(String 
name, Map
 assertNotNull(result, "Connector should not have record null config 
value for '" + name + "' property");
 return Optional.of(result);
 }
+
+@Test
+public void testUpdateIncrementTopicAcls() {
+Admin sourceAdmin = mock(Admin.class);
+Admin targetAdmin = mock(Admin.class);
+MirrorSourceConnector connector = new 
MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+List filteredBindings = new ArrayList<>();
+AclBinding binding1 = mock(AclBinding.class);
+AclBinding binding2 = mock(AclBinding.class);
+filteredBindings.add(binding1);
+filteredBindings.add(binding2);
+
doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet());
+
+// First topic acl info update when starting `syncTopicAcls` thread
+int newAddCount = connector.updateTopicAcls(filteredBindings);
+assertEquals(connector.knownTopicAclBindings(), new 
HashSet<>(filteredBindings));
+assertTrue(newAddCount == filteredBindings.size());
+
+List newAddBindings = new ArrayList<>();
+AclBinding binding3 = mock(AclBinding.class);
+AclBinding binding4 = mock(AclBinding.class);
+newAddBindings.add(binding3);
+newAddBindings.add(binding4);
+filteredBindings.addAll(newAddBindings);
+
+// The next increment topic acl info update
+newAddCount = connector.updateTopicAcls(filteredBindings);
+assertEquals(connector.knownTopicAclBindings(), new 
HashSet<>(filteredBindings));
+assertTrue(newAddCount == newAddBindings.size());

Review Comment:
   ```suggestion
   assertEquals(newAddBindings.size(), newAddCount);
   ```



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -683,4 +686,35 @@ private Optional validateProperty(String 
name, Map
 assertNotNull(result, "Connector should not have record null config 
value for '" + name + "' property");
 return Optional.of(result);
 }
+
+@Test
+public void testUpdateIncrementTopicAcls() {
+Admin sourceAdmin = mock(Admin.class);
+Admin targetAdmin = mock(Admin.class);
+MirrorSourceConnector connector = new 
MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+List filteredBindings = new ArrayList<>();
+AclBinding binding1 = mock(AclBinding.class);
+AclBinding binding2 = mock(AclBinding.class);
+filteredBindings.add(binding1);
+filteredBindings.add(binding2);
+
doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet());
+
+// First topic acl info update when starting `syncTopicAcls` thread
+int newAddCount = connector.updateTopicAcls(filteredBindings);
+assertEquals(connector.knownTopicAclBindings(), new 
HashSet<>(filteredBindings));
+assertTrue(newAddCount == filteredBindings.size());

Review Comment:
   ```suggestion
   assertEquals(filteredBindings.size(), newAddCount);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this 

[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1629387962

   > Thanks @vamossagar12, this is looking much better now!
   
   Thanks, Yash. I addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1258617166


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+FutureCallback secondaryWriteFuture = new FutureCallback<>();
+secondaryStore.set(values, secondaryWriteFuture);
+try {
+// For EOS, there is no timeout for offset commit and it is 
allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the 
offset commit for cases when

Review Comment:
   Well, the rationale is the same: EOS waits indefinitely for the commits 
to happen. In this case, the tombstone-containing offset writes happen to fall 
in the critical path, and hence the commit transaction needs to wait for them as 
well. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1258614182


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -233,4 +496,10 @@ private void expectStore(Map key, byte[] 
keySerialized,
 });
 }
 
+private void extractKeyValue(Map key, byte[] 
keySerialized, Map value, byte[] valueSerialized) {

Review Comment:
   Hmm, I updated it. The overall effect on the method's code flow was extracting 
keys and values, hence the name. Anyway, I've made the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1258613193


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
 throw new IllegalStateException("At least one non-null offset store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to secondary store will
+// not be ignored. Also, for tombstone records, we first write to secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+FutureCallback secondaryWriteFuture = new FutureCallback<>();
+secondaryStore.set(values, secondaryWriteFuture);
+try {
+// For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the offset commit for cases when
+// tombstone records fail to be written to the secondary store. Note that while commitTransaction
+// already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+// blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+// and throw that exception which would allow the offset commit to fail.
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+}
+} catch (InterruptedException e) {
+log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this);

Review Comment:
   I changed the occurrences of `offsets with tombstones` to `tombstone(s)-containing offsets` and `secondary storage` to `secondary store`.
   Regarding the `primary` and `secondary` store writes, that nomenclature was already being used [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L285-L301).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1258609819


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
 throw new IllegalStateException("At least one non-null offset store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to secondary store will
+// not be ignored. Also, for tombstone records, we first write to secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   It doesn't need to be, but it's cleaner that way; otherwise we would need to make explicit null checks. This pattern is used in other places as well, like [here](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484). I have retained the `AtomicReference` usage.
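   For context, a self-contained sketch of the pattern being referenced: an `AtomicReference` captures a failure reported by an asynchronous callback so the caller can check it once afterwards. The class and names below are illustrative, not taken from the PR.

   ```java
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.BiConsumer;

   // Illustrative sketch only: capture an error surfaced by an async callback in an
   // AtomicReference, then check it once after the call completes, instead of
   // threading explicit null checks through the callback path.
   public class CallbackErrorCaptureExample {

       // Stand-in for an asynchronous API that reports (error, result) to a callback.
       static void setAsync(BiConsumer<Throwable, Void> callback) {
           callback.accept(new IllegalStateException("secondary store write failed"), null);
       }

       public static void main(String[] args) {
           AtomicReference<Throwable> writeError = new AtomicReference<>();

           setAsync((error, ignored) -> {
               if (error != null) {
                   writeError.compareAndSet(null, error); // remember only the first failure
               }
           });

           // Single check after the asynchronous call has completed.
           if (writeError.get() != null) {
               throw new RuntimeException("Offset flush failed", writeError.get());
           }
       }
   }
   ```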



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1258608091


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map values, Callback callb
 throw new IllegalStateException("At least one non-null offset store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to secondary store will
+// not be ignored. Also, for tombstone records, we first write to secondary store and

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

2023-07-10 Thread via GitHub


yashmayya commented on PR #13955:
URL: https://github.com/apache/kafka/pull/13955#issuecomment-1629367616

   Thanks Chris, I agree with the point you brought up regarding potential 
poison pill messages. This PR can be merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13946: KAFKA-15139:Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`

2023-07-10 Thread via GitHub


C0urante commented on PR #13946:
URL: https://github.com/apache/kafka/pull/13946#issuecomment-1629366713

   @hudeqi I think @gharris1727 can merge this now :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1258603343


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 verifyTopicCreation();
 }
 
+@Test
+public void testSendRecordsRetriableException() {
+createWorkerTask();
+
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+expectTaskGetTopic();
+
+when(transformationChain.apply(eq(record1))).thenReturn(null);
+when(transformationChain.apply(eq(record2))).thenReturn(null);
+when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+// The producer throws a RetriableException the first time we try to send the third record
+assertFalse(workerTask.sendRecords());
+
+// The next attempt to send the third record should succeed
+assertTrue(workerTask.sendRecords());
+
+// Ensure that the first two records that were filtered out by the transformation chain
+// aren't re-processed when we retry the call to sendRecords()
+verify(transformationChain, times(4)).apply(any(SourceRecord.class));

Review Comment:
   Ah, that's a good point, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14059) Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest

2023-07-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14059:
--
Fix Version/s: 3.6.0

> Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest
> ---
>
> Key: KAFKA-14059
> URL: https://issues.apache.org/jira/browse/KAFKA-14059
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14059) Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest

2023-07-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14059.
---
Resolution: Done

> Replace EasyMock and PowerMock with Mockito in WorkerSourceTaskTest
> ---
>
> Key: KAFKA-14059
> URL: https://issues.apache.org/jira/browse/KAFKA-14059
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-07-10 Thread via GitHub


C0urante merged PR #13383:
URL: https://github.com/apache/kafka/pull/13383


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-07-10 Thread via GitHub


C0urante merged PR #13284:
URL: https://github.com/apache/kafka/pull/13284


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #13661: Doc fixes: Fix format and other small errors in config documentation

2023-07-10 Thread via GitHub


bbejeck commented on PR #13661:
URL: https://github.com/apache/kafka/pull/13661#issuecomment-1629346978

   Merged #13661 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck merged pull request #13661: Doc fixes: Fix format and other small errors in config documentation

2023-07-10 Thread via GitHub


bbejeck merged PR #13661:
URL: https://github.com/apache/kafka/pull/13661


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13941: KAFKA-15123: Add tests for ChunkedBytesStream

2023-07-10 Thread via GitHub


divijvaidya commented on code in PR #13941:
URL: https://github.com/apache/kafka/pull/13941#discussion_r1258587583


##
clients/src/test/java/org/apache/kafka/common/utils/ChunkedBytesStreamTest.java:
##
@@ -149,6 +160,38 @@ public void testCorrectnessForMethodSkip(int bytesToPreRead, ByteBuffer inputBuf
 }
 }
 
+@Test
+public void testEdgeCaseInputForMethodSkip() throws IOException {
+int bufferLength = 16;
+ByteBuffer inputBuf = ByteBuffer.allocate(bufferLength);
+RANDOM.nextBytes(inputBuf.array());
+inputBuf.position(inputBuf.capacity());
+inputBuf.flip();
+
+try (InputStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuf.duplicate()), supplier, 10, true)) {

Review Comment:
   you can use `@ParameterizedTest` for true/false here instead of duplicating 
code.
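   As an illustration of that suggestion, a single `@ParameterizedTest` can cover both the true and false variants instead of two near-identical test methods. The class and assertion below are placeholders, not the actual ChunkedBytesStream test:

   ```java
   import static org.junit.jupiter.api.Assertions.assertNotNull;

   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;

   // Illustrative sketch only: JUnit 5 runs the test body once per supplied boolean.
   class DelegateSkipFlagTest {

       @ParameterizedTest
       @ValueSource(booleans = {true, false})
       void testEdgeCaseInputForMethodSkip(boolean delegateSkipToSourceStream) {
           // In the real test, this flag would replace the hard-coded true/false
           // passed to the ChunkedBytesStream constructor.
           assertNotNull(Boolean.valueOf(delegateSkipToSourceStream));
       }
   }
   ```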



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #13661: Doc fixes: Fix format and other small errors in config documentation

2023-07-10 Thread via GitHub


bbejeck commented on PR #13661:
URL: https://github.com/apache/kafka/pull/13661#issuecomment-1629339267

   Failures unrelated
   
   ```
   JDK 8 and Scala 2.12 / testUnregisterBroker() – org.apache.kafka.controller.QuorumControllerTest
   JDK 8 and Scala 2.12 / testConfigurationOperations() – org.apache.kafka.controller.QuorumControllerTest
   JDK 17 and Scala 2.13 / testMultiNodeCluster() – org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest
   JDK 17 and Scala 2.13 / testConnectorBoundary – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
   JDK 11 and Scala 2.13 / testOutdatedCoordinatorAssignment() – org.apache.kafka.clients.consumer.internals.CooperativeConsumerCoordinatorTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


