This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 699ae1b75b0 KAFKA-16729: Support isolation level for share consumer 
(#19261)
699ae1b75b0 is described below

commit 699ae1b75b0f94601f99b389dd930c2910054c38
Author: Abhinav Dixit <[email protected]>
AuthorDate: Thu Apr 10 13:30:03 2025 +0530

    KAFKA-16729: Support isolation level for share consumer (#19261)
    
    This PR adds the share group dynamic config `share.isolation.level`.
    Until now, share groups only supported `READ_UNCOMMITTED` isolation
    level type. With this PR, we aim to support `READ_COMMITTED` isolation
    type to share groups.
    
    Reviewers: Andrew Schofield <[email protected]>, Jun Rao 
<[email protected]>, Apoorv Mittal <[email protected]>
---
 checkstyle/suppressions.xml                        |   3 +
 .../kafka/clients/consumer/ShareConsumerTest.java  | 439 +++++++++++++++++-
 .../java/kafka/server/share/ShareFetchUtils.java   |   3 +-
 .../java/kafka/server/share/SharePartition.java    | 240 +++++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 .../kafka/server/share/DelayedShareFetchTest.java  |  26 +-
 .../kafka/server/share/ShareFetchUtilsTest.java    |  20 +-
 .../server/share/SharePartitionManagerTest.java    |  16 +-
 .../kafka/server/share/SharePartitionTest.java     | 512 +++++++++++++++++++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../kafka/coordinator/group/GroupConfig.java       |  40 ++
 .../kafka/coordinator/group/GroupConfigTest.java   |  28 ++
 12 files changed, 1232 insertions(+), 102 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 59aec7b9350..429d699e7a1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -168,6 +168,9 @@
     <suppress checks="NPathComplexity"
               
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
 
+    <suppress checks="ClassFanOutComplexity"
+              files="ShareConsumerTest.java"/>
+
     <!-- connect tests-->
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 6f3b292e0d9..8d9e4dbd610 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -83,6 +83,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -102,7 +103,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -325,12 +325,12 @@ public class ShareConsumerTest {
             // Waiting for heartbeat to propagate the subscription change.
             TestUtils.waitForCondition(() -> {
                 shareConsumer.poll(Duration.ofMillis(500));
-                return partitionExceptionMap.containsKey(tp) && 
partitionExceptionMap.containsKey(tp2);
+                return partitionOffsetsMap.containsKey(tp) && 
partitionOffsetsMap.containsKey(tp2);
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
from the updated subscription");
 
             // Verifying if the callback was invoked without exceptions for 
the partitions for both topics.
-            assertNull(partitionExceptionMap.get(tp));
-            assertNull(partitionExceptionMap.get(tp2));
+            assertFalse(partitionExceptionMap.containsKey(tp));
+            assertFalse(partitionExceptionMap.containsKey(tp2));
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -356,11 +356,11 @@ public class ShareConsumerTest {
 
             TestUtils.waitForCondition(() -> {
                 shareConsumer.poll(Duration.ofMillis(500));
-                return partitionExceptionMap.containsKey(tp);
+                return partitionOffsetsMap.containsKey(tp);
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to 
callback");
 
-            // We expect null exception as the acknowledgment error code is 
null.
-            assertNull(partitionExceptionMap.get(tp));
+            // We expect no exception as the acknowledgment error code is null.
+            assertFalse(partitionExceptionMap.containsKey(tp));
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -388,9 +388,8 @@ public class ShareConsumerTest {
             shareConsumer.poll(Duration.ofMillis(1000));
             shareConsumer.close();
 
-            // We expect null exception as the acknowledgment error code is 
null.
-            assertTrue(partitionExceptionMap.containsKey(tp));
-            assertNull(partitionExceptionMap.get(tp));
+            // We expect no exception as the acknowledgment error code is null.
+            assertFalse(partitionExceptionMap.containsKey(tp));
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -423,6 +422,13 @@ public class ShareConsumerTest {
         }
     }
 
+    /**
+     * Test implementation of AcknowledgementCommitCallback to track the 
completed acknowledgements.
+     * partitionOffsetsMap is used to track the offsets acknowledged for each 
partition.
+     * partitionExceptionMap is used to track the exception encountered for 
each partition if any.
+     * Note - Multiple calls to {@link #onComplete(Map, Exception)} will not 
update the partitionExceptionMap for any existing partitions,
+     * so please ensure to clear the partitionExceptionMap after every call to 
onComplete() in a single test.
+     */
     private static class TestableAcknowledgementCommitCallback implements 
AcknowledgementCommitCallback {
         private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
         private final Map<TopicPartition, Exception> partitionExceptionMap;
@@ -442,7 +448,7 @@ public class ShareConsumerTest {
                     mergedOffsets.addAll(newOffsets);
                     return mergedOffsets;
                 });
-                if 
(!partitionExceptionMap.containsKey(partition.topicPartition())) {
+                if 
(!partitionExceptionMap.containsKey(partition.topicPartition()) && exception != 
null) {
                     partitionExceptionMap.put(partition.topicPartition(), 
exception);
                 }
             });
@@ -676,10 +682,10 @@ public class ShareConsumerTest {
             // The callback will receive the acknowledgement responses 
asynchronously after the next poll.
             TestUtils.waitForCondition(() -> {
                 shareConsumer1.poll(Duration.ofMillis(1000));
-                return partitionExceptionMap1.containsKey(tp);
+                return partitionOffsetsMap1.containsKey(tp);
             }, 30000, 100L, () -> "Didn't receive call to callback");
 
-            assertNull(partitionExceptionMap1.get(tp));
+            assertFalse(partitionExceptionMap1.containsKey(tp));
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -693,8 +699,7 @@ public class ShareConsumerTest {
             shareConsumer.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
-            Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, Map.of()));
 
             // The acknowledgement mode moves to PENDING from UNKNOWN.
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
@@ -715,7 +720,7 @@ public class ShareConsumerTest {
 
             TestUtils.waitForCondition(() -> {
                 shareConsumer.poll(Duration.ofMillis(500));
-                return partitionExceptionMap1.containsKey(tp);
+                return partitionOffsetsMap1.containsKey(tp);
             }, 30000, 100L, () -> "Didn't receive call to callback");
             verifyShareGroupStateTopicRecordsProduced();
         }
@@ -778,8 +783,7 @@ public class ShareConsumerTest {
 
             shareConsumer1.close();
 
-            assertTrue(partitionExceptionMap.containsKey(tp));
-            assertNull(partitionExceptionMap.get(tp));
+            assertFalse(partitionExceptionMap.containsKey(tp));
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -952,10 +956,10 @@ public class ShareConsumerTest {
             // The callback will receive the acknowledgement responses after 
the next poll.
             TestUtils.waitForCondition(() -> {
                 shareConsumer.poll(Duration.ofMillis(1000));
-                return partitionExceptionMap1.containsKey(tp);
+                return partitionOffsetsMap1.containsKey(tp);
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit 
callback did not receive the response yet");
 
-            assertNull(partitionExceptionMap1.get(tp));
+            assertFalse(partitionExceptionMap1.containsKey(tp));
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -2090,6 +2094,335 @@ public class ShareConsumerTest {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    @ClusterTest
+    public void testReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 5000L, 8);
+            // 5th and 10th message transaction was aborted, hence they won't 
be included in the fetched records.
+            assertEquals(8, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                if (messageCounter % 5 == 0)
+                    messageCounter++;
+                assertEquals("Message " + messageCounter, new 
String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testReadUncommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 5000L, 10);
+            // Even though 5th and 10th message transaction was aborted, they 
will be included in the fetched records since IsolationLevel is 
READ_UNCOMMITTED.
+            assertEquals(10, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                assertEquals("Message " + messageCounter, new 
String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = 
records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                records = waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 2", new String(record.value()));
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                records = waitedPoll(shareConsumer, 5000L, 2);
+                // Message 3 and Message 4 would be returned by this poll.
+                assertEquals(2, records.count());
+                Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = 
records.iterator();
+                record = recordIterator.next();
+                assertEquals("Message 3", new String(record.value()));
+                record = recordIterator.next();
+                assertEquals("Message 4", new String(record.value()));
+                // We will make Message 3 and Message 4 available for 
re-consumption.
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+                shareConsumer.commitSync();
+
+                // We are altering IsolationLevel to READ_COMMITTED now. We 
will only read committed transactions now.
+                alterShareIsolationLevel("group1", "read_committed");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
8");
+
+                // Since isolation level is READ_COMMITTED, we can consume 
Message 3 (committed transaction that was released), Message 5 and Message 8.
+                // We cannot consume Message 4 (aborted transaction that was 
released), Message 6 and Message 7 since they were aborted.
+                List<String> messages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : 
pollRecords)
+                            messages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return messages.size() == 3;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all 
records post altering share isolation level");
+
+                assertEquals("Message 3", messages.get(0));
+                assertEquals("Message 5", messages.get(1));
+                assertEquals("Message 8", messages.get(2));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void 
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = 
records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                // Setting the acknowledgement commit callback to verify 
acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, Map.of()));
+
+                // We will not receive any records since the transaction for 
Message 2 was aborted. Wait for the
+                // aborted marker offset for Message 2 (3L) to be fetched and 
acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(500));
+                    return pollRecords.count() == 0 && 
partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.get(tp).contains(3L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort 
transaction and marker offset for Message 2");
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                // Setting the acknowledgement commit callback to verify 
acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap2 = new 
HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap2, Map.of()));
+
+                records = waitedPoll(shareConsumer, 5000L, 1);
+                // Message 3 would be returned by this poll.
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 3", new String(record.value()));
+                // We will make Message 3 available for re-consumption.
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+                shareConsumer.commitSync();
+
+                // Wait for the aborted marker offset for Message 4 (7L) to be 
fetched and acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    shareConsumer.poll(Duration.ofMillis(500));
+                    return partitionOffsetsMap2.containsKey(tp) && 
partitionOffsetsMap2.get(tp).contains(7L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort 
transaction marker offset for Message 4");
+
+                // We are altering IsolationLevel to READ_UNCOMMITTED now. We 
will read both committed/aborted transactions now.
+                alterShareIsolationLevel("group1", "read_uncommitted");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
8");
+
+                // Since isolation level is READ_UNCOMMITTED, we can consume 
Message 3 (committed transaction that was released), Message 5, Message 6, 
Message 7 and Message 8.
+                List<String> finalMessages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : 
pollRecords)
+                            finalMessages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return finalMessages.size() == 5;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all 
records post altering share isolation level");
+
+                assertEquals("Message 3", finalMessages.get(0));
+                assertEquals("Message 5", finalMessages.get(1));
+                assertEquals("Message 6", finalMessages.get(2));
+                assertEquals("Message 7", finalMessages.get(3));
+                assertEquals("Message 8", finalMessages.get(4));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void 
testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 5000L, 1);
+                assertEquals(1, records.count());
+                ConsumerRecord<byte[], byte[]> record = 
records.iterator().next();
+                assertEquals("Message 1", new String(record.value()));
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                shareConsumer.commitSync();
+
+                // Second transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 2");
+
+                // Setting the acknowledgement commit callback to verify 
acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, Map.of()));
+
+                // We will not receive any records since the transaction for 
Message 2 was aborted. Wait for the
+                // aborted marker offset for Message 2 (3L) to be fetched and 
acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(500));
+                    return pollRecords.count() == 0 && 
partitionOffsetsMap.containsKey(tp) && partitionOffsetsMap.get(tp).contains(3L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort 
transaction and marker offset for Message 2");
+
+                // Third transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
3");
+                // Fourth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 4");
+
+                // Setting the acknowledgement commit callback to verify 
acknowledgement completion.
+                Map<TopicPartition, Set<Long>> partitionOffsetsMap2 = new 
HashMap<>();
+                shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap2, Map.of()));
+
+                records = waitedPoll(shareConsumer, 5000L, 1);
+                // Message 3 would be returned by this poll.
+                assertEquals(1, records.count());
+                record = records.iterator().next();
+                assertEquals("Message 3", new String(record.value()));
+                // We will make Message 3 available for re-consumption.
+                records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.REJECT));
+                shareConsumer.commitSync();
+
+                // Wait for the aborted marker offset for Message 4 (7L) to be 
fetched and acknowledged by the consumer.
+                TestUtils.waitForCondition(() -> {
+                    shareConsumer.poll(Duration.ofMillis(500));
+                    return partitionOffsetsMap2.containsKey(tp) && 
partitionOffsetsMap2.get(tp).contains(7L);
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort 
transaction marker offset for Message 4");
+
+                // We are altering IsolationLevel to READ_UNCOMMITTED now. We 
will read both committed/aborted transactions now.
+                alterShareIsolationLevel("group1", "read_uncommitted");
+
+                // Fifth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
5");
+                // Sixth transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 6");
+                // Seventh transaction is aborted.
+                produceAbortedTransaction(transactionalProducer, "Message 7");
+                // Eighth transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 
8");
+
+                // Since isolation level is READ_UNCOMMITTED, we can consume 
Message 5, Message 6, Message 7 and Message 8.
+                List<String> finalMessages = new ArrayList<>();
+                TestUtils.waitForCondition(() -> {
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(5000));
+                    if (pollRecords.count() > 0) {
+                        for (ConsumerRecord<byte[], byte[]> pollRecord : 
pollRecords)
+                            finalMessages.add(new String(pollRecord.value()));
+                        pollRecords.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+                        shareConsumer.commitSync();
+                    }
+                    return finalMessages.size() == 4;
+                }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all 
records post altering share isolation level");
+
+                assertEquals("Message 5", finalMessages.get(0));
+                assertEquals("Message 6", finalMessages.get(1));
+                assertEquals("Message 7", finalMessages.get(2));
+                assertEquals("Message 8", finalMessages.get(3));
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                transactionalProducer.close();
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
     /**
      * Util class to encapsulate state for a consumer/producer
      * being executed by an {@link ExecutorService}.
@@ -2127,6 +2460,59 @@ public class ShareConsumerTest {
         }
     }
 
+    private void produceCommittedTransaction(Producer<byte[], byte[]> 
transactionalProducer, String message) {
+        try {
+            transactionalProducer.beginTransaction();
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, message.getBytes(), 
message.getBytes());
+            Future<RecordMetadata> future = transactionalProducer.send(record);
+            transactionalProducer.flush();
+            future.get(); // Ensure producer send is complete before committing
+            transactionalProducer.commitTransaction();
+        } catch (Exception e) {
+            transactionalProducer.abortTransaction();
+        }
+    }
+
+    private void produceAbortedTransaction(Producer<byte[], byte[]> 
transactionalProducer, String message) {
+        try {
+            transactionalProducer.beginTransaction();
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, message.getBytes(), 
message.getBytes());
+            transactionalProducer.send(record);
+            transactionalProducer.flush();
+            transactionalProducer.abortTransaction();
+        } catch (Exception e) {
+            transactionalProducer.abortTransaction();
+        }
+    }
+
+    private void 
produceCommittedAndAbortedTransactionsInInterval(Producer<byte[], byte[]> 
transactionalProducer, int messageCount, int intervalAbortedTransactions) {
+        transactionalProducer.initTransactions();
+        int transactionCount = 0;
+        try {
+            for (int i = 0; i < messageCount; i++) {
+                transactionalProducer.beginTransaction();
+                String recordMessage = "Message " + (i + 1);
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, recordMessage.getBytes(), 
recordMessage.getBytes());
+                Future<RecordMetadata> future = 
transactionalProducer.send(record);
+                transactionalProducer.flush();
+                // Increment transaction count
+                transactionCount++;
+                if (transactionCount % intervalAbortedTransactions == 0) {
+                    // Aborts every intervalAbortedTransactions transaction
+                    transactionalProducer.abortTransaction();
+                } else {
+                    // Commits other transactions
+                    future.get(); // Ensure producer send is complete before 
committing
+                    transactionalProducer.commitTransaction();
+                }
+            }
+        } catch (Exception e) {
+            transactionalProducer.abortTransaction();
+        } finally {
+            transactionalProducer.close();
+        }
+    }
+
     private int consumeMessages(AtomicInteger totalMessagesConsumed,
                                 int totalMessages,
                                 String groupId,
@@ -2326,6 +2712,19 @@ public class ShareConsumerTest {
         }
     }
 
+    private void alterShareIsolationLevel(String groupId, String newValue) {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+            GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, newValue), 
AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try (Admin adminClient = createAdminClient()) {
+            assertDoesNotThrow(() -> 
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                .all()
+                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+        }
+    }
+
     /**
      * Test utility which encapsulates a {@link ShareConsumer} whose record 
processing
      * behavior can be supplied as a function argument.
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index 19b6977aec9..3cfab25e684 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -114,7 +114,8 @@ public class ShareFetchUtils {
                     shareFetch.batchSize(),
                     shareFetch.maxFetchRecords() - acquiredRecordsCount,
                     shareFetchPartitionData.fetchOffset(),
-                    fetchPartitionData
+                    fetchPartitionData,
+                    shareFetch.fetchParams().isolation
                 );
                 log.trace("Acquired records: {} for topicIdPartition: {}", 
shareAcquiredRecords, topicIdPartition);
                 // Maybe, in the future, check if no records are acquired, and 
we want to retry
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 8650e4cc2be..db2ff4fc8cf 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -31,8 +31,11 @@ import 
org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfig;
@@ -54,6 +57,7 @@ import 
org.apache.kafka.server.share.persister.PersisterStateBatch;
 import org.apache.kafka.server.share.persister.ReadShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.TopicData;
 import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
+import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.server.util.timer.TimerTask;
@@ -63,12 +67,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,7 +94,7 @@ import static 
kafka.server.share.ShareFetchUtils.recordLockDurationMsOrDefault;
  * consumers. The class maintains the state of the records that have been 
fetched from the leader
  * and are in-flight.
  */
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
 public class SharePartition {
 
     private static final Logger log = 
LoggerFactory.getLogger(SharePartition.class);
@@ -686,6 +695,7 @@ public class SharePartition {
      *                           if the records are already part of the same 
fetch batch.
      * @param fetchOffset        The fetch offset for which the records are 
fetched.
      * @param fetchPartitionData The fetched records for the share partition.
+     * @param isolationLevel      The isolation level for the share fetch 
request.
      * @return The acquired records for the share partition.
      */
     @SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid 
suppression
@@ -694,7 +704,8 @@ public class SharePartition {
         int batchSize,
         int maxFetchRecords,
         long fetchOffset,
-        FetchPartitionData fetchPartitionData
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel
     ) {
         log.trace("Received acquire request for share partition: {}-{} 
memberId: {}", groupId, topicIdPartition, memberId);
         if (stateNotActive() || maxFetchRecords <= 0) {
@@ -739,8 +750,9 @@ public class SharePartition {
             if (subMap.isEmpty()) {
                 log.trace("No cached data exists for the share partition for 
requested fetch batch: {}-{}",
                     groupId, topicIdPartition);
-                return acquireNewBatchRecords(memberId, 
fetchPartitionData.records.batches(),
+                ShareAcquiredRecords shareAcquiredRecords = 
acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                     firstBatch.baseOffset(), lastBatch.lastOffset(), 
batchSize, maxFetchRecords);
+                return 
maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, 
isolationLevel, shareAcquiredRecords);
             }
 
             log.trace("Overlap exists with in-flight records. Acquire the 
records if available for"
@@ -850,6 +862,7 @@ public class SharePartition {
             }
             if (!result.isEmpty()) {
                 maybeUpdateReadGapFetchOffset(result.get(result.size() - 
1).lastOffset() + 1);
+                return 
maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, 
isolationLevel, new ShareAcquiredRecords(result, acquiredCount));
             }
             return new ShareAcquiredRecords(result, acquiredCount);
         } finally {
@@ -1169,7 +1182,7 @@ public class SharePartition {
 
             // Though such batches can be removed from the cache, but it is 
better to archive them so
             // that they are never acquired again.
-            boolean anyRecordArchived = archiveAvailableRecords(fetchOffset, 
baseOffset, subMap);
+            boolean anyRecordArchived = archiveRecords(fetchOffset, 
baseOffset, subMap, RecordState.AVAILABLE);
 
             // If we have transitioned the state of any batch/offset from 
AVAILABLE to ARCHIVED,
             // then there is a chance that the next fetch offset can change.
@@ -1190,21 +1203,22 @@ public class SharePartition {
     private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) {
         lock.writeLock().lock();
         try {
-            return archiveAvailableRecords(startOffset, logStartOffset, 
cachedState);
+            return archiveRecords(startOffset, logStartOffset, cachedState, 
RecordState.AVAILABLE);
         } finally {
             lock.writeLock().unlock();
         }
     }
 
     /**
-     * The method archive the available records in the given map that are 
before the end offset.
+     * The method archive the records in a given state in the map that are 
before the end offset.
      *
-     * @param startOffset The offset from which the available records should 
be archived.
-     * @param endOffset The offset before which the available records should 
be archived.
+     * @param startOffset The offset from which the records should be archived.
+     * @param endOffset The offset before which the records should be archived.
      * @param map The map containing the in-flight records.
+     * @param initialState The initial state of the records to be archived.
      * @return A boolean which indicates whether any record is archived or not.
      */
-    private boolean archiveAvailableRecords(long startOffset, long endOffset, 
NavigableMap<Long, InFlightBatch> map) {
+    private boolean archiveRecords(long startOffset, long endOffset, 
NavigableMap<Long, InFlightBatch> map, RecordState initialState) {
         lock.writeLock().lock();
         try {
             boolean isAnyOffsetArchived = false, isAnyBatchArchived = false;
@@ -1225,16 +1239,16 @@ public class SharePartition {
                         groupId, topicIdPartition);
 
                     if (inFlightBatch.offsetState() == null) {
-                        if (inFlightBatch.batchState() != 
RecordState.AVAILABLE) {
+                        if (inFlightBatch.batchState() != initialState) {
                             continue;
                         }
                         inFlightBatch.maybeInitializeOffsetStateUpdate();
                     }
-                    isAnyOffsetArchived = isAnyOffsetArchived || 
archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1);
+                    isAnyOffsetArchived = isAnyOffsetArchived || 
archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1, 
initialState);
                     continue;
                 }
                 // The in-flight batch is a full match hence change the state 
of the complete batch.
-                isAnyBatchArchived = isAnyBatchArchived || 
archiveCompleteBatch(inFlightBatch);
+                isAnyBatchArchived = isAnyBatchArchived || 
archiveCompleteBatch(inFlightBatch, initialState);
             }
             return isAnyOffsetArchived || isAnyBatchArchived;
         } finally {
@@ -1244,7 +1258,9 @@ public class SharePartition {
 
     private boolean archivePerOffsetBatchRecords(InFlightBatch inFlightBatch,
                                                  long startOffsetToArchive,
-                                                 long endOffsetToArchive) {
+                                                 long endOffsetToArchive,
+                                                 RecordState initialState
+    ) {
         lock.writeLock().lock();
         try {
             boolean isAnyOffsetArchived = false;
@@ -1257,11 +1273,14 @@ public class SharePartition {
                     // No further offsets to process.
                     break;
                 }
-                if (offsetState.getValue().state != RecordState.AVAILABLE) {
+                if (offsetState.getValue().state != initialState) {
                     continue;
                 }
 
                 offsetState.getValue().archive(EMPTY_MEMBER_ID);
+                if (initialState == RecordState.ACQUIRED) {
+                    
offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
+                }
                 isAnyOffsetArchived = true;
             }
             return isAnyOffsetArchived;
@@ -1270,13 +1289,16 @@ public class SharePartition {
         }
     }
 
-    private boolean archiveCompleteBatch(InFlightBatch inFlightBatch) {
+    private boolean archiveCompleteBatch(InFlightBatch inFlightBatch, 
RecordState initialState) {
         lock.writeLock().lock();
         try {
             log.trace("Archiving complete batch: {} for the share partition: 
{}-{}", inFlightBatch, groupId, topicIdPartition);
-            if (inFlightBatch.batchState() == RecordState.AVAILABLE) {
+            if (inFlightBatch.batchState() == initialState) {
                 // Change the state of complete batch since the same state 
exists for the entire inFlight batch.
                 inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
+                if (initialState == RecordState.ACQUIRED) {
+                    
inFlightBatch.batchState.cancelAndClearAcquisitionLockTimeoutTask();
+                }
                 return true;
             }
         } finally {
@@ -2475,6 +2497,192 @@ public class SharePartition {
         }
     }
 
+    private ShareAcquiredRecords 
maybeFilterAbortedTransactionalAcquiredRecords(
+        FetchPartitionData fetchPartitionData,
+        FetchIsolation isolationLevel,
+        ShareAcquiredRecords shareAcquiredRecords
+    ) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED || 
fetchPartitionData.abortedTransactions.isEmpty() || 
fetchPartitionData.abortedTransactions.get().isEmpty())
+            return shareAcquiredRecords;
+
+        // When FetchIsolation.TXN_COMMITTED is used as isolation level by the 
share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = 
filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+            shareAcquiredRecords.acquiredRecords(), 
fetchPartitionData.abortedTransactions.get());
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalAcquiredRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        // The record batches that need to be archived in cachedState because 
they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = 
fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = 
fetchSubMap(recordBatch);
+            archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() 
+ 1, subMap, RecordState.ACQUIRED);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, 
recordsToArchive);
+    }
+
+    /**
+     * This function filters out the offsets present in the acquired records 
list that are also a part of batches that need to be archived.
+     * It follows an iterative refinement of acquired records to eliminate 
batches to be archived.
+     * @param acquiredRecordsList The list containing acquired records. This 
list is sorted by the firstOffset of the acquired batch.
+     * @param batchesToArchive The list containing record batches to archive. 
This list is sorted by the baseOffset of the record batch.
+     * @return The list containing filtered acquired records offsets.
+     */
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecordsList,
+        List<RecordBatch> batchesToArchive
+    ) {
+        Iterator<RecordBatch> batchesToArchiveIterator = 
batchesToArchive.iterator();
+        if (!batchesToArchiveIterator.hasNext())
+            return acquiredRecordsList;
+        List<AcquiredRecords> result = new ArrayList<>();
+        Iterator<AcquiredRecords> acquiredRecordsListIter = 
acquiredRecordsList.iterator();
+        RecordBatch batchToArchive = batchesToArchiveIterator.next();
+        AcquiredRecords unresolvedAcquiredRecords = null;
+
+        while (unresolvedAcquiredRecords != null || 
acquiredRecordsListIter.hasNext()) {
+            if (unresolvedAcquiredRecords == null)
+                unresolvedAcquiredRecords = acquiredRecordsListIter.next();
+
+            long unresolvedFirstOffset = 
unresolvedAcquiredRecords.firstOffset();
+            long unresolvedLastOffset = unresolvedAcquiredRecords.lastOffset();
+            short unresolvedDeliveryCount = 
unresolvedAcquiredRecords.deliveryCount();
+
+            if (batchToArchive == null) {
+                result.add(unresolvedAcquiredRecords);
+                unresolvedAcquiredRecords = null;
+                continue;
+            }
+
+            // Non-overlap check - unresolvedFirstOffset offsets lie before 
the batchToArchive offsets. No need to filter out the offsets in such a 
scenario.
+            if (unresolvedLastOffset < batchToArchive.baseOffset()) {
+                // Offsets in unresolvedAcquiredRecords do not overlap with 
batchToArchive, hence it should not get filtered out.
+                result.add(unresolvedAcquiredRecords);
+                unresolvedAcquiredRecords = null;
+            }
+
+            // Overlap check - unresolvedFirstOffset offsets overlap with the 
batchToArchive offsets. We need to filter out the overlapping
+            // offsets in such a scenario.
+            if (unresolvedFirstOffset <= batchToArchive.lastOffset() &&
+                unresolvedLastOffset >= batchToArchive.baseOffset()) {
+                unresolvedAcquiredRecords = null;
+                // Split the unresolvedFirstOffset into parts - before and 
after the overlapping record batchToArchive.
+                if (unresolvedFirstOffset < batchToArchive.baseOffset()) {
+                    // The offsets in unresolvedAcquiredRecords that are 
present before batchToArchive's baseOffset should not get filtered out.
+                    result.add(new AcquiredRecords()
+                        .setFirstOffset(unresolvedFirstOffset)
+                        .setLastOffset(batchToArchive.baseOffset() - 1)
+                        .setDeliveryCount(unresolvedDeliveryCount));
+                }
+                if (unresolvedLastOffset > batchToArchive.lastOffset()) {
+                    // The offsets in unresolvedAcquiredRecords that are 
present after batchToArchive's lastOffset should not get filtered out
+                    // and should be taken forward for further processing 
since they could potentially contain offsets that need to be archived.
+                    unresolvedAcquiredRecords = new AcquiredRecords()
+                        .setFirstOffset(batchToArchive.lastOffset() + 1)
+                        .setLastOffset(unresolvedLastOffset)
+                        .setDeliveryCount(unresolvedDeliveryCount);
+                }
+            }
+
+            // There is at least one offset in unresolvedFirstOffset which 
lies after the batchToArchive. Hence, we move forward
+            // the batchToArchive to the next element in 
batchesToArchiveIterator.
+            if (unresolvedLastOffset > batchToArchive.lastOffset()) {
+                if (batchesToArchiveIterator.hasNext())
+                    batchToArchive = batchesToArchiveIterator.next();
+                else
+                    batchToArchive = null;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * This function fetches the sub map from cachedState where all the offset 
details present in the recordBatch can be referred to
+     * OR it gives an exception if those offsets are not present in 
cachedState.
+     * @param recordBatch The record batch for which we want to find the sub 
map.
+     * @return the sub map containing all the offset details.
+     */
+    private NavigableMap<Long, InFlightBatch> fetchSubMap(RecordBatch 
recordBatch) {
+        lock.readLock().lock();
+        try {
+            Map.Entry<Long, InFlightBatch> floorEntry = 
cachedState.floorEntry(recordBatch.baseOffset());
+            if (floorEntry == null) {
+                log.debug("Fetched batch record {} not found for share 
partition: {}-{}", recordBatch, groupId,
+                    topicIdPartition);
+                throw new IllegalStateException(
+                    "Batch record not found. The request batch offsets are not 
found in the cache.");
+            }
+            return cachedState.subMap(floorEntry.getKey(), true, 
recordBatch.lastOffset(), true);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<RecordBatch> fetchAbortedTransactionRecordBatches(
+        Iterable<? extends RecordBatch> batches,
+        List<FetchResponseData.AbortedTransaction> abortedTransactions
+    ) {
+        PriorityQueue<FetchResponseData.AbortedTransaction> 
orderedAbortedTransactions = orderedAbortedTransactions(abortedTransactions);
+        Set<Long> abortedProducerIds = new HashSet<>();
+        List<RecordBatch> recordsToArchive = new ArrayList<>();
+
+        for (RecordBatch currentBatch : batches) {
+            if (currentBatch.hasProducerId()) {
+                // remove from the aborted transactions queue, all aborted 
transactions which have begun before the
+                // current batch's last offset and add the associated 
producerIds to the aborted producer set.
+                while (!orderedAbortedTransactions.isEmpty() && 
orderedAbortedTransactions.peek().firstOffset() <= currentBatch.lastOffset()) {
+                    FetchResponseData.AbortedTransaction abortedTransaction = 
orderedAbortedTransactions.poll();
+                    abortedProducerIds.add(abortedTransaction.producerId());
+                }
+                long producerId = currentBatch.producerId();
+                if (containsAbortMarker(currentBatch)) {
+                    abortedProducerIds.remove(producerId);
+                } else if (isBatchAborted(currentBatch, abortedProducerIds)) {
+                    log.debug("Skipping aborted record batch for share 
partition: {}-{} with producerId {} and " +
+                        "offsets {} to {}", groupId, topicIdPartition, 
producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
+                    recordsToArchive.add(currentBatch);
+                }
+            }
+        }
+        return recordsToArchive;
+    }
+
+    private PriorityQueue<FetchResponseData.AbortedTransaction> 
orderedAbortedTransactions(List<FetchResponseData.AbortedTransaction> 
abortedTransactions) {
+        PriorityQueue<FetchResponseData.AbortedTransaction> 
orderedAbortedTransactions = new PriorityQueue<>(
+            abortedTransactions.size(), 
Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
+        );
+        orderedAbortedTransactions.addAll(abortedTransactions);
+        return orderedAbortedTransactions;
+    }
+
+    private boolean isBatchAborted(RecordBatch batch, Set<Long> 
abortedProducerIds) {
+        return batch.isTransactional() && 
abortedProducerIds.contains(batch.producerId());
+    }
+
+    // Visible for testing.
+    boolean containsAbortMarker(RecordBatch batch) {
+        if (!batch.isControlBatch())
+            return false;
+
+        Iterator<Record> batchIterator = batch.iterator();
+        if (!batchIterator.hasNext())
+            return false;
+
+        Record firstRecord = batchIterator.next();
+        return ControlRecordType.ABORT == 
ControlRecordType.parse(firstRecord.key());
+    }
+
     // Visible for testing. Should only be used for testing purposes.
     NavigableMap<Long, InFlightBatch> cachedState() {
         return new ConcurrentSkipListMap<>(cachedState);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5894d3a40d3..68c923cb9e8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -57,7 +57,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.{Group, GroupConfigManager, 
GroupCoordinator}
+import org.apache.kafka.coordinator.group.{Group, GroupConfig, 
GroupConfigManager, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
@@ -3210,7 +3210,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         shareFetchRequest.maxWait,
         fetchMinBytes,
         fetchMaxBytes,
-        FetchIsolation.HIGH_WATERMARK,
+        FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, 
groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
         clientMetadata,
         true
       )
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 6f720526eef..bb8b51b40e2 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -181,7 +181,7 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // We are testing the case when the share partition is getting fetched 
for the first time, so for the first time
@@ -253,7 +253,7 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // We are testing the case when the share partition has been fetched 
before, hence we are mocking positionDiff
@@ -305,7 +305,7 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
@@ -418,7 +418,7 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
@@ -580,7 +580,7 @@ public class DelayedShareFetchTest {
         // sp1 can be acquired now
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(true);
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // when forceComplete is called for delayedShareFetch2, since tp1 is 
common in between delayed share fetch
@@ -676,7 +676,7 @@ public class DelayedShareFetchTest {
             BROKER_TOPIC_STATS);
 
         when(sp0.canAcquireRecords()).thenReturn(true);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
@@ -919,15 +919,15 @@ public class DelayedShareFetchTest {
             new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), 
BATCH_SIZE, MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // All 5 partitions are acquirable.
@@ -1015,9 +1015,9 @@ public class DelayedShareFetchTest {
             new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), 
BATCH_SIZE, MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // Only 2 out of 5 partitions are acquirable.
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 2103a4fc361..d5acaef2060 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -98,10 +98,10 @@ public class ShareFetchUtilsTest {
         when(sp0.nextFetchOffset()).thenReturn((long) 3);
         when(sp1.nextFetchOffset()).thenReturn((long) 3);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 
1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords()
                 
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
 
@@ -163,8 +163,8 @@ public class ShareFetchUtilsTest {
         when(sp0.nextFetchOffset()).thenReturn((long) 3);
         when(sp1.nextFetchOffset()).thenReturn((long) 3);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
 
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
         sharePartitions.put(tp0, sp0);
@@ -221,11 +221,11 @@ public class ShareFetchUtilsTest {
         when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
         when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             ShareAcquiredRecords.empty(),
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 
1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords()
                 
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)),
             ShareAcquiredRecords.empty());
@@ -309,7 +309,7 @@ public class ShareFetchUtilsTest {
         // Mock the replicaManager.fetchOffsetForTimestamp method to return a 
timestamp and offset for the topic partition.
         FileRecords.TimestampAndOffset timestampAndOffset = new 
FileRecords.TimestampAndOffset(100L, 1L, Optional.empty());
         doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), 
Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class),
 anyLong(), any(), any(), anyBoolean());
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
 
         MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
             new SimpleRecord("0".getBytes(), "v".getBytes()),
@@ -390,10 +390,10 @@ public class ShareFetchUtilsTest {
             records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
             OptionalInt.empty(), false);
 
-        when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, 
fetchPartitionData1)).thenReturn(
+        when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, 0, 
fetchPartitionData1, FetchIsolation.HIGH_WATERMARK)).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords()
                 .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 
1)));
-        when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, 
fetchPartitionData2)).thenReturn(
+        when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, 0, 
fetchPartitionData2, FetchIsolation.HIGH_WATERMARK)).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords()
                 
.setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
 
@@ -444,7 +444,7 @@ public class ShareFetchUtilsTest {
         // Mock the replicaManager.fetchOffsetForTimestamp method to throw 
exception.
         Throwable exception = new FencedLeaderEpochException("Fenced 
exception");
         
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class),
 anyLong(), any(), any(), anyBoolean());
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(ShareAcquiredRecords.empty());
 
         // When no records are acquired from share partition.
         List<ShareFetchPartitionData> responseData = List.of(
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index a87c1dd735d..a69c6c83071 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1119,13 +1119,13 @@ public class SharePartitionManagerTest {
         when(sp1.canAcquireRecords()).thenReturn(true);
         when(sp2.canAcquireRecords()).thenReturn(true);
         when(sp3.canAcquireRecords()).thenReturn(true);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class))).thenReturn(
+        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
             createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         // Mocks to have fetch offset metadata match for share partitions to 
avoid any extra calls to replicaManager.readFromLog.
         
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
@@ -1788,8 +1788,8 @@ public class SharePartitionManagerTest {
         when(sp1.canAcquireRecords()).thenReturn(false);
         when(sp2.maybeAcquireFetchLock()).thenReturn(true);
         when(sp2.canAcquireRecords()).thenReturn(false);
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(ShareAcquiredRecords.empty());
-        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(ShareAcquiredRecords.empty());
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(ShareAcquiredRecords.empty());
+        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(ShareAcquiredRecords.empty());
 
         List<DelayedOperationKey> delayedShareFetchWatchKeys = new 
ArrayList<>();
         topicIdPartitions.forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId(), topicIdPartition.partition())));
@@ -2034,7 +2034,7 @@ public class SharePartitionManagerTest {
         
when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, 
Uuid.fromString(memberId))).thenReturn(List.of(tp1, tp3));
 
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
         // Release acquired records on session close request for tp1 and tp3.
         sharePartitionManager.releaseSession(groupId, memberId);
 
@@ -2604,7 +2604,7 @@ public class SharePartitionManagerTest {
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(true);
         
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(new ShareAcquiredRecords(List.of(), 0));
 
         // Fail initialization for tp2.
         SharePartition sp2 = mock(SharePartition.class);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 91e3435a2f0..64781648774 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -35,12 +35,18 @@ import 
org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.MockTime;
@@ -58,6 +64,7 @@ import 
org.apache.kafka.server.share.persister.PersisterStateBatch;
 import org.apache.kafka.server.share.persister.ReadShareGroupStateResult;
 import org.apache.kafka.server.share.persister.TopicData;
 import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
+import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.SystemTimer;
@@ -74,6 +81,7 @@ import org.mockito.Mockito;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -114,6 +122,7 @@ public class SharePartitionTest {
     private static final int DEFAULT_FETCH_OFFSET = 0;
     private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
     private static final byte ACKNOWLEDGE_TYPE_GAP_ID = 0;
+    private static final FetchIsolation FETCH_ISOLATION_HWM = 
FetchIsolation.HIGH_WATERMARK;
     private static Timer mockTimer;
     private SharePartitionMetrics sharePartitionMetrics;
 
@@ -1152,7 +1161,8 @@ public class SharePartitionTest {
             BATCH_SIZE,
             10,
             DEFAULT_FETCH_OFFSET,
-            fetchPartitionData(records)),
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
             5);
 
         assertArrayEquals(expectedAcquiredRecord(0, 4, 1).toArray(), 
acquiredRecordsList.toArray());
@@ -1173,7 +1183,8 @@ public class SharePartitionTest {
             BATCH_SIZE,
             10,
             DEFAULT_FETCH_OFFSET,
-            fetchPartitionData(records)),
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
             20);
 
         assertArrayEquals(expectedAcquiredRecord(5, 24, 1).toArray(), 
acquiredRecordsList.toArray());
@@ -1210,7 +1221,8 @@ public class SharePartitionTest {
             BATCH_SIZE,
             10,
             DEFAULT_FETCH_OFFSET,
-            fetchPartitionData(records, 10)),
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
             20);
 
         // Validate 2 batches are fetched one with 5 records and other till 
end of batch, third batch
@@ -1287,7 +1299,8 @@ public class SharePartitionTest {
                 BATCH_SIZE,
                 MAX_FETCH_RECORDS,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(MemoryRecords.EMPTY)),
+                fetchPartitionData(MemoryRecords.EMPTY),
+                FETCH_ISOLATION_HWM),
             0
         );
 
@@ -1306,7 +1319,8 @@ public class SharePartitionTest {
             2 /* Batch size */,
             10,
             DEFAULT_FETCH_OFFSET,
-            fetchPartitionData(records)),
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
             5);
 
         assertArrayEquals(expectedAcquiredRecord(0, 4, 1).toArray(), 
acquiredRecordsList.toArray());
@@ -1337,7 +1351,8 @@ public class SharePartitionTest {
             5 /* Batch size */,
             100,
             DEFAULT_FETCH_OFFSET,
-            fetchPartitionData(records)),
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
             26 /* Gap of 3 records will also be added to first batch */);
 
         // Fetch expected records from 4 batches, but change the first 
expected record to include gap offsets.
@@ -1385,7 +1400,8 @@ public class SharePartitionTest {
                 2 /* Batch size */,
                 10,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             20);
 
         List<AcquiredRecords> expectedAcquiredRecords = 
expectedAcquiredRecords(records, 1);
@@ -1420,7 +1436,8 @@ public class SharePartitionTest {
                 5 /* Batch size */,
                 100,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             7 /* Acquisition of records starts post endOffset */);
 
         // Fetch expected single batch, but change the first offset as per 
endOffset.
@@ -1451,7 +1468,8 @@ public class SharePartitionTest {
                 5 /* Batch size */,
                 100,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             13 /* Acquisition of records starts post endOffset */);
 
         // Fetch expected records from 2 batches, but change the first batch's 
first offset as per endOffset.
@@ -1492,7 +1510,8 @@ public class SharePartitionTest {
                 5 /* Batch size */,
                 100,
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             5 /* Acquisition of records starts post endOffset */);
 
         // First batch should be skipped and fetch should result a single 
batch (second batch), but
@@ -2524,7 +2543,8 @@ public class SharePartitionTest {
                 BATCH_SIZE,
                 6, // maxFetchRecords is less than the number of records 
fetched
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             6);
 
         // Since max fetch records (6) is less than the number of records 
fetched (8), only 6 records will be acquired
@@ -2573,7 +2593,8 @@ public class SharePartitionTest {
                 BATCH_SIZE,
                 8, // maxFetchRecords is less than the number of records 
fetched
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             10);
 
         assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), 
acquiredRecordsList.toArray());
@@ -2621,7 +2642,8 @@ public class SharePartitionTest {
                 BATCH_SIZE,
                 8, // maxFetchRecords is less than the number of records 
fetched
                 DEFAULT_FETCH_OFFSET,
-                fetchPartitionData(records)),
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
             10);
 
         assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(), 
acquiredRecordsList.toArray());
@@ -3742,8 +3764,8 @@ public class SharePartitionTest {
         recordsBuilder.appendWithOffset(20, 0L, 
TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
         MemoryRecords records2 = recordsBuilder.build();
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1));
-        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(records2));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1), FETCH_ISOLATION_HWM);
+        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(records2), FETCH_ISOLATION_HWM);
 
         // Acknowledging over subset of second batch with subset of gap 
offsets.
         sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(10, 18, List.of(
@@ -3812,8 +3834,8 @@ public class SharePartitionTest {
         recordsBuilder.appendWithOffset(20, 0L, 
TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
         MemoryRecords records2 = recordsBuilder.build();
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1));
-        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(records2));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1), FETCH_ISOLATION_HWM);
+        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(records2), FETCH_ISOLATION_HWM);
 
         // Acknowledging over subset of second batch with subset of gap 
offsets.
         sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(10, 18, List.of(
@@ -4745,7 +4767,7 @@ public class SharePartitionTest {
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM);
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5);
@@ -4883,7 +4905,7 @@ public class SharePartitionTest {
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM);
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5);
@@ -5961,7 +5983,7 @@ public class SharePartitionTest {
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(5, 10)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(5, 10)), FETCH_ISOLATION_HWM);
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 15), 5);
 
@@ -5992,7 +6014,7 @@ public class SharePartitionTest {
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM);
 
         CompletableFuture<Void> ackResult = 
sharePartition.acknowledge(MEMBER_ID, List.of(
                 new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)),
@@ -6109,12 +6131,12 @@ public class SharePartitionTest {
         String memberId1 = "memberId-1";
         String memberId2 = "memberId-2";
 
-        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
 
         assertFalse(sharePartition.findNextFetchOffset());
         assertEquals(10, sharePartition.nextFetchOffset());
 
-        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(10, 10)));
+        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM);
 
         assertFalse(sharePartition.findNextFetchOffset());
         assertEquals(20, sharePartition.nextFetchOffset());
@@ -6125,7 +6147,7 @@ public class SharePartitionTest {
         assertTrue(sharePartition.findNextFetchOffset());
         assertEquals(5, sharePartition.nextFetchOffset());
 
-        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
 
         assertTrue(sharePartition.findNextFetchOffset());
         assertEquals(20, sharePartition.nextFetchOffset());
@@ -6142,17 +6164,17 @@ public class SharePartitionTest {
         String memberId1 = MEMBER_ID;
         String memberId2 = "member-2";
 
-        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
         assertEquals(3, sharePartition.nextFetchOffset());
 
         sharePartition.acknowledge(memberId1, List.of(
                 new ShareAcknowledgementBatch(0, 2, List.of((byte) 2))));
         assertEquals(0, sharePartition.nextFetchOffset());
 
-        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3, 
fetchPartitionData(memoryRecords(2, 3)));
+        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3, 
fetchPartitionData(memoryRecords(2, 3)), FETCH_ISOLATION_HWM);
         assertEquals(0, sharePartition.nextFetchOffset());
 
-        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+        sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM);
         assertEquals(5, sharePartition.nextFetchOffset());
 
         sharePartition.acknowledge(memberId2, List.of(
@@ -6193,11 +6215,11 @@ public class SharePartitionTest {
                 new ShareAcknowledgementBatch(17, 20, List.of((byte) 2))));
 
         // Reacquire with another member.
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1), FETCH_ISOLATION_HWM);
         assertEquals(10, sharePartition.nextFetchOffset());
 
         // Reacquire with another member.
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(7, 10)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(7, 10)), FETCH_ISOLATION_HWM);
         assertEquals(17, sharePartition.nextFetchOffset());
 
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
@@ -6669,15 +6691,441 @@ public class SharePartitionTest {
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 
1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new 
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 
2),
+                new 
AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 
2),
+                new 
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 
1)),
+            
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, 
recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 
3),
+            new 
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 
3),
+            new 
AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 
3)
+        );
+        List<RecordBatch> recordBatches2 = List.of(
+            memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+            memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new 
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 
3),
+                new 
AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 
2),
+                new 
AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 
3)
+
+            ), 
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, 
recordBatches2)
+        );
+
+        // Record batches is empty.
+        assertEquals(acquiredRecords2, 
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, 
List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 
1)
+        );
+        List<RecordBatch> recordBatches3 = List.of(
+            memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+            memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+        );
+
+        assertEquals(
+            List.of(
+                new 
AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 
1),
+                new 
AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 
1)
+
+            ), 
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, 
recordBatches3)
+        );
+    }
+
+    @Test
+    public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        memoryRecordsBuilder(buffer, 8, 50).close();
+        memoryRecordsBuilder(buffer, 10, 58).close();
+        memoryRecordsBuilder(buffer, 5, 70).close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, 
newAbortedTransactions());
+
+        // We are mocking the result of function 
fetchAbortedTransactionRecordBatches. The records present at these offsets need 
to be archived.
+        // We won't be utilizing the aborted transactions passed in 
fetchPartitionData.
+        
when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(),
 fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 
10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 
58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 
records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new 
AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short) 
1)
+        ), acquiredRecordsList);
+        assertEquals(75, sharePartition.nextFetchOffset());
+
+        // Checking cached state.
+        assertEquals(4, sharePartition.cachedState().size());
+        assertTrue(sharePartition.cachedState().containsKey(10L));
+        assertTrue(sharePartition.cachedState().containsKey(20L));
+        assertTrue(sharePartition.cachedState().containsKey(50L));
+        assertTrue(sharePartition.cachedState().containsKey(70L));
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState());
+
+        assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset());
+        assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(70L).batchState());
+
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(20L).batchMemberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(70L).batchMemberId());
+
+        
assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(17L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(18L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
+
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
+
+        expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(50L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(51L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(52L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(53L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(54L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(55L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(56L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(57L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(58L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(59L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(60L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(61L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(62L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(63L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(64L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(65L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(66L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(67L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(68L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(69L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(50L).offsetState());
+
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask());
+        
assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask());
+        
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask());
+    }
+
+    @Test
+    public void testContainsAbortMarker() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Record batch is not a control batch.
+        RecordBatch recordBatch = mock(RecordBatch.class);
+        when(recordBatch.isControlBatch()).thenReturn(false);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch but doesn't contain any records.
+        recordBatch = mock(RecordBatch.class);
+        Iterator batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(false);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch which contains a record of type 
ControlRecordType.ABORT.
+        recordBatch = mock(RecordBatch.class);
+        batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(true);
+        DefaultRecord record = mock(DefaultRecord.class);
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Buffer has to be created in a way that 
ControlRecordType.parse(buffer) returns ControlRecordType.ABORT.
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.ABORT.type());
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        when(record.key()).thenReturn(buffer);
+        when(batchIterator.next()).thenReturn(record);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertTrue(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch which contains a record of type 
ControlRecordType.COMMIT.
+        recordBatch = mock(RecordBatch.class);
+        batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(true);
+        record = mock(DefaultRecord.class);
+        buffer = ByteBuffer.allocate(4096);
+        // Buffer has to be created in a way that 
ControlRecordType.parse(buffer) returns ControlRecordType.COMMIT.
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.COMMIT.type());
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        when(record.key()).thenReturn(buffer);
+        when(batchIterator.next()).thenReturn(record);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+    }
+
+    @Test
+    public void 
testFetchAbortedTransactionRecordBatchesForOnlyAbortedTransactions() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Case 1 - Creating 10 transactional records in a single batch 
followed by a ABORT marker record for producerId 1.
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 1, 0);
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = 
List.of(
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+        );
+        // records from 0 to 9 should be archived because they are a part of 
aborted transactions.
+        List<RecordBatch> actual = 
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), 
abortedTransactions);
+        assertEquals(1, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(9, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+
+        // Case 2: 3 individual batches each followed by a ABORT marker record 
for producerId 1.
+        buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 0);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 2);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 4);
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        abortedTransactions = List.of(
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(2).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(4).setProducerId(1)
+        );
+
+        actual = 
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), 
abortedTransactions);
+        assertEquals(3, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(0, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+        assertEquals(2, actual.get(1).baseOffset());
+        assertEquals(2, actual.get(1).lastOffset());
+        assertEquals(1, actual.get(1).producerId());
+        assertEquals(4, actual.get(2).baseOffset());
+        assertEquals(4, actual.get(2).lastOffset());
+        assertEquals(1, actual.get(2).producerId());
+
+        // Case 3: The producer id of records is different, so they should not 
be archived,
+        buffer = ByteBuffer.allocate(1024);
+        // We are creating 10 transactional records followed by a ABORT marker 
record for producerId 2.
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 2, 0);
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        abortedTransactions = List.of(
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+        );
+
+        actual = 
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), 
abortedTransactions);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void 
testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransactions() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 0);
+        newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 2, 3);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 2, 6);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 9);
+        newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 1, 12);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 15);
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        // Case 1 - Aborted transactions does not contain the record batch 
from offsets 6-7 with producer id 2.
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = 
List.of(
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(9).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(15).setProducerId(1)
+        );
+
+        List<RecordBatch> actual = 
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), 
abortedTransactions);
+        assertEquals(3, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(1, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+        assertEquals(9, actual.get(1).baseOffset());
+        assertEquals(10, actual.get(1).lastOffset());
+        assertEquals(1, actual.get(1).producerId());
+        assertEquals(15, actual.get(2).baseOffset());
+        assertEquals(16, actual.get(2).lastOffset());
+        assertEquals(1, actual.get(2).producerId());
+
+        // Case 2 - Aborted transactions contains the record batch from 
offsets 6-7 with producer id 2.
+        abortedTransactions = List.of(
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(2),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(9).setProducerId(1),
+            new 
FetchResponseData.AbortedTransaction().setFirstOffset(15).setProducerId(1)
+        );
+
+        actual = 
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), 
abortedTransactions);
+        assertEquals(4, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(1, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+        assertEquals(6, actual.get(1).baseOffset());
+        assertEquals(7, actual.get(1).lastOffset());
+        assertEquals(2, actual.get(1).producerId());
+        assertEquals(9, actual.get(2).baseOffset());
+        assertEquals(10, actual.get(2).lastOffset());
+        assertEquals(1, actual.get(2).producerId());
+        assertEquals(15, actual.get(3).baseOffset());
+        assertEquals(16, actual.get(3).lastOffset());
+        assertEquals(1, actual.get(3).producerId());
+    }
+
+    /**
+     * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
+     */
+    private void newTransactionalRecords(ByteBuffer buffer, ControlRecordType 
controlRecordType, int numRecords, long producerId, long baseOffset) {
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+            RecordBatch.CURRENT_MAGIC_VALUE,
+            Compression.NONE,
+            TimestampType.CREATE_TIME,
+            baseOffset,
+            MOCK_TIME.milliseconds(),
+            producerId,
+            (short) 0,
+            0,
+            true,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+            for (int i = 0; i < numRecords; i++)
+                builder.append(new SimpleRecord(MOCK_TIME.milliseconds(), 
"key".getBytes(), "value".getBytes()));
+
+            builder.build();
+        }
+        writeTransactionMarker(buffer, controlRecordType, (int) baseOffset + 
numRecords, producerId);
+    }
+
+    private void writeTransactionMarker(ByteBuffer buffer, ControlRecordType 
controlRecordType, int offset, long producerId) {
+        MemoryRecords.writeEndTransactionalMarker(buffer,
+            offset,
+            MOCK_TIME.milliseconds(),
+            0,
+            producerId,
+            (short) 0,
+            new EndTransactionMarker(controlRecordType, 0));
+    }
+
+    private List<FetchResponseData.AbortedTransaction> 
newAbortedTransactions() {
+        FetchResponseData.AbortedTransaction abortedTransaction = new 
FetchResponseData.AbortedTransaction();
+        abortedTransaction.setFirstOffset(0);
+        abortedTransaction.setProducerId(1000L);
+        return List.of(abortedTransaction);
+    }
+
     private FetchPartitionData fetchPartitionData(Records records) {
         return fetchPartitionData(records, 0);
     }
 
+    private FetchPartitionData fetchPartitionData(Records records, 
List<FetchResponseData.AbortedTransaction> abortedTransactions) {
+        return fetchPartitionData(records, 0, abortedTransactions);
+    }
+
     private FetchPartitionData fetchPartitionData(Records records, long 
logStartOffset) {
         return new FetchPartitionData(Errors.NONE, 5, logStartOffset, records,
             Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false);
     }
 
+    private FetchPartitionData fetchPartitionData(Records records, long 
logStartOffset, List<FetchResponseData.AbortedTransaction> abortedTransactions) 
{
+        return new FetchPartitionData(Errors.NONE, 5, logStartOffset, records,
+            Optional.empty(), OptionalLong.empty(), 
Optional.of(abortedTransactions), OptionalInt.empty(), false);
+    }
+
     private List<AcquiredRecords> fetchAcquiredRecords(SharePartition 
sharePartition, Records records, long logStartOffset, int expectedOffsetCount) {
         return fetchAcquiredRecords(sharePartition, records, 
records.batches().iterator().next().baseOffset(), logStartOffset, 
expectedOffsetCount);
     }
@@ -6688,7 +7136,8 @@ public class SharePartitionTest {
             BATCH_SIZE,
             MAX_FETCH_RECORDS,
             fetchOffset,
-            fetchPartitionData(records, logStartOffset));
+            fetchPartitionData(records, logStartOffset),
+            FETCH_ISOLATION_HWM);
         return fetchAcquiredRecords(shareAcquiredRecords, expectedOffsetCount);
     }
 
@@ -6698,7 +7147,8 @@ public class SharePartitionTest {
             BATCH_SIZE,
             MAX_FETCH_RECORDS,
             records.batches().iterator().next().baseOffset(),
-            fetchPartitionData(records));
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM);
         return fetchAcquiredRecords(shareAcquiredRecords, expectedOffsetCount);
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e933b1985c7..b4768a90317 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -75,7 +75,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -340,6 +340,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
+    cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG, 
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
     cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 6dd859e1354..c7569fdf47a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -17,12 +17,14 @@
 
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -32,6 +34,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * Group configuration related parameters and supporting methods like 
validation, etc. are
@@ -59,6 +62,13 @@ public final class GroupConfig extends AbstractConfig {
         "Negative duration is not allowed.</li>" +
         "<li>anything else: throw exception to the share consumer.</li></ul>";
 
+    public static final String SHARE_ISOLATION_LEVEL_CONFIG = 
"share.isolation.level";
+    public static final String SHARE_ISOLATION_LEVEL_DEFAULT = 
IsolationLevel.READ_UNCOMMITTED.toString();
+    public static final String SHARE_ISOLATION_LEVEL_DOC = "Controls how to 
read records written transactionally. " +
+        "If set to \"read_committed\", the share group will only deliver 
transactional records which have been committed. " +
+        "If set to \"read_uncommitted\", the share group will return all 
messages, even transactional messages which have been aborted. " +
+        "Non-transactional records will be returned unconditionally in either 
mode.";
+
     public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = 
"streams.session.timeout.ms";
 
     public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = 
"streams.heartbeat.interval.ms";
@@ -83,6 +93,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public final int streamsNumStandbyReplicas;
 
+    public final String shareIsolationLevel;
+
     private static final ConfigDef CONFIG = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             INT,
@@ -120,6 +132,12 @@ public final class GroupConfig extends AbstractConfig {
             new ShareGroupAutoOffsetResetStrategy.Validator(),
             MEDIUM,
             SHARE_AUTO_OFFSET_RESET_DOC)
+        .define(SHARE_ISOLATION_LEVEL_CONFIG,
+            STRING,
+            SHARE_ISOLATION_LEVEL_DEFAULT,
+            in(IsolationLevel.READ_COMMITTED.toString(), 
IsolationLevel.READ_UNCOMMITTED.toString()),
+            MEDIUM,
+            SHARE_ISOLATION_LEVEL_DOC)
         .define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -150,6 +168,7 @@ public final class GroupConfig extends AbstractConfig {
         this.streamsSessionTimeoutMs = 
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+        this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
     }
 
     public static ConfigDef configDef() {
@@ -290,6 +309,13 @@ public final class GroupConfig extends AbstractConfig {
         return 
ShareGroupAutoOffsetResetStrategy.fromString(SHARE_AUTO_OFFSET_RESET_DEFAULT);
     }
 
+    /**
+     * The default share group isolation level.
+     */
+    public static IsolationLevel defaultShareIsolationLevel() {
+        return 
IsolationLevel.valueOf(SHARE_ISOLATION_LEVEL_DEFAULT.toUpperCase(Locale.ROOT));
+    }
+
     /**
      * The consumer group session timeout in milliseconds.
      */
@@ -352,4 +378,18 @@ public final class GroupConfig extends AbstractConfig {
     public int streamsNumStandbyReplicas() {
         return streamsNumStandbyReplicas;
     }
+
+    /**
+     * The share group isolation level.
+     */
+    public IsolationLevel shareIsolationLevel() {
+        if (shareIsolationLevel == null) {
+            throw new IllegalArgumentException("Share isolation level is 
null");
+        }
+        try {
+            return 
IsolationLevel.valueOf(shareIsolationLevel.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Unknown Share isolation level: 
" + shareIsolationLevel);
+        }
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 4e77eb15125..50205b0a9e9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -61,6 +61,8 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "hello", "1.0");
+            } else if (GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "hello", "1.0");
             } else if 
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -100,6 +102,19 @@ public class GroupConfigTest {
         doTestValidProps(props);
     }
 
+    @Test
+    public void testValidShareIsolationLevelValues() {
+        // Check for value READ_UNCOMMITTED
+        Properties props = createValidGroupConfig();
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed");
+        doTestValidProps(props);
+
+        // Check for value READ_COMMITTED
+        props = createValidGroupConfig();
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        doTestValidProps(props);
+    }
+
     @Test
     public void testInvalidProps() {
 
@@ -190,6 +205,16 @@ public class GroupConfigTest {
         // Check for invalid streamsHeartbeatIntervalMs, > MAX
         props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
         doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareIsolationLevel.
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
+        doTestInvalidProps(props, ConfigException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareIsolationLevel.
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_uncommit");
+        doTestInvalidProps(props, ConfigException.class);
     }
 
     private void doTestInvalidProps(Properties props, Class<? extends 
Exception> exceptionClassName) {
@@ -209,6 +234,7 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+        defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
         defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
         defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
@@ -223,6 +249,7 @@ public class GroupConfigTest {
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
         assertEquals("latest", 
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
+        assertEquals("read_uncommitted", 
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
@@ -244,6 +271,7 @@ public class GroupConfigTest {
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
         props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
         props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
         props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
         props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");

Reply via email to