This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9db5ed00a83 KAFKA-16726: Added share.auto.offset.reset dynamic config 
for share groups (#17573)
9db5ed00a83 is described below

commit 9db5ed00a8369d5c696e836661230110ea2ea44d
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Nov 11 14:36:11 2024 +0530

    KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups 
(#17573)
    
    This PR adds another dynamic config share.auto.offset.reset fir share 
groups.
    
    
    Reviewers:  Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>,  Abhinav Dixit <[email protected]>, Manikumar 
Reddy <[email protected]>
---
 .../java/kafka/server/share/ShareFetchUtils.java   |  24 +-
 .../java/kafka/server/share/SharePartition.java    |  31 +-
 .../server/share/SharePartitionManagerTest.java    |  15 +
 .../kafka/server/share/SharePartitionTest.java     | 267 +++++++++++++++-
 .../java/kafka/test/api/ShareConsumerTest.java     | 244 +++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   5 +-
 .../server/ShareFetchAcknowledgeRequestTest.scala  | 354 +++++++++++++--------
 .../kafka/coordinator/group/GroupConfig.java       |  42 ++-
 .../kafka/coordinator/group/GroupConfigTest.java   |  57 +++-
 .../coordinator/share/ShareCoordinatorShard.java   |   2 +-
 .../share/persister/NoOpShareStatePersister.java   |   4 +-
 .../server/share/persister/PartitionFactory.java   |   8 +-
 12 files changed, 889 insertions(+), 164 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index bd504deac49..3515362152b 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -19,9 +19,11 @@ package kafka.server.share;
 import kafka.cluster.Partition;
 import kafka.server.ReplicaManager;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.OffsetNotAvailableException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.FileRecords;
@@ -40,6 +42,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import scala.Option;
+import scala.Some;
 
 /**
  * Utility class for post-processing of share fetch operations.
@@ -125,7 +128,26 @@ public class ShareFetchUtils {
         Option<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
                 topicIdPartition.topicPartition(), 
ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
                 Optional.empty(), true).timestampAndOffsetOpt();
-        return timestampAndOffset.isEmpty() ? (long) 0 : 
timestampAndOffset.get().offset;
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("offset for Earliest 
timestamp not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
+    }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the 
topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used 
in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("offset for Latest timestamp 
not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
     }
 
     static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 740bf697de4..71baea10174 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -72,6 +73,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
+import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
+
 /**
  * The SharePartition is used to track the state of a partition that is shared 
between multiple
  * consumers. The class maintains the state of the records that have been 
fetched from the leader
@@ -421,8 +425,12 @@ public class SharePartition {
                     return;
                 }
 
-                // Set the state epoch and end offset from the persisted state.
-                startOffset = partitionData.startOffset() != -1 ? 
partitionData.startOffset() : 0;
+                try {
+                    startOffset = 
startOffsetDuringInitialization(partitionData.startOffset());
+                } catch (Exception e) {
+                    completeInitializationWithException(future, e);
+                    return;
+                }
                 stateEpoch = partitionData.stateEpoch();
 
                 List<PersisterStateBatch> stateBatches = 
partitionData.stateBatches();
@@ -448,7 +456,7 @@ public class SharePartition {
                     // and start/end offsets.
                     maybeUpdateCachedStateAndOffsets();
                 } else {
-                    
updateEndOffsetAndResetFetchOffsetMetadata(partitionData.startOffset());
+                    updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
                 }
                 // Set the partition state to Active and complete the future.
                 partitionState = SharePartitionState.ACTIVE;
@@ -2058,6 +2066,23 @@ public class SharePartition {
         }
     }
 
+    private long startOffsetDuringInitialization(long 
partitionDataStartOffset) throws Exception {
+        // Set the state epoch and end offset from the persisted state.
+        if (partitionDataStartOffset != 
PartitionFactory.UNINITIALIZED_START_OFFSET) {
+            return partitionDataStartOffset;
+        }
+        GroupConfig.ShareGroupAutoOffsetReset offsetResetStrategy;
+        if (groupConfigManager.groupConfig(groupId).isPresent()) {
+            offsetResetStrategy = 
groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
+        } else {
+            offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
+        }
+
+        if (offsetResetStrategy == 
GroupConfig.ShareGroupAutoOffsetReset.EARLIEST)
+            return offsetForEarliestTimestamp(topicIdPartition, 
replicaManager);
+        return offsetForLatestTimestamp(topicIdPartition, replicaManager);
+    }
+
     // Visible for testing. Should only be used for testing purposes.
     NavigableMap<Long, InFlightBatch> cachedState() {
         return new ConcurrentSkipListMap<>(cachedState);
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 67c2a6cce77..46abf04b0a6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -17,6 +17,7 @@
 package kafka.server.share;
 
 import kafka.cluster.Partition;
+import kafka.log.OffsetResultHolder;
 import kafka.server.LogReadResult;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.ShareFetchRequest;
@@ -1040,6 +1042,8 @@ public class SharePartitionManagerTest {
         partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES);
         partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
 
+        mockFetchOffsetForTimestamp(mockReplicaManager);
+
         Time time = mock(Time.class);
         when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
         Metrics metrics = new Metrics();
@@ -1109,6 +1113,9 @@ public class SharePartitionManagerTest {
         partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
 
         final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+
+        mockFetchOffsetForTimestamp(mockReplicaManager);
+
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
             DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -1233,6 +1240,8 @@ public class SharePartitionManagerTest {
         TopicIdPartition tp0 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 0));
         Map<TopicIdPartition, Integer> partitionMaxBytes = 
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
 
+        mockFetchOffsetForTimestamp(mockReplicaManager);
+
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
             DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
@@ -2482,6 +2491,12 @@ public class SharePartitionManagerTest {
         });
     }
 
+    private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
+        FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+        Mockito.doReturn(new 
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+    }
+
     static Seq<Tuple2<TopicIdPartition, LogReadResult>> 
buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
         List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new 
ArrayList<>();
         topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new 
Tuple2<>(topicIdPartition, new LogReadResult(
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 6538a1f4012..3e90005902e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -16,12 +16,14 @@
  */
 package kafka.server.share;
 
+import kafka.log.OffsetResultHolder;
 import kafka.server.ReplicaManager;
 import kafka.server.share.SharePartition.InFlightState;
 import kafka.server.share.SharePartition.RecordState;
 import kafka.server.share.SharePartition.SharePartitionState;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
@@ -34,9 +36,11 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfig;
@@ -77,6 +81,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -191,6 +197,244 @@ public class SharePartitionTest {
         assertNull(sharePartition.cachedState().get(11L).offsetState());
     }
 
+    @Test
+    public void 
testMaybeInitializeDefaultStartEpochGroupConfigReturnsEarliest() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(
+                    0, PartitionFactory.DEFAULT_STATE_EPOCH,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_ERROR_CODE,
+                    PartitionFactory.DEFAULT_ERR_MESSAGE,
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
+
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+        Mockito.doReturn(new 
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManager)
+            .withReplicaManager(replicaManager)
+            .build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        // replicaManager.fetchOffsetForTimestamp should be called with 
"ListOffsetsRequest.EARLIEST_TIMESTAMP"
+        Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+            Mockito.any(TopicPartition.class),
+            Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean()
+        );
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(0, sharePartition.startOffset());
+        assertEquals(0, sharePartition.endOffset());
+        assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, 
sharePartition.stateEpoch());
+    }
+
+    @Test
+    public void testMaybeInitializeDefaultStartEpochGroupConfigReturnsLatest() 
{
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(
+                    0, PartitionFactory.DEFAULT_STATE_EPOCH,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_ERROR_CODE,
+                    PartitionFactory.DEFAULT_ERR_MESSAGE,
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.LATEST);
+
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
+        Mockito.doReturn(new 
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManager)
+            .withReplicaManager(replicaManager)
+            .build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        // replicaManager.fetchOffsetForTimestamp should be called with 
"ListOffsetsRequest.LATEST_TIMESTAMP"
+        Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+            Mockito.any(TopicPartition.class),
+            Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean()
+        );
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(15, sharePartition.startOffset());
+        assertEquals(15, sharePartition.endOffset());
+        assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, 
sharePartition.stateEpoch());
+    }
+
+    @Test
+    public void testMaybeInitializeDefaultStartEpochGroupConfigNotPresent() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(
+                    0, PartitionFactory.DEFAULT_STATE_EPOCH,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_ERROR_CODE,
+                    PartitionFactory.DEFAULT_ERR_MESSAGE,
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
+        Mockito.doReturn(new 
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManager)
+            .withReplicaManager(replicaManager)
+            .build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        // replicaManager.fetchOffsetForTimestamp should be called with 
"ListOffsetsRequest.LATEST_TIMESTAMP"
+        Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+            Mockito.any(TopicPartition.class),
+            Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean()
+        );
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(15, sharePartition.startOffset());
+        assertEquals(15, sharePartition.endOffset());
+        assertEquals(PartitionFactory.DEFAULT_STATE_EPOCH, 
sharePartition.stateEpoch());
+    }
+
+    @Test
+    public void testMaybeInitializeFetchOffsetForLatestTimestampThrowsError() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(
+                    0, PartitionFactory.DEFAULT_STATE_EPOCH,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_ERROR_CODE,
+                    PartitionFactory.DEFAULT_ERR_MESSAGE,
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
 Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
+            .thenThrow(new RuntimeException("fetch offsets exception"));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManager)
+            .withReplicaManager(replicaManager)
+            .build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+
+        // replicaManager.fetchOffsetForTimestamp should be called with 
"ListOffsetsRequest.LATEST_TIMESTAMP"
+        Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+            Mockito.any(TopicPartition.class),
+            Mockito.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean()
+        );
+
+        assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
+    }
+
+    @Test
+    public void 
testMaybeInitializeFetchOffsetForEarliestTimestampThrowsError() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(
+                    0, PartitionFactory.DEFAULT_STATE_EPOCH,
+                    PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_ERROR_CODE,
+                    PartitionFactory.DEFAULT_ERR_MESSAGE,
+                    Collections.emptyList())))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(GroupConfig.ShareGroupAutoOffsetReset.EARLIEST);
+
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        
Mockito.when(replicaManager.fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
 Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()))
+            .thenThrow(new RuntimeException("fetch offsets exception"));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withGroupConfigManager(groupConfigManager)
+            .withReplicaManager(replicaManager)
+            .build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+
+        // replicaManager.fetchOffsetForTimestamp should be called with 
"ListOffsetsRequest.EARLIEST_TIMESTAMP"
+        Mockito.verify(replicaManager).fetchOffsetForTimestamp(
+            Mockito.any(TopicPartition.class),
+            Mockito.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean()
+        );
+
+        assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
+    }
+
     @Test
     public void testMaybeInitializeSharePartitionAgain() {
         Persister persister = Mockito.mock(Persister.class);
@@ -460,7 +704,13 @@ public class SharePartitionTest {
 
     @Test
     public void testMaybeInitializeWithNoOpShareStatePersister() {
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+        Mockito.doReturn(new 
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
         assertTrue(result.isDone());
         assertFalse(result.isCompletedExceptionally());
@@ -825,7 +1075,13 @@ public class SharePartitionTest {
 
     @Test
     public void testMaybeAcquireAndReleaseFetchLock() {
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+
+        FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
+        Mockito.doReturn(new 
OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+            
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
         sharePartition.maybeInitialize();
         assertTrue(sharePartition.maybeAcquireFetchLock());
         // Lock cannot be acquired again, as already acquired.
@@ -5233,7 +5489,7 @@ public class SharePartitionTest {
         private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
 
         private Persister persister = new NoOpShareStatePersister();
-        private final ReplicaManager replicaManager = 
Mockito.mock(ReplicaManager.class);
+        private ReplicaManager replicaManager = 
Mockito.mock(ReplicaManager.class);
         private GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         private SharePartitionState state = SharePartitionState.EMPTY;
 
@@ -5257,6 +5513,11 @@ public class SharePartitionTest {
             return this;
         }
 
+        private SharePartitionBuilder withReplicaManager(ReplicaManager 
replicaManager) {
+            this.replicaManager = replicaManager;
+            return this;
+        }
+
         private SharePartitionBuilder 
withGroupConfigManager(GroupConfigManager groupConfigManager) {
             this.groupConfigManager = groupConfigManager;
             return this;
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index ade43e899b4..b7d127eb429 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -19,6 +19,9 @@ package kafka.test.api;
 import kafka.api.BaseConsumerTest;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -34,6 +37,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
@@ -47,6 +51,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -60,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -101,6 +107,8 @@ public class ShareConsumerTest {
     private static final String DEFAULT_STATE_PERSISTER = 
"org.apache.kafka.server.share.persister.DefaultStatePersister";
     private static final String NO_OP_PERSISTER = 
"org.apache.kafka.server.share.persister.NoOpShareStatePersister";
 
+    private Admin adminClient;
+
     @BeforeEach
     public void createCluster(TestInfo testInfo) throws Exception {
         String persisterClassName = NO_OP_PERSISTER;
@@ -131,11 +139,13 @@ public class ShareConsumerTest {
         cluster.waitForReadyBrokers();
         createTopic("topic");
         createTopic("topic2");
+        adminClient = createAdminClient();
         warmup();
     }
 
     @AfterEach
     public void destroyCluster() throws Exception {
+        adminClient.close();
         cluster.close();
     }
 
@@ -156,6 +166,7 @@ public class ShareConsumerTest {
         Set<String> subscription = Collections.singleton(tp.topic());
         shareConsumer.subscribe(subscription);
         assertEquals(subscription, shareConsumer.subscription());
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
         shareConsumer.close();
         assertEquals(0, records.count());
@@ -168,6 +179,7 @@ public class ShareConsumerTest {
         Set<String> subscription = Collections.singleton(tp.topic());
         shareConsumer.subscribe(subscription);
         assertEquals(subscription, shareConsumer.subscription());
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
         shareConsumer.unsubscribe();
         assertEquals(Collections.emptySet(), shareConsumer.subscription());
@@ -182,6 +194,7 @@ public class ShareConsumerTest {
         Set<String> subscription = Collections.singleton(tp.topic());
         shareConsumer.subscribe(subscription);
         assertEquals(subscription, shareConsumer.subscription());
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
         assertEquals(0, records.count());
         shareConsumer.subscribe(subscription);
@@ -198,6 +211,7 @@ public class ShareConsumerTest {
         Set<String> subscription = Collections.singleton(tp.topic());
         shareConsumer.subscribe(subscription);
         assertEquals(subscription, shareConsumer.subscription());
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
         shareConsumer.unsubscribe();
         assertEquals(Collections.emptySet(), shareConsumer.subscription());
@@ -214,6 +228,7 @@ public class ShareConsumerTest {
         Set<String> subscription = Collections.singleton(tp.topic());
         shareConsumer.subscribe(subscription);
         assertEquals(subscription, shareConsumer.subscription());
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
         shareConsumer.subscribe(Collections.emptySet());
         assertEquals(Collections.emptySet(), shareConsumer.subscription());
@@ -231,6 +246,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         shareConsumer.close();
@@ -245,6 +261,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         producer.send(record);
@@ -273,6 +290,8 @@ public class ShareConsumerTest {
 
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
+        alterShareAutoOffsetReset("group1", "earliest");
+
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
 
@@ -307,6 +326,8 @@ public class ShareConsumerTest {
         shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
+        alterShareAutoOffsetReset("group1", "earliest");
+
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         // Now in the second poll, we implicitly acknowledge the record 
received in the first poll.
@@ -334,6 +355,8 @@ public class ShareConsumerTest {
         shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
+        alterShareAutoOffsetReset("group1", "earliest");
+
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
 
@@ -361,6 +384,8 @@ public class ShareConsumerTest {
         shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
+        alterShareAutoOffsetReset("group1", "earliest");
+
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
 
@@ -420,6 +445,7 @@ public class ShareConsumerTest {
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
         assertEquals(numRecords, records.size());
@@ -442,6 +468,7 @@ public class ShareConsumerTest {
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
         assertEquals(numRecords, records.size());
@@ -468,6 +495,8 @@ public class ShareConsumerTest {
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
             "group1", 
Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)));
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
+
         List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
         long i = 0L;
         for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -513,6 +542,8 @@ public class ShareConsumerTest {
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
+
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(4, records.count());
         assertEquals(transactional1.offset(), 
records.records(tp).get(0).offset());
@@ -538,6 +569,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         records.forEach(shareConsumer::acknowledge);
@@ -556,6 +588,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         records.forEach(shareConsumer::acknowledge);
@@ -583,6 +616,7 @@ public class ShareConsumerTest {
         KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer1.subscribe(Collections.singleton(tp.topic()));
         shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
         Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
@@ -634,6 +668,7 @@ public class ShareConsumerTest {
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         Map<TopicPartition, Set<Long>> partitionOffsetsMap = new HashMap<>();
         Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
@@ -689,6 +724,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -709,6 +745,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -727,6 +764,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -734,7 +772,6 @@ public class ShareConsumerTest {
         producer.close();
     }
 
-
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void testExplicitAcknowledgeThrowsNotInBatch(String persister) {
@@ -743,6 +780,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         ConsumerRecord<byte[], byte[]> consumedRecord = 
records.records(tp).get(0);
@@ -762,6 +800,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         ConsumerRecord<byte[], byte[]> consumedRecord = 
records.records(tp).get(0);
@@ -780,6 +819,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
@@ -805,6 +845,7 @@ public class ShareConsumerTest {
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new HashMap<>();
         Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
@@ -843,6 +884,8 @@ public class ShareConsumerTest {
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
             "group1", 
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
String.valueOf(maxPartitionFetchBytes)));
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
+
         ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
         assertEquals(1, records.count());
         shareConsumer.close();
@@ -857,9 +900,11 @@ public class ShareConsumerTest {
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group2");
         shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group2", "earliest");
 
         // producing 3 records to the topic
         producer.send(record);
@@ -907,6 +952,7 @@ public class ShareConsumerTest {
         shareConsumer1.subscribe(Collections.singleton(tp.topic()));
         KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         int totalMessages = 2000;
         for (int i = 0; i < totalMessages; i++) {
@@ -943,9 +989,16 @@ public class ShareConsumerTest {
         int producerCount = 4;
         int messagesPerProducer = 5000;
 
+        String groupId = "group1";
+
         ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
         ExecutorService consumerExecutorService = 
Executors.newFixedThreadPool(consumerCount);
 
+        // This consumer is created to register the share group id with the 
groupCoordinator
+        // so that the config share.auto.offset.reset can be altered for this 
group
+        createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), groupId);
+        alterShareAutoOffsetReset(groupId, "earliest");
+
         for (int i = 0; i < producerCount; i++) {
             producerExecutorService.submit(() -> 
produceMessages(messagesPerProducer));
         }
@@ -957,7 +1010,7 @@ public class ShareConsumerTest {
             consumerExecutorService.submit(() -> {
                 CompletableFuture<Integer> future = new CompletableFuture<>();
                 futures.add(future);
-                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, "group1", consumerNumber, 30, true, future, 
Optional.of(maxBytes));
+                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, groupId, consumerNumber, 30, true, future, 
Optional.of(maxBytes));
             });
         }
 
@@ -990,6 +1043,19 @@ public class ShareConsumerTest {
         int messagesPerProducer = 2000;
         final int totalMessagesSent = producerCount * messagesPerProducer;
 
+        String groupId1 = "group1";
+        String groupId2 = "group2";
+        String groupId3 = "group3";
+
+        // These consumers are created to register the share group ids with 
the groupCoordinator
+        // so that the config share.auto.offset.reset can be altered for these 
groups
+        createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), groupId1);
+        alterShareAutoOffsetReset(groupId1, "earliest");
+        createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), groupId2);
+        alterShareAutoOffsetReset(groupId2, "earliest");
+        createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), groupId3);
+        alterShareAutoOffsetReset(groupId3, "earliest");
+
         ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
         ExecutorService shareGroupExecutorService1 = 
Executors.newFixedThreadPool(consumerCount);
         ExecutorService shareGroupExecutorService2 = 
Executors.newFixedThreadPool(consumerCount);
@@ -1095,6 +1161,7 @@ public class ShareConsumerTest {
         shareConsumer1.subscribe(Collections.singleton(tp.topic()));
         KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         int totalMessages = 1500;
         for (int i = 0; i < totalMessages; i++) {
@@ -1139,6 +1206,13 @@ public class ShareConsumerTest {
         int producerCount = 4;
         int messagesPerProducer = 5000;
 
+        String groupId = "group1";
+
+        // This consumer is created to register the share group id with the 
groupCoordinator
+        // so that the config share.auto.offset.reset can be altered for this 
group
+        createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), groupId);
+        alterShareAutoOffsetReset(groupId, "earliest");
+
         ExecutorService consumerExecutorService = 
Executors.newFixedThreadPool(consumerCount);
         ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
 
@@ -1157,7 +1231,7 @@ public class ShareConsumerTest {
             // The "failing" consumer polls but immediately closes, which 
releases the records for the other consumers
             CompletableFuture<Integer> future = new CompletableFuture<>();
             AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
-            consumeMessages(failedMessagesConsumed, producerCount * 
messagesPerProducer, "group1", 0, 1, false, future);
+            consumeMessages(failedMessagesConsumed, producerCount * 
messagesPerProducer, groupId, 0, 1, false, future);
             startSignal.countDown();
         });
 
@@ -1174,7 +1248,7 @@ public class ShareConsumerTest {
             consumerExecutorService.submit(() -> {
                 CompletableFuture<Integer> future = new CompletableFuture<>();
                 futuresSuccess.add(future);
-                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, "group1", consumerNumber, 40, true, future, 
Optional.of(maxBytes));
+                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, groupId, consumerNumber, 40, true, future, 
Optional.of(maxBytes));
             });
         }
         producerExecutorService.shutdown();
@@ -1203,6 +1277,7 @@ public class ShareConsumerTest {
         KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
         KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         producer.send(producerRecord1);
 
@@ -1251,6 +1326,7 @@ public class ShareConsumerTest {
         KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        alterShareAutoOffsetReset("group1", "earliest");
 
         shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1292,6 +1368,7 @@ public class ShareConsumerTest {
         KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        alterShareAutoOffsetReset("group1", "earliest");
 
         // The acknowledgment commit callback will try to call a method of 
KafkaShareConsumer
         shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
@@ -1331,6 +1408,7 @@ public class ShareConsumerTest {
         KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        alterShareAutoOffsetReset("group1", "earliest");
 
         shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgeCommitCallbackThrows<>());
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -1363,6 +1441,7 @@ public class ShareConsumerTest {
     public void testPollThrowsInterruptExceptionIfInterrupted(String 
persister) {
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         // interrupt the thread and call poll
         try {
@@ -1386,6 +1465,7 @@ public class ShareConsumerTest {
     public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String 
persister) {
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton("topic abc"));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         // The exception depends upon a metadata response which arrives 
asynchronously. If the delay is
         // too short, the poll might return before the error is known.
@@ -1405,6 +1485,7 @@ public class ShareConsumerTest {
         producer.send(record);
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         shareConsumer.wakeup();
         assertThrows(WakeupException.class, () -> 
shareConsumer.poll(Duration.ZERO));
@@ -1423,6 +1504,7 @@ public class ShareConsumerTest {
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         String topic = "foo";
         shareConsumer.subscribe(Collections.singleton(topic));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         // Topic is created post creation of share consumer and subscription
         createTopic(topic);
@@ -1458,6 +1540,7 @@ public class ShareConsumerTest {
         KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
         // Consumer subscribes to the topics -> bar and baz.
         shareConsumer.subscribe(Arrays.asList(topic1, topic2));
+        alterShareAutoOffsetReset("group1", "earliest");
 
         producer.send(recordTopic1).get();
         TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -1490,6 +1573,13 @@ public class ShareConsumerTest {
         KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
         ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
 
+        String groupId = "group1";
+
+        // This consumer is created to register the share group id with the 
groupCoordinator
+        // so that the config share.auto.offset.reset can be altered for this 
group
+        createShareConsumer(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), groupId);
+        alterShareAutoOffsetReset(groupId, "earliest");
+
         // We write 10 records to the topic, so they would be written from 
offsets 0-9 on the topic.
         try {
             for (int i = 0; i < 10; i++) {
@@ -1499,13 +1589,12 @@ public class ShareConsumerTest {
             fail("Failed to send records: " + e);
         }
 
-        Admin adminClient = createAdminClient();
         // We delete records before offset 5, so the LSO should move to 5.
         adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(5L)));
 
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
         CompletableFuture<Integer> future = new CompletableFuture<>();
-        consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, 
future);
+        consumeMessages(totalMessagesConsumed, 5, groupId, 1, 10, true, 
future);
         // The records returned belong to offsets 5-9.
         assertEquals(5, totalMessagesConsumed.get());
         try {
@@ -1528,7 +1617,7 @@ public class ShareConsumerTest {
 
         totalMessagesConsumed = new AtomicInteger(0);
         future = new CompletableFuture<>();
-        consumeMessages(totalMessagesConsumed, 1, "group1", 1, 10, true, 
future);
+        consumeMessages(totalMessagesConsumed, 1, groupId, 1, 10, true, 
future);
         // The record returned belong to offset 14.
         assertEquals(1, totalMessagesConsumed.get());
         try {
@@ -1542,14 +1631,135 @@ public class ShareConsumerTest {
 
         totalMessagesConsumed = new AtomicInteger(0);
         future = new CompletableFuture<>();
-        consumeMessages(totalMessagesConsumed, 0, "group1", 1, 5, true, 
future);
+        consumeMessages(totalMessagesConsumed, 0, groupId, 1, 5, true, future);
         assertEquals(0, totalMessagesConsumed.get());
         try {
             assertEquals(0, future.get());
         } catch (Exception e) {
             fail("Exception occurred : " + e.getMessage());
         }
-        adminClient.close();
+        producer.close();
+    }
+
+    @ParameterizedTest(name = "{displayName}.persister={0}")
+    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+    public void testShareAutoOffsetResetDefaultValue(String persister) {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        // Producing a record.
+        producer.send(record);
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        // No records should be consumed because share.auto.offset.reset has a 
default of "latest". Since the record
+        // was produced before share partition was initialized (which happens 
after the first share fetch request
+        // in the poll method), the start offset would be the latest offset, 
i.e. 1 (the next offset after the already
+        // present 0th record)
+        assertEquals(0, records.count());
+        // Producing another record.
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        // Now the next record should be consumed successfully
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @ParameterizedTest(name = "{displayName}.persister={0}")
+    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+    public void testShareAutoOffsetResetEarliest(String persister) {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset value to "earliest"
+        alterShareAutoOffsetReset("group1", "earliest");
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        // Producing a record.
+        producer.send(record);
+        ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+        // Since the value for share.auto.offset.reset has been altered to 
"earliest", the consumer should consume
+        // all messages present on the partition
+        assertEquals(1, records.count());
+        // Producing another record.
+        producer.send(record);
+        records = shareConsumer.poll(Duration.ofMillis(5000));
+        // The next records should also be consumed successfully
+        assertEquals(1, records.count());
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @ParameterizedTest(name = "{displayName}.persister={0}")
+    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+    public void testShareAutoOffsetResetEarliestAfterLsoMovement(String 
persister) throws Exception {
+        KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumer.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset value to "earliest"
+        alterShareAutoOffsetReset("group1", "earliest");
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        // We write 10 records to the topic, so they would be written from 
offsets 0-9 on the topic.
+        try {
+            for (int i = 0; i < 10; i++) {
+                producer.send(record).get();
+            }
+        } catch (Exception e) {
+            fail("Failed to send records: " + e);
+        }
+
+        // We delete records before offset 5, so the LSO should move to 5.
+        adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(5L)));
+
+        AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        consumeMessages(totalMessagesConsumed, 5, "group1", 1, 10, true, 
future);
+        // The records returned belong to offsets 5-9.
+        assertEquals(5, totalMessagesConsumed.get());
+        assertEquals(5, future.get());
+
+        shareConsumer.close();
+        producer.close();
+    }
+
+    @ParameterizedTest(name = "{displayName}.persister={0}")
+    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+    public void 
testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
+        KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
+        shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset value to "earliest" 
for group1
+        alterShareAutoOffsetReset("group1", "earliest");
+
+        KafkaShareConsumer<byte[], byte[]> shareConsumerLatest = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group2");
+        shareConsumerLatest.subscribe(Collections.singleton(tp.topic()));
+        // Changing the value of share.auto.offset.reset value to "latest" for 
group2
+        alterShareAutoOffsetReset("group2", "latest");
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+        KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+        // Producing a record.
+        producer.send(record);
+        ConsumerRecords<byte[], byte[]> records1 = 
shareConsumerEarliest.poll(Duration.ofMillis(5000));
+        // Since the value for share.auto.offset.reset has been altered to 
"earliest", the consumer should consume
+        // all messages present on the partition
+        assertEquals(1, records1.count());
+
+        ConsumerRecords<byte[], byte[]> records2 = 
shareConsumerLatest.poll(Duration.ofMillis(5000));
+        // Since the value for share.auto.offset.reset has been altered to 
"latest", the consumer should not consume
+        // any message
+        assertEquals(0, records2.count());
+
+        // Producing another record.
+        producer.send(record);
+
+        records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
+        // The next record should also be consumed successfully by group1
+        assertEquals(1, records1.count());
+
+        records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
+        // The next record should also be consumed successfully by group2
+        assertEquals(1, records2.count());
+
+        shareConsumerEarliest.close();
+        shareConsumerLatest.close();
         producer.close();
     }
 
@@ -1729,6 +1939,7 @@ public class ShareConsumerTest {
         try {
             producer.send(record).get(15000, TimeUnit.MILLISECONDS);
             shareConsumer.subscribe(subscription);
+            alterShareAutoOffsetReset("warmupgroup1", "earliest");
             TestUtils.waitForCondition(
                 () -> shareConsumer.poll(Duration.ofMillis(5000)).count() == 
1, 30000, 200L, () -> "warmup record not received");
         } finally {
@@ -1736,4 +1947,19 @@ public class ShareConsumerTest {
             shareConsumer.close();
         }
     }
+
+    private void alterShareAutoOffsetReset(String groupId, String newValue) {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+            GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), 
AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try {
+            adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                .all()
+                .get(60, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail("Exception was thrown: ", e);
+        }
+    }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7b78793373b..2e6ffdf400b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -74,9 +74,9 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG}
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
-import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorConfigTest}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.LeaderAndIsr
@@ -585,6 +585,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
+    cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.defaultShareAutoOffsetReset.toString)
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
 
     val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index cdc8cf5dd75..8097021e4cb 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -170,14 +170,17 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -235,16 +238,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicIdPartition2 = new TopicIdPartition(topicId, new 
TopicPartition(topic, 1))
     val topicIdPartition3 = new TopicIdPartition(topicId, new 
TopicPartition(topic, 2))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, 
topicIdPartition2, topicIdPartition3)
+
+    // Send the first share fetch request to initialize the share partitions
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic partitions created above
     produceData(topicIdPartition1, 10)
     produceData(topicIdPartition2, 10)
     produceData(topicIdPartition3, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, 
topicIdPartition2, topicIdPartition3)
-
-    // Send the share fetch request to fetch the records produced above
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -325,12 +331,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val leader2 = partitionToLeaders(topicIdPartition2)
     val leader3 = partitionToLeaders(topicIdPartition3)
 
-    initProducer()
-    // Producing 10 records to the topic partitions created above
-    produceData(topicIdPartition1, 10)
-    produceData(topicIdPartition2, 10)
-    produceData(topicIdPartition3, 10)
-
     val send1: Seq[TopicIdPartition] = Seq(topicIdPartition1)
     val send2: Seq[TopicIdPartition] = Seq(topicIdPartition2)
     val send3: Seq[TopicIdPartition] = Seq(topicIdPartition3)
@@ -338,14 +338,31 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
 
-    // Crete different share fetch requests for different partitions as they 
may have leaders on separate brokers
-    val shareFetchRequest1 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
-    val shareFetchRequest2 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
-    val shareFetchRequest3 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
+    // Send the first share fetch request to initialize the share partitions
+    // Create different share fetch requests for different partitions as they 
may have leaders on separate brokers
+    var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
+    var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
+    var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
+
+    var shareFetchResponse1 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
+    var shareFetchResponse2 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
+    var shareFetchResponse3 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
+
+    initProducer()
+    // Producing 10 records to the topic partitions created above
+    produceData(topicIdPartition1, 10)
+    produceData(topicIdPartition2, 10)
+    produceData(topicIdPartition3, 10)
+
+    // Send the second share fetch request to fetch the records produced above
+    // Create different share fetch requests for different partitions as they 
may have leaders on separate brokers
+    shareFetchRequest1 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send1, Seq.empty, acknowledgementsMap)
+    shareFetchRequest2 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send2, Seq.empty, acknowledgementsMap)
+    shareFetchRequest3 = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send3, Seq.empty, acknowledgementsMap)
 
-    val shareFetchResponse1 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
-    val shareFetchResponse2 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
-    val shareFetchResponse3 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
+    shareFetchResponse1 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest1, destination = leader1)
+    shareFetchResponse2 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest2, destination = leader2)
+    shareFetchResponse3 = 
connectAndReceive[ShareFetchResponse](shareFetchRequest3, destination = leader3)
 
     val shareFetchResponseData1 = shareFetchResponse1.data()
     assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
@@ -427,15 +444,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize share partitions
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -482,7 +502,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic
     produceData(topicIdPartition, 10)
 
-    // Sending a second share fetch request to check if acknowledgements were 
done successfully
+    // Sending a third share fetch request to check if acknowledgements were 
done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -540,15 +560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch: Int = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -571,7 +594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic created above
     produceData(topicIdPartition, 10)
 
-    // Send a Share Fetch request with piggybacked acknowledgements
+    // Send the third Share Fetch request with piggybacked acknowledgements
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
@@ -599,7 +622,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic
     produceData(topicIdPartition, 10)
 
-    // Sending a third share fetch request to confirm if acknowledgements were 
done successfully
+    // Sending a fourth share fetch request to confirm if acknowledgements 
were done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -657,15 +680,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partiion
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -709,7 +735,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val acknowledgePartitionData = 
shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
     compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, 
acknowledgePartitionData)
 
-    // Sending a second share fetch request to check if acknowledgements were 
done successfully
+    // Sending a third share fetch request to check if acknowledgements were 
done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -767,15 +793,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -798,7 +827,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic created above
     produceData(topicIdPartition, 10)
 
-    // Send a Share Fetch request with piggybacked acknowledgements
+    // Send a third Share Fetch request with piggybacked acknowledgements
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
@@ -862,15 +891,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -917,7 +949,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic
     produceData(topicIdPartition, 10)
 
-    // Sending a second share fetch request to check if acknowledgements were 
done successfully
+    // Sending a third share fetch request to check if acknowledgements were 
done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -975,15 +1007,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1006,7 +1041,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic created above
     produceData(topicIdPartition, 10)
 
-    // Send a Share Fetch request with piggybacked acknowledgements
+    // Send a third Share Fetch request with piggybacked acknowledgements
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
@@ -1034,7 +1069,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic
     produceData(topicIdPartition, 10)
 
-    // Sending a third share fetch request to confirm if acknowledgements were 
done successfully
+    // Sending a fourth share fetch request to confirm if acknowledgements 
were done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -1094,15 +1129,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the shar partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1146,7 +1184,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     var acknowledgePartitionData = 
shareAcknowledgeResponseData.responses().get(0).partitions().get(0)
     compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, 
acknowledgePartitionData)
 
-    // Sending a second share fetch request to check if acknowledgements were 
done successfully
+    // Sending a third share fetch request to check if acknowledgements were 
done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -1193,7 +1231,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 new records to the topic
     produceData(topicIdPartition, 10)
 
-    // Sending a third share fetch request to check if acknowledgements were 
done successfully
+    // Sending a fourth share fetch request to check if acknowledgements were 
done successfully
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
@@ -1251,6 +1289,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 3 large messages to the topic created above
     produceData(topicIdPartition, 10)
@@ -1258,10 +1301,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     produceData(topicIdPartition, "large message 2", new String(new 
Array[Byte](MAX_PARTITION_BYTES/3)))
     produceData(topicIdPartition, "large message 3", new String(new 
Array[Byte](MAX_PARTITION_BYTES/3)))
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    val metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1311,6 +1352,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
   )
   def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = 
{
     val groupId: String = "group"
+
+    val memberId = Uuid.randomUuid()
     val memberId1 = Uuid.randomUuid()
     val memberId2 = Uuid.randomUuid()
     val memberId3 = Uuid.randomUuid()
@@ -1323,12 +1366,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Sending a dummy share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10000 records to the topic created above
     produceData(topicIdPartition, 10000)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
     // Sending 3 share Fetch Requests with same groupId to the same 
topicPartition but with different memberIds,
     // mocking the behaviour of multiple share consumers from the same share 
group
     val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, 
ShareRequestMetadata.INITIAL_EPOCH)
@@ -1418,23 +1464,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Sending 3 dummy share Fetch Requests with to inititlaize the share 
partitions for each share group\
+    sendFirstShareFetchRequest(memberId1, groupId1, send)
+    sendFirstShareFetchRequest(memberId2, groupId2, send)
+    sendFirstShareFetchRequest(memberId3, groupId3, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Sending 3 share Fetch Requests with same groupId to the same 
topicPartition but with different memberIds,
-    // mocking the behaviour of multiple share consumers from the same share 
group
-    val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Sending 3 share Fetch Requests with different groupId and different 
memberIds to the same topicPartition,
+    // mocking the behaviour of 3 different share groups
+    val metadata1 = new ShareRequestMetadata(memberId1, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap1: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest1 = createShareFetchRequest(groupId1, metadata1, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap1)
 
-    val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, 
ShareRequestMetadata.INITIAL_EPOCH)
+    val metadata2 = new ShareRequestMetadata(memberId2, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap2: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest2 = createShareFetchRequest(groupId2, metadata2, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap2)
 
-    val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, 
ShareRequestMetadata.INITIAL_EPOCH)
+    val metadata3 = new ShareRequestMetadata(memberId3, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
     val acknowledgementsMap3: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap3)
 
@@ -1509,15 +1560,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1540,7 +1594,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic created above
     produceData(topicIdPartition, 10)
 
-    // Send a Share Fetch request with piggybacked acknowledgements
+    // Send a third Share Fetch request with piggybacked acknowledgements
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
@@ -1616,15 +1670,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var shareSessionEpoch = ShareRequestMetadata.INITIAL_EPOCH
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
shareSessionEpoch)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var acknowledgementsMapForFetch: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -1647,7 +1704,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     // Producing 10 more records to the topic created above
     produceData(topicIdPartition, 10)
 
-    // Send a Share Fetch request with piggybacked acknowledgements
+    // Send a third Share Fetch request with piggybacked acknowledgements
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
@@ -1804,12 +1861,28 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
shareAcknowledgeResponseData.errorCode)
   }
 
-  @ClusterTest(
-    serverProperties = Array(
-      new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,share"),
-      new ClusterConfigProperty(key = "group.share.enable", value = "true"),
-      new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
-      new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true")
+        )
+      ),
     )
   )
   def testShareFetchRequestInvalidShareSessionEpoch(): Unit = {
@@ -1824,14 +1897,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
@@ -1850,8 +1927,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val partitionData = 
shareFetchResponseData.responses().get(0).partitions().get(0)
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
 
-    // Sending Share Fetch request with invalid share session epoch
-    metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
+    // Sending a thord Share Fetch request with invalid share session epoch
+    shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
+    metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
@@ -1895,14 +1973,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
@@ -1922,7 +2004,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
 
     // Sending Share Acknowledge request with invalid share session epoch
-    metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))
+    shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
+    metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
       Map(topicIdPartition -> List(new 
ShareAcknowledgeRequestData.AcknowledgementBatch()
       .setFirstOffset(0)
@@ -1972,14 +2055,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
@@ -1998,8 +2085,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val partitionData = 
shareFetchResponseData.responses().get(0).partitions().get(0)
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
 
-    // Sending a Share Fetch request with wrong member Id
-    metadata = new ShareRequestMetadata(wrongMemberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    // Sending a third Share Fetch request with wrong member Id
+    shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+    metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
@@ -2044,14 +2132,18 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicId = topicIds.get(topic)
     val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic created above
     produceData(topicIdPartition, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
-
-    // Send the share fetch request to fetch the records produced above
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
     val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
 
@@ -2071,7 +2163,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
 
     // Sending a Share Acknowledge request with wrong member Id
-    metadata = new ShareRequestMetadata(wrongMemberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+    metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
       Map(topicIdPartition -> List(new 
ShareAcknowledgeRequestData.AcknowledgementBatch()
         .setFirstOffset(0)
@@ -2122,15 +2215,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     val topicIdPartition1 = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition1))
     val topicIdPartition2 = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition2))
 
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
+
+    // Send the first share fetch request to initialize the share partition
+    sendFirstShareFetchRequest(memberId, groupId, send)
+
     initProducer()
     // Producing 10 records to the topic partitions created above
     produceData(topicIdPartition1, 10)
     produceData(topicIdPartition2, 10)
 
-    val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
-
-    // Send the share fetch request to fetch the records produced above
-    var metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    // Send the second share fetch request to fetch the records produced above
+    var shareSessionEpoch = 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
+    var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val acknowledgementsMap: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     var shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap)
     var shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -2145,8 +2242,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     produceData(topicIdPartition1, 10)
     produceData(topicIdPartition2, 10)
 
-    // Send the share fetch request to with forget list populated with 
topicIdPartition2
-    metadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
+    // Send another share fetch request with forget list populated with 
topicIdPartition2
+    shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+    metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
     val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap)
     shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
@@ -2167,6 +2265,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     compareFetchResponsePartitions(expectedPartitionData, partitionData)
   }
 
+  private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, 
topicIdPartitions: Seq[TopicIdPartition]): Unit = {
+    val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH)
+    val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty)
+    connectAndReceive[ShareFetchResponse](shareFetchRequest)
+  }
+
   private def expectedAcquiredRecords(firstOffsets: util.List[Long], 
lastOffsets: util.List[Long], deliveryCounts: util.List[Int]): 
util.List[AcquiredRecords] = {
     val acquiredRecordsList: util.List[AcquiredRecords] = new util.ArrayList()
     for (i <- firstOffsets.indices) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index b65e297bb04..934055d9d5b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -21,8 +21,10 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -31,6 +33,8 @@ import java.util.Set;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * Group configuration related parameters and supporting methods like 
validation, etc. are
@@ -48,6 +52,10 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = 
"share.record.lock.duration.ms";
 
+    public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = 
"share.auto.offset.reset";
+    public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = 
ShareGroupAutoOffsetReset.LATEST.toString();
+    public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to 
initialize the share-partition start offset.";
+
     public final int consumerSessionTimeoutMs;
 
     public final int consumerHeartbeatIntervalMs;
@@ -58,6 +66,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public final int shareRecordLockDurationMs;
 
+    public final String shareAutoOffsetReset;
+
     private static final ConfigDef CONFIG = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             INT,
@@ -88,7 +98,13 @@ public final class GroupConfig extends AbstractConfig {
             ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT,
             atLeast(1000),
             MEDIUM,
-            ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC);
+            ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
+        .define(SHARE_AUTO_OFFSET_RESET_CONFIG,
+            STRING,
+            SHARE_AUTO_OFFSET_RESET_DEFAULT,
+            in(Utils.enumOptions(ShareGroupAutoOffsetReset.class)),
+            MEDIUM,
+            SHARE_AUTO_OFFSET_RESET_DOC);
 
     public GroupConfig(Map<?, ?> props) {
         super(CONFIG, props, false);
@@ -97,6 +113,7 @@ public final class GroupConfig extends AbstractConfig {
         this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         this.shareHeartbeatIntervalMs = 
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareRecordLockDurationMs = 
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+        this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
     }
 
     public static ConfigDef configDef() {
@@ -203,6 +220,13 @@ public final class GroupConfig extends AbstractConfig {
         return new GroupConfig(props);
     }
 
+    /**
+     * The default share group auto offset reset strategy.
+     */
+    public static ShareGroupAutoOffsetReset defaultShareAutoOffsetReset() {
+        return 
ShareGroupAutoOffsetReset.valueOf(SHARE_AUTO_OFFSET_RESET_DEFAULT.toUpperCase(Locale.ROOT));
+    }
+
     /**
      * The consumer group session timeout in milliseconds.
      */
@@ -237,4 +261,20 @@ public final class GroupConfig extends AbstractConfig {
     public int shareRecordLockDurationMs() {
         return shareRecordLockDurationMs;
     }
+
+    /**
+     * The share group auto offset reset strategy.
+     */
+    public ShareGroupAutoOffsetReset shareAutoOffsetReset() {
+        return 
ShareGroupAutoOffsetReset.valueOf(shareAutoOffsetReset.toUpperCase(Locale.ROOT));
+    }
+
+    public enum ShareGroupAutoOffsetReset {
+        LATEST, EARLIEST;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 9a817a0a40d..fe11f50d2ff 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigTest;
@@ -27,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -57,8 +59,10 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
+            } else if 
(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "hello", "1.0");
             } else {
-                assertPropertyInvalid(name, "not_a_number", "-1");
+                assertPropertyInvalid(name, "not_a_number", "-0.1");
             }
         });
     }
@@ -71,6 +75,21 @@ public class GroupConfigTest {
         }
     }
 
+    @Test
+    public void testValidShareAutoOffsetResetValues() {
+
+        Properties props = createValidGroupConfig();
+
+        // Check for value "latest"
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+        doTestValidProps(props);
+        props = createValidGroupConfig();
+
+        // Check for value "earliest"
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
+        doTestValidProps(props);
+    }
+
     @Test
     public void testInvalidProps() {
 
@@ -78,56 +97,65 @@ public class GroupConfigTest {
 
         // Check for invalid consumerSessionTimeoutMs, < MIN
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid consumerSessionTimeoutMs, > MAX
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "70000");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid consumerHeartbeatIntervalMs, < MIN
         props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid consumerHeartbeatIntervalMs, > MAX
         props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid shareSessionTimeoutMs, < MIN
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid shareSessionTimeoutMs, > MAX
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid shareHeartbeatIntervalMs, < MIN
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid shareHeartbeatIntervalMs, > MAX
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid shareRecordLockDurationMs, < MIN
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "10000");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
         // Check for invalid shareRecordLockDurationMs, > MAX
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000");
-        doTestInvalidProps(props);
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareAutoOffsetReset
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "hello");
+        doTestInvalidProps(props, ConfigException.class);
     }
 
-    private void doTestInvalidProps(Properties props) {
-        assertThrows(InvalidConfigurationException.class, () -> 
GroupConfig.validate(props, createGroupCoordinatorConfig(), 
createShareGroupConfig()));
+    private void doTestInvalidProps(Properties props, Class<? extends 
Exception> exceptionClassName) {
+        assertThrows(exceptionClassName, () -> GroupConfig.validate(props, 
createGroupCoordinatorConfig(), createShareGroupConfig()));
+    }
+
+    private void doTestValidProps(Properties props) {
+        assertDoesNotThrow(() -> GroupConfig.validate(props, 
createGroupCoordinatorConfig(), createShareGroupConfig()));
     }
 
     @Test
@@ -138,6 +166,7 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
"2000");
+        defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
 
         Properties props = new Properties();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -148,6 +177,7 @@ public class GroupConfigTest {
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
+        assertEquals("latest", 
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
     }
 
     @Test
@@ -165,6 +195,7 @@ public class GroupConfigTest {
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "45000");
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
         return props;
     }
 
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 2fa0d2162d3..a08f85a8108 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -401,7 +401,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
             return ReadShareGroupStateResponse.toResponseData(
                 topicId,
                 partition,
-                PartitionFactory.DEFAULT_START_OFFSET,
+                PartitionFactory.UNINITIALIZED_START_OFFSET,
                 PartitionFactory.DEFAULT_STATE_EPOCH,
                 Collections.emptyList()
             );
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
 
b/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
index 9d37b114b27..83d3d7d74a8 100644
--- 
a/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/persister/NoOpShareStatePersister.java
@@ -53,7 +53,7 @@ public class NoOpShareStatePersister implements Persister {
         for (TopicData<PartitionIdLeaderEpochData> topicData : 
reqData.topicsData()) {
             resultArgs.add(new TopicData<>(topicData.topicId(), 
topicData.partitions().stream().
                 map(partitionIdData -> PartitionFactory.newPartitionAllData(
-                    partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, 
Collections.emptyList()))
+                    partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, 
PartitionFactory.UNINITIALIZED_START_OFFSET, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, 
Collections.emptyList()))
                 .collect(Collectors.toList())));
         }
         return CompletableFuture.completedFuture(new 
ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
@@ -93,7 +93,7 @@ public class NoOpShareStatePersister implements Persister {
         for (TopicData<PartitionIdLeaderEpochData> topicData : 
reqData.topicsData()) {
             resultArgs.add(new TopicData<>(topicData.topicId(), 
topicData.partitions().stream().
                 map(partitionIdData -> 
PartitionFactory.newPartitionStateErrorData(
-                    partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                    partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, 
PartitionFactory.UNINITIALIZED_START_OFFSET, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
                 .collect(Collectors.toList())));
         }
         return CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
diff --git 
a/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
 
b/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 7336547e8d6..abd44a854ee 100644
--- 
a/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ 
b/share/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -26,17 +26,17 @@ import java.util.List;
  */
 public class PartitionFactory {
     public static final int DEFAULT_STATE_EPOCH = 0;
-    public static final int DEFAULT_START_OFFSET = 0;
+    public static final int UNINITIALIZED_START_OFFSET = -1;
     public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
     public static final int DEFAULT_LEADER_EPOCH = 0;
     public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
 
     public static PartitionIdData newPartitionIdData(int partition) {
-        return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
DEFAULT_LEADER_EPOCH, null);
+        return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
DEFAULT_LEADER_EPOCH, null);
     }
 
     public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int 
partition, int leaderEpoch) {
-        return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, 
null);
+        return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
leaderEpoch, null);
     }
 
     public static PartitionStateData newPartitionStateData(int partition, int 
stateEpoch, long startOffset) {
@@ -44,7 +44,7 @@ public class PartitionFactory {
     }
 
     public static PartitionErrorData newPartitionErrorData(int partition, 
short errorCode, String errorMessage) {
-        return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
DEFAULT_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
+        return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, 
null);
     }
 
     public static PartitionStateErrorData newPartitionStateErrorData(int 
partition, int stateEpoch, long startOffset, short errorCode, String 
errorMessage) {


Reply via email to