This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 9db5ed00a83 KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (#17573)
9db5ed00a83 is described below
commit 9db5ed00a8369d5c696e836661230110ea2ea44d
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Nov 11 14:36:11 2024 +0530
    KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (#17573)
    This PR adds another dynamic config, share.auto.offset.reset, for share groups.
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Abhinav Dixit <[email protected]>, Manikumar Reddy <[email protected]>
---
.../java/kafka/server/share/ShareFetchUtils.java | 24 +-
.../java/kafka/server/share/SharePartition.java | 31 +-
.../server/share/SharePartitionManagerTest.java | 15 +
.../kafka/server/share/SharePartitionTest.java | 267 +++++++++++++++-
.../java/kafka/test/api/ShareConsumerTest.java | 244 +++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 5 +-
.../server/ShareFetchAcknowledgeRequestTest.scala | 354 +++++++++++++--------
.../kafka/coordinator/group/GroupConfig.java | 42 ++-
.../kafka/coordinator/group/GroupConfigTest.java | 57 +++-
.../coordinator/share/ShareCoordinatorShard.java | 2 +-
.../share/persister/NoOpShareStatePersister.java | 4 +-
.../server/share/persister/PartitionFactory.java | 8 +-
12 files changed, 889 insertions(+), 164 deletions(-)
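
For readers trying out the new config: below is a minimal, illustrative sketch (not part of this commit) of altering share.auto.offset.reset for a share group with the Admin client, in the spirit of the alterShareAutoOffsetReset helper used by the updated ShareConsumerTest. The standalone class name is invented for the example, and addressing group configs via ConfigResource.Type.GROUP is an assumption based on the imports this patch adds to the test.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class ShareGroupConfigExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Assumed: group-level configs such as share.auto.offset.reset are
                // addressed with a GROUP ConfigResource, as the test imports suggest.
                ConfigResource groupResource = new ConfigResource(ConfigResource.Type.GROUP, "group1");
                // Set the reset strategy for uninitialized share partitions of this group.
                AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("share.auto.offset.reset", "earliest"),
                    AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> alterations =
                    Collections.singletonMap(groupResource, Collections.singletonList(op));
                admin.incrementalAlterConfigs(alterations).all().get();
            }
        }
    }

Per the tests in this patch, "earliest" starts an uninitialized share partition from the log start offset, while the default "latest" starts it from the log end offset, so records produced before the first share fetch are skipped.
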
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index bd504deac49..3515362152b 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -19,9 +19,11 @@ package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.ReplicaManager;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
@@ -40,6 +42,7 @@ import java.util.Map;
import java.util.Optional;
import scala.Option;
+import scala.Some;
/**
* Utility class for post-processing of share fetch operations.
@@ -125,7 +128,26 @@ public class ShareFetchUtils {
         Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
             topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
             Optional.empty(), true).timestampAndOffsetOpt();
-        return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("offset for Earliest timestamp not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
+    }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("offset for Latest timestamp not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
     }
     static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 740bf697de4..71baea10174 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -72,6 +73,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
+import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
+
/**
* The SharePartition is used to track the state of a partition that is shared
between multiple
* consumers. The class maintains the state of the records that have been
fetched from the leader
@@ -421,8 +425,12 @@ public class SharePartition {
                 return;
             }
-            // Set the state epoch and end offset from the persisted state.
-            startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
+            try {
+                startOffset = startOffsetDuringInitialization(partitionData.startOffset());
+            } catch (Exception e) {
+                completeInitializationWithException(future, e);
+                return;
+            }
             stateEpoch = partitionData.stateEpoch();
             List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
@@ -448,7 +456,7 @@ public class SharePartition {
                 // and start/end offsets.
                 maybeUpdateCachedStateAndOffsets();
             } else {
-                updateEndOffsetAndResetFetchOffsetMetadata(partitionData.startOffset());
+                updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
             }
             // Set the partition state to Active and complete the future.
             partitionState = SharePartitionState.ACTIVE;
@@ -2058,6 +2066,23 @@ public class SharePartition {
         }
     }
+    private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
+        // Set the state epoch and end offset from the persisted state.
+        if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
+            return partitionDataStartOffset;
+        }
+        GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
+        if (groupConfigManager.groupConfig(groupId).isPresent()) {
+            offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
+        } else {
+            offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
+        }
+
+        if (offsetResetStrategy == GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
+            return offsetForEarliestTimestamp(topicIdPartition, replicaManager);
+        return offsetForLatestTimestamp(topicIdPartition, replicaManager);
+    }
+
     // Visible for testing. Should only be used for testing purposes.
     NavigableMap<Long, InFlightBatch> cachedState() {
         return new ConcurrentSkipListMap<>(cachedState);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 67c2a6cce77..46abf04b0a6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -17,6 +17,7 @@
package kafka.server.share;
import kafka.cluster.Partition;
+import kafka.log.OffsetResultHolder;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchRequest;
@@ -1040,6 +1042,8 @@ public class SharePartitionManagerTest {
partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
+ mockFetchOffsetForTimestamp(mockReplicaManager);
+
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
Metrics metrics = new Metrics();
@@ -1109,6 +1113,9 @@ public class SharePartitionManagerTest {
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+
+ mockFetchOffsetForTimestamp(mockReplicaManager);
+
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -1233,6 +1240,8 @@ public class SharePartitionManagerTest {
TopicIdPartition tp0 = new TopicIdPartition(fooId, new
TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+ mockFetchOffsetForTimestamp(mockReplicaManager);
+
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -2482,6 +2491,12 @@ public class SharePartitionManagerTest {
});
}
+ private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+ }
+
static Seq<Tuple2<TopicIdPartition, LogReadResult>>
buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new
ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new
Tuple2<>(topicIdPartition, new LogReadResult(
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 6538a1f4012..3e90005902e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -16,12 +16,14 @@
*/
package kafka.server.share;
+import kafka.log.OffsetResultHolder;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState;
import kafka.server.share.SharePartition.SharePartitionState;
import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@@ -34,9 +36,11 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
@@ -77,6 +81,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -191,6 +197,244 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(11L).offsetState());
}
+ @Test
+ public void
testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ // replicaManager.fetchOffsetForTimestamp should be called with
"ListOffsetsRequest.EARLIEST_TIMESTAMP"
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(0, sharePartition.startOffset());
+ assertEquals(0, sharePartition.endOffset());
+ assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH,
sharePartition.stateEpoch());
+ }
+
+ @Test
+ public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest()
{
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ // replicaManager.fetchOffsetForTimestamp should be called with
"ListOffsetsRequest.LATEST_TIMESTAMP"
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(15, sharePartition.startOffset());
+ assertEquals(15, sharePartition.endOffset());
+ assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH,
sharePartition.stateEpoch());
+ }
+
+ @Test
+ public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ // replicaManager.fetchOffsetForTimestamp should be called with
"ListOffsetsRequest.LATEST_TIMESTAMP"
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(15, sharePartition.startOffset());
+ assertEquals(15, sharePartition.endOffset());
+ assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH,
sharePartition.stateEpoch());
+ }
+
+ @Test
+ public void testMaybeInitializeFetchOffsetForLatestTimestampThrowsError() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
+ .thenThrow(new RuntimeException("fetch offsets exception"));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+
+ // replicaManager.fetchOffsetForTimestamp should be called with
"ListOffsetsRequest.LATEST_TIMESTAMP"
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+ }
+
+ @Test
+ public void
testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(
+ 0, PartitionFactory.DEFAULT_STATE_EPOCH,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_ERROR_CODE,
+ PartitionFactory.DEFAULT_ERR_MESSAGE,
+ Collections.emptyList())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
+
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
+ .thenThrow(new RuntimeException("fetch offsets exception"));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withGroupConfigManager(groupConfigManager)
+ .withReplicaManager(replicaManager)
+ .build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+
+ // replicaManager.fetchOffsetForTimestamp should be called with
"ListOffsetsRequest.EARLIEST_TIMESTAMP"
+ Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+ Mockito.any(TopicPartition.class),
+ Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean()
+ );
+
+ assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+ }
+
@Test
public void testMaybeInitializeSharePartitionAgain() {
Persister persister = Mockito.mock(Persister.class);
@@ -460,7 +704,13 @@ public class SharePartitionTest {
@Test
public void testMaybeInitializeWithNoOpShareStatePersister() {
- SharePartition sharePartition =
SharePartitionBuilder.builder().build();
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
@@ -825,7 +1075,13 @@ public class SharePartitionTest {
@Test
public void testMaybeAcquireAndReleaseFetchLock() {
- SharePartition sharePartition =
SharePartitionBuilder.builder().build();
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+ FileRecords.TimestampAndOffset timestampAndOffset = new
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+ Mockito.doReturn(new
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
sharePartition.maybeInitialize();
assertTrue(sharePartition.maybeAcquireFetchLock());
// Lock cannot be acquired again, as already acquired.
@@ -5233,7 +5489,7 @@ public class SharePartitionTest {
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
private Persister persister = new NoOpShareStatePersister();
- private final ReplicaManager replicaManager =
Mockito.mock(ReplicaManager.class);
+ private ReplicaManager replicaManager =
Mockito.mock(ReplicaManager.class);
private GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
private SharePartitionState state = SharePartitionState.EMPTY;
@@ -5257,6 +5513,11 @@ public class SharePartitionTest {
return this;
}
+ private SharePartitionBuilder withReplicaManager(ReplicaManager
replicaManager) {
+ this.replicaManager = replicaManager;
+ return this;
+ }
+
private SharePartitionBuilder
withGroupConfigManager(GroupConfigManager groupConfigManager) {
this.groupConfigManager = groupConfigManager;
return this;
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index ade43e899b4..b7d127eb429 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -19,6 +19,9 @@ package kafka.test.api;
import kafka.api.BaseConsumerTest;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -34,6 +37,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
@@ -47,6 +51,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -60,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -101,6 +107,8 @@ public class ShareConsumerTest {
private static final String DEFAULT_STATE_PERSISTER =
"org.apache.kafka.server.share.persister.DefaultStatePersister";
private static final String NO_OP_PERSISTER =
"org.apache.kafka.server.share.persister.NoOpShareStatePersister";
+ private Admin adminClient;
+
@BeforeEach
public void createCluster(TestInfo testInfo) throws Exception {
String persisterClassName = NO_OP_PERSISTER;
@@ -131,11 +139,13 @@ public class ShareConsumerTest {
cluster.waitForReadyBrokers();
createTopic("topic");
createTopic("topic2");
+ adminClient = createAdminClient();
warmup();
}
@AfterEach
public void destroyCluster() throws Exception {
+ adminClient.close();
cluster.close();
}
@@ -156,6 +166,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.close();
assertEquals(0, records.count());
@@ -168,6 +179,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.unsubscribe();
assertEquals(Collections.emptySet(), shareConsumer.subscription());
@@ -182,6 +194,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(500));
assertEquals(0, records.count());
shareConsumer.subscribe(subscription);
@@ -198,6 +211,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.unsubscribe();
assertEquals(Collections.emptySet(), shareConsumer.subscription());
@@ -214,6 +228,7 @@ public class ShareConsumerTest {
Set<String> subscription = Collections.singleton(tp.topic());
shareConsumer.subscribe(subscription);
assertEquals(subscription, shareConsumer.subscription());
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(500));
shareConsumer.subscribe(Collections.emptySet());
assertEquals(Collections.emptySet(), shareConsumer.subscription());
@@ -231,6 +246,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
shareConsumer.close();
@@ -245,6 +261,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
producer.send(record);
@@ -273,6 +290,8 @@ public class ShareConsumerTest {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@@ -307,6 +326,8 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
// Now in the second poll, we implicitly acknowledge the record
received in the first poll.
@@ -334,6 +355,8 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@@ -361,6 +384,8 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@@ -420,6 +445,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
List<ConsumerRecord<byte[], byte[]>> records =
consumeRecords(shareConsumer, numRecords);
assertEquals(numRecords, records.size());
@@ -442,6 +468,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
List<ConsumerRecord<byte[], byte[]>> records =
consumeRecords(shareConsumer, numRecords);
assertEquals(numRecords, records.size());
@@ -468,6 +495,8 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1",
Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
String.valueOf(maxPollRecords)));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
List<ConsumerRecord<byte[], byte[]>> records =
consumeRecords(shareConsumer, numRecords);
long i = 0L;
for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -513,6 +542,8 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(4, records.count());
assertEquals(transactional1.offset(),
records.records(tp).get(0).offset());
@@ -538,6 +569,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
@@ -556,6 +588,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
@@ -583,6 +616,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
@@ -634,6 +668,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
@@ -689,6 +724,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -709,6 +745,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -727,6 +764,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -734,7 +772,6 @@ public class ShareConsumerTest {
producer.close();
}
-
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testExplicitAcknowledgeThrowsNotInBatch(String persister) {
@@ -743,6 +780,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord =
records.records(tp).get(0);
@@ -762,6 +800,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord =
records.records(tp).get(0);
@@ -780,6 +819,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
@@ -805,6 +845,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
@@ -843,6 +884,8 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1",
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
String.valueOf(maxPartitionFetchBytes)));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
+
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
shareConsumer.close();
@@ -857,9 +900,11 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group2");
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group2", "earliest");
// producing 3 records to the topic
producer.send(record);
@@ -907,6 +952,7 @@ public class ShareConsumerTest {
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
int totalMessages = 2000;
for (int i = 0; i < totalMessages; i++) {
@@ -943,9 +989,16 @@ public class ShareConsumerTest {
int producerCount = 4;
int messagesPerProducer = 5000;
+ String groupId = "group1";
+
ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
ExecutorService consumerExecutorService =
Executors.newFixedThreadPool(consumerCount);
+ // This consumer is created to register the share group id with the
groupCoordinator
+ // so that the config share.auto.offset.reset can be altered for this
group
+ createShareConsumer(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), groupId);
+ alterShareAutoOffsetReset(groupId, "earliest");
+
for (int i = 0; i < producerCount; i++) {
producerExecutorService.submit(() ->
produceMessages(messagesPerProducer));
}
@@ -957,7 +1010,7 @@ public class ShareConsumerTest {
consumerExecutorService.submit(() -> {
CompletableFuture<Integer> future = new CompletableFuture<>();
futures.add(future);
- consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, "group1", consumerNumber, 30, true, future,
Optional.of(maxBytes));
+ consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, groupId, consumerNumber, 30, true, future,
Optional.of(maxBytes));
});
}
@@ -990,6 +1043,19 @@ public class ShareConsumerTest {
int messagesPerProducer = 2000;
final int totalMessagesSent = producerCount * messagesPerProducer;
+ String groupId1 = "group1";
+ String groupId2 = "group2";
+ String groupId3 = "group3";
+
+ // These consumers are created to register the share group ids with
the groupCoordinator
+ // so that the config share.auto.offset.reset can be altered for these
groups
+ createShareConsumer(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), groupId1);
+ alterShareAutoOffsetReset(groupId1, "earliest");
+ createShareConsumer(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), groupId2);
+ alterShareAutoOffsetReset(groupId2, "earliest");
+ createShareConsumer(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), groupId3);
+ alterShareAutoOffsetReset(groupId3, "earliest");
+
ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
ExecutorService shareGroupExecutorService1 =
Executors.newFixedThreadPool(consumerCount);
ExecutorService shareGroupExecutorService2 =
Executors.newFixedThreadPool(consumerCount);
@@ -1095,6 +1161,7 @@ public class ShareConsumerTest {
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
KafkaShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
int totalMessages = 1500;
for (int i = 0; i < totalMessages; i++) {
@@ -1139,6 +1206,13 @@ public class ShareConsumerTest {
int producerCount = 4;
int messagesPerProducer = 5000;
+ String groupId = "group1";
+
+ // This consumer is created to register the share group id with the
groupCoordinator
+ // so that the config share.auto.offset.reset can be altered for this
group
+ createShareConsumer(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), groupId);
+ alterShareAutoOffsetReset(groupId, "earliest");
+
ExecutorService consumerExecutorService =
Executors.newFixedThreadPool(consumerCount);
ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
@@ -1157,7 +1231,7 @@ public class ShareConsumerTest {
// The "failing" consumer polls but immediately closes, which
releases the records for the other consumers
CompletableFuture<Integer> future = new CompletableFuture<>();
AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
- consumeMessages(failedMessagesConsumed, producerCount *
messagesPerProducer, "group1", 0, 1, false, future);
+ consumeMessages(failedMessagesConsumed, producerCount *
messagesPerProducer, groupId, 0, 1, false, future);
startSignal.countDown();
});
@@ -1174,7 +1248,7 @@ public class ShareConsumerTest {
consumerExecutorService.submit(() -> {
CompletableFuture<Integer> future = new CompletableFuture<>();
futuresSuccess.add(future);
- consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, "group1", consumerNumber, 40, true, future,
Optional.of(maxBytes));
+ consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, groupId, consumerNumber, 40, true, future,
Optional.of(maxBytes));
});
}
producerExecutorService.shutdown();
@@ -1203,6 +1277,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
producer.send(producerRecord1);
@@ -1251,6 +1326,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ alterShareAutoOffsetReset("group1", "earliest");
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1292,6 +1368,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ alterShareAutoOffsetReset("group1", "earliest");
// The acknowledgment commit callback will try to call a method of
KafkaShareConsumer
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
@@ -1331,6 +1408,7 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ alterShareAutoOffsetReset("group1", "earliest");
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallbackThrows<>());
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1363,6 +1441,7 @@ public class ShareConsumerTest {
public void testPollThrowsInterruptExceptionIfInterrupted(String
persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
// interrupt the thread and call poll
try {
@@ -1386,6 +1465,7 @@ public class ShareConsumerTest {
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String
persister) {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton("topic abc"));
+ alterShareAutoOffsetReset("group1", "earliest");
// The exception depends upon a metadata response which arrives
asynchronously. If the delay is
// too short, the poll might return before the error is known.
@@ -1405,6 +1485,7 @@ public class ShareConsumerTest {
producer.send(record);
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ alterShareAutoOffsetReset("group1", "earliest");
shareConsumer.wakeup();
assertThrows(WakeupException.class, () ->
shareConsumer.poll(Duration.ZERO));
@@ -1423,6 +1504,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
String topic = "foo";
shareConsumer.subscribe(Collections.singleton(topic));
+ alterShareAutoOffsetReset("group1", "earliest");
// Topic is created post creation of share consumer and subscription
createTopic(topic);
@@ -1458,6 +1540,7 @@ public class ShareConsumerTest {
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
// Consumer subscribes to the topics -> bar and baz.
shareConsumer.subscribe(Arrays.asList(topic1, topic2));
+ alterShareAutoOffsetReset("group1", "earliest");
producer.send(recordTopic1).get();
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -1490,6 +1573,13 @@ public class ShareConsumerTest {
KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
+ String groupId = "group1";
+
+ // This consumer is created to register the share group id with the
groupCoordinator
+ // so that the config share.auto.offset.reset can be altered for this
group
+ createShareConsumer(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), groupId);
+ alterShareAutoOffsetReset(groupId, "earliest");
+
// We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
try {
for (int i = 0; i < 10; i++) {
@@ -1499,13 +1589,12 @@ public class ShareConsumerTest {
fail("Failed to send records: " + e);
}
- Admin adminClient = createAdminClient();
// We delete records before offset 5, so the LSO should move to 5.
adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(5L)));
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
CompletableFuture<Integer> future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true,
future);
+ consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true,
future);
// The records returned belong to offsets 5-9.
assertEquals(5, totalMessagesConsumed.get());
try {
@@ -1528,7 +1617,7 @@ public class ShareConsumerTest {
totalMessagesConsumed = new AtomicInteger(0);
future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 1, "group1", 1, 10, true,
future);
+ consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true,
future);
// The record returned belong to offset 14.
assertEquals(1, totalMessagesConsumed.get());
try {
@@ -1542,14 +1631,135 @@ public class ShareConsumerTest {
totalMessagesConsumed = new AtomicInteger(0);
future = new CompletableFuture<>();
- consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true,
future);
+ consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future);
assertEquals(0, totalMessagesConsumed.get());
try {
assertEquals(0, future.get());
} catch (Exception e) {
fail("Exception occurred : " + e.getMessage());
}
- adminClient.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testShareAutoOffsetResetDefaultValue(String persister) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // Producing a record.
+ producer.send(record);
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ // No records should be consumed because share.auto.offset.reset has a
default of "latest". Since the record
+ // was produced before share partition was initialized (which happens
after the first share fetch request
+ // in the poll method), the start offset would be the latest offset,
i.e. 1 (the next offset after the already
+ // present 0th record)
+ assertEquals(0, records.count());
+ // Producing another record.
+ producer.send(record);
+ records = shareConsumer.poll(Duration.ofMillis(5000));
+ // Now the next record should be consumed successfully
+ assertEquals(1, records.count());
+ shareConsumer.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testShareAutoOffsetResetEarliest(String persister) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ // Changing the value of share.auto.offset.reset value to "earliest"
+ alterShareAutoOffsetReset("group1", "earliest");
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // Producing a record.
+ producer.send(record);
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ // Since the value for share.auto.offset.reset has been altered to
"earliest", the consumer should consume
+ // all messages present on the partition
+ assertEquals(1, records.count());
+ // Producing another record.
+ producer.send(record);
+ records = shareConsumer.poll(Duration.ofMillis(5000));
+ // The next record should also be consumed successfully
+ assertEquals(1, records.count());
+ shareConsumer.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String
persister) throws Exception {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+ // Changing the value of share.auto.offset.reset value to "earliest"
+ alterShareAutoOffsetReset("group1", "earliest");
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // We write 10 records to the topic, so they would be written from
offsets 0-9 on the topic.
+ try {
+ for (int i = 0; i < 10; i++) {
+ producer.send(record).get();
+ }
+ } catch (Exception e) {
+ fail("Failed to send records: " + e);
+ }
+
+ // We delete records before offset 5, so the LSO should move to 5.
+ adminClient.deleteRecords(Collections.singletonMap(tp,
RecordsToDelete.beforeOffset(5L)));
+
+ AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
+ CompletableFuture<Integer> future = new CompletableFuture<>();
+ consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true,
future);
+ // The records returned belong to offsets 5-9.
+ assertEquals(5, totalMessagesConsumed.get());
+ assertEquals(5, future.get());
+
+ shareConsumer.close();
+ producer.close();
+ }
+
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void
testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1");
+ shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
+ // Changing the value of share.auto.offset.reset value to "earliest"
for group1
+ alterShareAutoOffsetReset("group1", "earliest");
+
+ KafkaShareConsumer<byte[], byte[]> shareConsumerLatest =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group2");
+ shareConsumerLatest.subscribe(Collections.singleton(tp.topic()));
+ // Changing the value of share.auto.offset.reset value to "latest" for
group2
+ alterShareAutoOffsetReset("group2", "latest");
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ // Producing a record.
+ producer.send(record);
+ ConsumerRecords<byte[], byte[]> records1 =
shareConsumerEarliest.poll(Duration.ofMillis(5000));
+ // Since the value for share.auto.offset.reset has been altered to
"earliest", the consumer should consume
+ // all messages present on the partition
+ assertEquals(1, records1.count());
+
+ ConsumerRecords<byte[], byte[]> records2 =
shareConsumerLatest.poll(Duration.ofMillis(5000));
+ // Since the value for share.auto.offset.reset has been altered to
"latest", the consumer should not consume
+ // any message
+ assertEquals(0, records2.count());
+
+ // Producing another record.
+ producer.send(record);
+
+ records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
+ // The next record should also be consumed successfully by group1
+ assertEquals(1, records1.count());
+
+ records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
+ // The next record should also be consumed successfully by group2
+ assertEquals(1, records2.count());
+
+ shareConsumerEarliest.close();
+ shareConsumerLatest.close();
producer.close();
}
@@ -1729,6 +1939,7 @@ public class ShareConsumerTest {
try {
producer.send(record).get(15000, TimeUnit.MILLISECONDS);
shareConsumer.subscribe(subscription);
+ alterShareAutoOffsetReset("warmupgroup1", "earliest");
TestUtils.waitForCondition(
() -> shareConsumer.poll(Duration.ofMillis(5000)).count() ==
1, 30000, 200L, () -> "warmup record not received");
} finally {
@@ -1736,4 +1947,19 @@ public class ShareConsumerTest {
shareConsumer.close();
}
}
+
+ private void alterShareAutoOffsetReset(String groupId, String newValue) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
+ Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
+ alterEntries.put(configResource, List.of(new AlterConfigOp(new
ConfigEntry(
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue),
AlterConfigOp.OpType.SET)));
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+ try {
+ adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+ .all()
+ .get(60, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ fail("Exception was thrown: ", e);
+ }
+ }
}
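
The alterShareAutoOffsetReset helper above drives the new group config through
incrementalAlterConfigs on a GROUP-typed ConfigResource. As a minimal Java sketch of
the same call outside the test harness (the broker address, group name and the choice
of "earliest" are assumed values, not taken from this patch):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetShareAutoOffsetReset {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address for this sketch.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // A GROUP-typed resource addresses the dynamic config of the (assumed) share group.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
            AlterConfigOp setEarliest = new AlterConfigOp(
                new ConfigEntry("share.auto.offset.reset", "earliest"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(group, Collections.singleton(setEarliest)))
                .all()
                .get();
        }
    }
}
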
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7b78793373b..2e6ffdf400b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -74,9 +74,9 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG}
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
-import org.apache.kafka.coordinator.group.{GroupCoordinator,
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator,
GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorConfigTest}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
@@ -585,6 +585,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
+ cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.defaultShareAutoOffsetReset.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new
DescribeConfigsRequestData()
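
The KafkaApisTest change above verifies that DescribeConfigs on a GROUP resource now
reports share.auto.offset.reset with its default. A small sketch, assuming an Admin
client created as in the previous example and the same hypothetical group name, of how
a client could read that effective value back:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeShareAutoOffsetReset {
    // Reads the effective share.auto.offset.reset of an assumed group "my-share-group".
    static String readReset(Admin admin) throws Exception {
        ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
        Map<ConfigResource, Config> configs =
            admin.describeConfigs(Collections.singleton(group)).all().get();
        // Returns "latest" unless the group config has been altered.
        return configs.get(group).get("share.auto.offset.reset").value();
    }
}
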
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index cdc8cf5dd75..8097021e4cb 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -170,14 +170,17 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -235,16 +238,19 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicIdPartition2 = new TopicIdPartition(topicId, new
TopicPartition(topic, 1))
val topicIdPartition3 = new TopicIdPartition(topicId, new
TopicPartition(topic, 2))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition1,
topicIdPartition2, topicIdPartition3)
+
+ // Send the first share fetch request to initialize the share partitions
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic partitions created above
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
produceData(topicIdPartition3, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition1,
topicIdPartition2, topicIdPartition3)
-
- // Send the share fetch request to fetch the records produced above
- val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -325,12 +331,6 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val leader2 = partitionToLeaders(topicIdPartition2)
val leader3 = partitionToLeaders(topicIdPartition3)
- initProducer()
- // Producing 10 records to the topic partitions created above
- produceData(topicIdPartition1, 10)
- produceData(topicIdPartition2, 10)
- produceData(topicIdPartition3, 10)
-
val send1: Seq[TopicIdPartition] = Seq(topicIdPartition1)
val send2: Seq[TopicIdPartition] = Seq(topicIdPartition2)
val send3: Seq[TopicIdPartition] = Seq(topicIdPartition3)
@@ -338,14 +338,31 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
- // Crete different share fetch requests for different partitions as they
may have leaders on separate brokers
- val shareFetchRequest1 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
- val shareFetchRequest2 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
- val shareFetchRequest3 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
+ // Send the first share fetch request to initialize the share partitions
+ // Create different share fetch requests for different partitions as they
may have leaders on separate brokers
+ var shareFetchRequest1 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
+ var shareFetchRequest2 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
+ var shareFetchRequest3 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
+
+ var shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
+ var shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
+ var shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
+
+ initProducer()
+ // Producing 10 records to the topic partitions created above
+ produceData(topicIdPartition1, 10)
+ produceData(topicIdPartition2, 10)
+ produceData(topicIdPartition3, 10)
+
+ // Send the second share fetch request to fetch the records produced above
+ // Create different share fetch requests for different partitions as they
may have leaders on separate brokers
+ shareFetchRequest1 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
+ shareFetchRequest2 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
+ shareFetchRequest3 = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
- val shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
- val shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
- val shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
+ shareFetchResponse1 =
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
+ shareFetchResponse2 =
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
+ shareFetchResponse3 =
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
@@ -427,15 +444,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -482,7 +502,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
- // Sending a second share fetch request to check if acknowledgements were
done successfully
+ // Sending a third share fetch request to check if acknowledgements were
done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -540,15 +560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch: Int =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -571,7 +594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
- // Send a Share Fetch request with piggybacked acknowledgements
+ // Send the third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
@@ -599,7 +622,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
- // Sending a third share fetch request to confirm if acknowledgements were
done successfully
+ // Sending a fourth share fetch request to confirm if acknowledgements
were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -657,15 +680,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -709,7 +735,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val acknowledgePartitionData =
shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData,
acknowledgePartitionData)
- // Sending a second share fetch request to check if acknowledgements were
done successfully
+ // Sending a third share fetch request to check if acknowledgements were
done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -767,15 +793,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -798,7 +827,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
- // Send a Share Fetch request with piggybacked acknowledgements
+ // Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
@@ -862,15 +891,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -917,7 +949,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
- // Sending a second share fetch request to check if acknowledgements were
done successfully
+ // Sending a third share fetch request to check if acknowledgements were
done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -975,15 +1007,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1006,7 +1041,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
- // Send a Share Fetch request with piggybacked acknowledgements
+ // Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
@@ -1034,7 +1069,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic
produceData(topicIdPartition, 10)
- // Sending a third share fetch request to confirm if acknowledgements were
done successfully
+ // Sending a fourth share fetch request to confirm if acknowledgements
were done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -1094,15 +1129,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1146,7 +1184,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
var acknowledgePartitionData =
shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData,
acknowledgePartitionData)
- // Sending a second share fetch request to check if acknowledgements were
done successfully
+ // Sending a third share fetch request to check if acknowledgements were
done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -1193,7 +1231,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 new records to the topic
produceData(topicIdPartition, 10)
- // Sending a third share fetch request to check if acknowledgements were
done successfully
+ // Sending a fourth share fetch request to check if acknowledgements were
done successfully
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -1251,6 +1289,11 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 3 large messages to the topic created above
produceData(topicIdPartition, 10)
@@ -1258,10 +1301,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
produceData(topicIdPartition, "large message 2", new String(new
Array[Byte](MAX_PARTITION_BYTES/3)))
produceData(topicIdPartition, "large message 3", new String(new
Array[Byte](MAX_PARTITION_BYTES/3)))
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ val metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1311,6 +1352,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
)
def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit =
{
val groupId: String = "group"
+
+ val memberId = Uuid.randomUuid()
val memberId1 = Uuid.randomUuid()
val memberId2 = Uuid.randomUuid()
val memberId3 = Uuid.randomUuid()
@@ -1323,12 +1366,15 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Sending a dummy share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10000 records to the topic created above
produceData(topicIdPartition, 10000)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
// Sending 3 share Fetch Requests with same groupId to the same
topicPartition but with different memberIds,
// mocking the behaviour of multiple share consumers from the same share
group
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1,
ShareRequestMetadata.INITIAL_EPOCH)
@@ -1418,23 +1464,28 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Sending 3 dummy share Fetch Requests to initialize the share partitions
for each share group
+ sendFirstShareFetchRequest(memberId1, groupId1, send)
+ sendFirstShareFetchRequest(memberId2, groupId2, send)
+ sendFirstShareFetchRequest(memberId3, groupId3, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Sending 3 share Fetch Requests with same groupId to the same
topicPartition but with different memberIds,
- // mocking the behaviour of multiple share consumers from the same share
group
- val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Sending 3 share Fetch Requests with different groupId and different
memberIds to the same topicPartition,
+ // mocking the behaviour of 3 different share groups
+ val metadata1 = new ShareRequestMetadata(memberId1,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap1: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1)
- val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2,
ShareRequestMetadata.INITIAL_EPOCH)
+ val metadata2 = new ShareRequestMetadata(memberId2,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap2: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2)
- val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3,
ShareRequestMetadata.INITIAL_EPOCH)
+ val metadata3 = new ShareRequestMetadata(memberId3,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap3: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3)
@@ -1509,15 +1560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1540,7 +1594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
- // Send a Share Fetch request with piggybacked acknowledgements
+ // Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
@@ -1616,15 +1670,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
shareSessionEpoch)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1647,7 +1704,7 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
- // Send a Share Fetch request with piggybacked acknowledgements
+ // Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
@@ -1804,12 +1861,28 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code,
shareAcknowledgeResponseData.errorCode)
}
- @ClusterTest(
- serverProperties = Array(
- new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer,share"),
- new ClusterConfigProperty(key = "group.share.enable", value = "true"),
- new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
- new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ @ClusterTests(
+ Array(
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ ),
+ new ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+ new ClusterConfigProperty(key = "group.share.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "group.share.persister.class.name",
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ new ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "1"),
+ new ClusterConfigProperty(key = "unstable.api.versions.enable",
value = "true")
+ )
+ ),
)
)
def testShareFetchRequestInvalidShareSessionEpoch(): Unit = {
@@ -1824,14 +1897,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1850,8 +1927,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val partitionData =
shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
- // Sending Share Fetch request with invalid share session epoch
- metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
+ // Sending a third Share Fetch request with invalid share session epoch
+ shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
+ metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1895,14 +1973,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1922,7 +2004,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending Share Acknowledge request with invalid share session epoch
- metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
+ shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
+ metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
Map(topicIdPartition -> List(new
ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
@@ -1972,14 +2055,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1998,8 +2085,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val partitionData =
shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedPartitionData, partitionData)
- // Sending a Share Fetch request with wrong member Id
- metadata = new ShareRequestMetadata(wrongMemberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+ // Sending a third Share Fetch request with wrong member Id
+ shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+ metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -2044,14 +2132,18 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new
TopicPartition(topic, partition))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+ // Send the first share fetch request to initialize the share partition
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic created above
produceData(topicIdPartition, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
- // Send the share fetch request to fetch the records produced above
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
val shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -2071,7 +2163,8 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
// Sending a Share Acknowledge request with wrong member Id
- metadata = new ShareRequestMetadata(wrongMemberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+ shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+ metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
Map(topicIdPartition -> List(new
ShareAcknowledgeRequestData.AcknowledgementBatch()
.setFirstOffset(0)
@@ -2122,15 +2215,19 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
val topicIdPartition1 = new TopicIdPartition(topicId, new
TopicPartition(topic, partition1))
val topicIdPartition2 = new TopicIdPartition(topicId, new
TopicPartition(topic, partition2))
+ val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
+
+ // Send the first share fetch request to initialize the share partitions
+ sendFirstShareFetchRequest(memberId, groupId, send)
+
initProducer()
// Producing 10 records to the topic partitions created above
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
- val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
-
- // Send the share fetch request to fetch the records produced above
- var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ // Send the second share fetch request to fetch the records produced above
+ var shareSessionEpoch =
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+ var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMap: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
var shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -2145,8 +2242,9 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
produceData(topicIdPartition1, 10)
produceData(topicIdPartition2, 10)
- // Send the share fetch request to with forget list populated with
topicIdPartition2
- metadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+ // Send another share fetch request with the forget list populated with
topicIdPartition1
+ shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+ metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -2167,6 +2265,12 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
compareFetchResponsePartitions(expectedPartitionData, partitionData)
}
+ private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String,
topicIdPartitions: Seq[TopicIdPartition]): Unit = {
+ val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId,
ShareRequestMetadata.INITIAL_EPOCH)
+ val shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
+ connectAndReceive[ShareFetchResponse](shareFetchRequest)
+ }
+
private def expectedAcquiredRecords(firstOffsets: util.List[Long],
lastOffsets: util.List[Long], deliveryCounts: util.List[Int]):
util.List[AcquiredRecords] = {
val acquiredRecordsList: util.List[AcquiredRecords] = new util.ArrayList()
for (i <- firstOffsets.indices) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index b65e297bb04..934055d9d5b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -21,8 +21,10 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -31,6 +33,8 @@ import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
* Group configuration related parameters and supporting methods like
validation, etc. are
@@ -48,6 +52,10 @@ public final class GroupConfig extends AbstractConfig {
public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG =
"share.record.lock.duration.ms";
+ public static final String SHARE_AUTO_OFFSET_RESET_CONFIG =
"share.auto.offset.reset";
+ public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT =
ShareGroupAutoOffsetReset.LATEST.toString();
+ public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to
initialize the share-partition start offset.";
+
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
@@ -58,6 +66,8 @@ public final class GroupConfig extends AbstractConfig {
public final int shareRecordLockDurationMs;
+ public final String shareAutoOffsetReset;
+
private static final ConfigDef CONFIG = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
@@ -88,7 +98,13 @@ public final class GroupConfig extends AbstractConfig {
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT,
atLeast(1000),
MEDIUM,
- ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC);
+ ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
+ .define(SHARE_AUTO_OFFSET_RESET_CONFIG,
+ STRING,
+ SHARE_AUTO_OFFSET_RESET_DEFAULT,
+ in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
+ MEDIUM,
+ SHARE_AUTO_OFFSET_RESET_DOC);
public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
@@ -97,6 +113,7 @@ public final class GroupConfig extends AbstractConfig {
this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
this.shareHeartbeatIntervalMs =
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs =
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+ this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
}
public static ConfigDef configDef() {
@@ -203,6 +220,13 @@ public final class GroupConfig extends AbstractConfig {
return new GroupConfig(props);
}
+ /**
+ * The default share group auto offset reset strategy.
+ */
+ public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
+ return
ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
+ }
+
/**
* The consumer group session timeout in milliseconds.
*/
@@ -237,4 +261,20 @@ public final class GroupConfig extends AbstractConfig {
public int shareRecordLockDurationMs() {
return shareRecordLockDurationMs;
}
+
+ /**
+ * The share group auto offset reset strategy.
+ */
+ public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+ return ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
+ }
+
+ public enum ShareGroupAutoOffsetReset {
+ LATEST, EARLIEST;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+ }
}
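
For readers tracing how the new group config is consumed, the following is a minimal usage sketch (not part of the patch) that builds a GroupConfig from per-group properties and reads the new value; it assumes the remaining group configs fall back to their declared defaults:

    import java.util.Properties;

    import org.apache.kafka.coordinator.group.GroupConfig;

    public class ShareAutoOffsetResetExample {
        public static void main(String[] args) {
            // Per-group properties, e.g. as delivered by a dynamic config update.
            Properties props = new Properties();
            props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");

            // Remaining group configs are taken from their defaults.
            GroupConfig config = new GroupConfig(props);

            // shareAutoOffsetReset() parses the stored string case-insensitively into the enum.
            GroupConfig.ShareGroupAutoOffsetReset strategy = config.shareAutoOffsetReset();
            System.out.println(strategy); // prints "earliest" via the lower-casing toString()
        }
    }
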
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 9a817a0a40d..fe11f50d2ff 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest;
@@ -27,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,8 +59,10 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if (GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
+ } else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "hello", "1.0");
} else {
- assertPropertyInvalid(name, "not_a_number", "-1");
+ assertPropertyInvalid(name, "not_a_number", "-0.1");
}
});
}
@@ -71,6 +75,21 @@ public class GroupConfigTest {
}
}
+ @Test
+ public void testValidShareAutoOffsetResetValues() {
+
+ Properties props = createValidGroupConfig();
+
+ // Check for value "latest"
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+ doTestValidProps(props);
+ props = createValidGroupConfig();
+
+ // Check for value "earliest"
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
+ doTestValidProps(props);
+ }
+
@Test
public void testInvalidProps() {
@@ -78,56 +97,65 @@ public class GroupConfigTest {
// Check for invalid consumerSessionTimeoutMs, < MIN
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid consumerSessionTimeoutMs, > MAX
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "70000");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid consumerHeartbeatIntervalMs, < MIN
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid consumerHeartbeatIntervalMs, > MAX
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareSessionTimeoutMs, < MIN
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareSessionTimeoutMs, > MAX
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareHeartbeatIntervalMs, < MIN
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareHeartbeatIntervalMs, > MAX
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareRecordLockDurationMs, < MIN
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "10000");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid shareRecordLockDurationMs, > MAX
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000");
- doTestInvalidProps(props);
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareAutoOffsetReset
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
+ doTestInvalidProps(props, ConfigException.class);
}
- private void doTestInvalidProps(Properties props) {
- assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
+ private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) {
+ assertThrows(exceptionClassName, () -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
+ }
+
+ private void doTestValidProps(Properties props) {
+ assertDoesNotThrow(() -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
}
@Test
@@ -138,6 +166,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000");
+ defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -148,6 +177,7 @@ public class GroupConfigTest {
assertEquals(10, config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
+ assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
}
@Test
@@ -165,6 +195,7 @@ public class GroupConfigTest {
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
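
As a usage note (not part of this patch): once a broker supports group-level dynamic configs, the new property can be altered per share group through the Admin client. The sketch below assumes the GROUP ConfigResource type is available in the client and broker version in use; the group name and bootstrap address are placeholders:

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class AlterShareAutoOffsetReset {
        public static void main(String[] args) throws Exception {
            Properties adminProps = new Properties();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            try (Admin admin = Admin.create(adminProps)) {
                // Group-level resource for the share group being reconfigured (name is a placeholder).
                ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
                AlterConfigOp setReset = new AlterConfigOp(
                        new ConfigEntry("share.auto.offset.reset", "earliest"),
                        AlterConfigOp.OpType.SET);

                Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(group, List.of(setReset));
                admin.incrementalAlterConfigs(configs).all().get();
            }
        }
    }
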
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 2fa0d2162d3..a08f85a8108 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -401,7 +401,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return ReadShareGroupStateResponse.toResponseData(
topicId,
partition,
- PartitionFactory.DEFAULT_START_OFFSET,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_STATE_EPOCH,
Collections.emptyList()
);
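
The switch from DEFAULT_START_OFFSET (0) to UNINITIALIZED_START_OFFSET (-1) lets callers distinguish "no persisted state yet" from "start at offset 0". The following is an illustrative sketch only, not code from the patch (the real handling lives in SharePartition); resolveByResetStrategy is a hypothetical stand-in for the broker's earliest/latest log offset lookups:

    import org.apache.kafka.coordinator.group.GroupConfig.ShareGroupAutoOffsetReset;
    import org.apache.kafka.server.share.persister.PartitionFactory;

    public class StartOffsetResolution {

        static long resolveStartOffset(long persistedStartOffset, ShareGroupAutoOffsetReset resetStrategy) {
            if (persistedStartOffset == PartitionFactory.UNINITIALIZED_START_OFFSET) {
                // No state has been written yet for this share partition, so fall back to the
                // group's share.auto.offset.reset strategy instead of assuming offset 0.
                return resolveByResetStrategy(resetStrategy);
            }
            return persistedStartOffset;
        }

        // Hypothetical stand-in for the log lookups (earliest/latest timestamp) performed by the broker.
        static long resolveByResetStrategy(ShareGroupAutoOffsetReset resetStrategy) {
            return resetStrategy == ShareGroupAutoOffsetReset.EARLIEST ? 0L : Long.MAX_VALUE;
        }
    }
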
diff --git a/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java b/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
index 9d37b114b27..83d3d7d74a8 100644
--- a/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
+++ b/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
@@ -53,7 +53,7 @@ public class NoOpShareStatePersister implements Persister {
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionAllData(
- partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
+ partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
@@ -93,7 +93,7 @@ public class NoOpShareStatePersister implements Persister {
for (TopicData<PartitionIdLeaderEpochData> topicData : reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionStateErrorData(
- partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+ partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
diff --git a/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 7336547e8d6..abd44a854ee 100644
--- a/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ b/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -26,17 +26,17 @@ import java.util.List;
*/
public class PartitionFactory {
public static final int DEFAULT_STATE_EPOCH = 0;
- public static final int DEFAULT_START_OFFSET = 0;
+ public static final int UNINITIALIZED_START_OFFSET = -1;
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
public static final int DEFAULT_LEADER_EPOCH = 0;
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
public static PartitionIdData newPartitionIdData(int partition) {
- return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
+ return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) {
- return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
+ return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
}
public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
@@ -44,7 +44,7 @@ public class PartitionFactory {
}
public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
- return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
+ return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {