junrao commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031937222
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception
{
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest
+ public void testReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 8);
+ // 5th and 10th message transaction was aborted, hence they won't
be included in the fetched records.
+ assertEquals(8, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ if (messageCounter % 5 == 0)
+ messageCounter++;
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testReadUncommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ // Even though 5th and 10th message transaction was aborted, they
will be included in the fetched records since IsolationLevel is
READ_UNCOMMITTED.
+ assertEquals(10, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ record = records.iterator().next();
+ assertEquals("Message 2", new String(record.value()));
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Third transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
3");
+ // Fourth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 4");
+
+ records = waitedPoll(shareConsumer, 2500L, 2);
+ // Message 3 and Message 4 would be returned by this poll.
+ assertEquals(2, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> recordIterator =
records.iterator();
+ record = recordIterator.next();
+ assertEquals("Message 3", new String(record.value()));
+ record = recordIterator.next();
+ assertEquals("Message 4", new String(record.value()));
+ // We will make Message 3 and Message 4 available for
re-consumption.
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // We are altering IsolationLevel to READ_COMMITTED now. We
will only read committed transactions now.
+ alterShareIsolationLevel("group1", "read_committed");
+
+ // Fifth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
5");
+ // Sixth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 6");
+ // Seventh transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 7");
+ // Eighth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
8");
+
+ // Since isolation level is READ_COMMITTED, we can consume
Message 3 (committed transaction that was released), Message 5 and Message 8.
+ // We cannot consume Message 4 (aborted transaction that was
released), Message 6 and Message 7 since they were aborted.
+ List<String> messages = new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> pollRecords =
shareConsumer.poll(Duration.ofMillis(5000));
+ if (pollRecords.count() > 0) {
+ for (ConsumerRecord<byte[], byte[]> pollRecord :
pollRecords)
+ messages.add(new String(pollRecord.value()));
+ pollRecords.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ }
+ return messages.size() == 3;
+ }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all
records post altering share isolation level");
+
+ assertEquals("Message 3", messages.get(0));
+ assertEquals("Message 5", messages.get(1));
+ assertEquals("Message 8", messages.get(2));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ transactionalProducer.close();
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ // We will not receive any records since the transaction was
aborted.
+ records = waitedPoll(shareConsumer, 2500L, 0);
Review Comment:
Does this force the test to wait for 2.5 secs since it reads 0 record?
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception
{
verifyShareGroupStateTopicRecordsProduced();
}
+ @ClusterTest
+ public void testReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 8);
+ // 5th and 10th message transaction was aborted, hence they won't
be included in the fetched records.
+ assertEquals(8, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ if (messageCounter % 5 == 0)
+ messageCounter++;
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testReadUncommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ // Even though 5th and 10th message transaction was aborted, they
will be included in the fetched records since IsolationLevel is
READ_UNCOMMITTED.
+ assertEquals(10, records.count());
+ int messageCounter = 1;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ assertEquals("Message " + messageCounter, new
String(record.value()));
+ messageCounter++;
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_uncommitted");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ record = records.iterator().next();
+ assertEquals("Message 2", new String(record.value()));
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Third transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
3");
+ // Fourth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 4");
+
+ records = waitedPoll(shareConsumer, 2500L, 2);
+ // Message 3 and Message 4 would be returned by this poll.
+ assertEquals(2, records.count());
+ Iterator<ConsumerRecord<byte[], byte[]>> recordIterator =
records.iterator();
+ record = recordIterator.next();
+ assertEquals("Message 3", new String(record.value()));
+ record = recordIterator.next();
+ assertEquals("Message 4", new String(record.value()));
+ // We will make Message 3 and Message 4 available for
re-consumption.
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // We are altering IsolationLevel to READ_COMMITTED now. We
will only read committed transactions now.
+ alterShareIsolationLevel("group1", "read_committed");
+
+ // Fifth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
5");
+ // Sixth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 6");
+ // Seventh transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 7");
+ // Eighth transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
8");
+
+ // Since isolation level is READ_COMMITTED, we can consume
Message 3 (committed transaction that was released), Message 5 and Message 8.
+ // We cannot consume Message 4 (aborted transaction that was
released), Message 6 and Message 7 since they were aborted.
+ List<String> messages = new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> pollRecords =
shareConsumer.poll(Duration.ofMillis(5000));
+ if (pollRecords.count() > 0) {
+ for (ConsumerRecord<byte[], byte[]> pollRecord :
pollRecords)
+ messages.add(new String(pollRecord.value()));
+ pollRecords.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ }
+ return messages.size() == 3;
+ }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all
records post altering share isolation level");
+
+ assertEquals("Message 3", messages.get(0));
+ assertEquals("Message 5", messages.get(1));
+ assertEquals("Message 8", messages.get(2));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ transactionalProducer.close();
+ }
+ }
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+
+ @ClusterTest
+ public void
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ alterShareIsolationLevel("group1", "read_committed");
+ try (Producer<byte[], byte[]> transactionalProducer =
createProducer("T1");
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ transactionalProducer.initTransactions();
+
+ try {
+ // First transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
1");
+
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> record =
records.iterator().next();
+ assertEquals("Message 1", new String(record.value()));
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+
+ // Second transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 2");
+
+ // We will not receive any records since the transaction was
aborted.
+ records = waitedPoll(shareConsumer, 2500L, 0);
+ assertEquals(0, records.count());
+
+ // Third transaction is committed.
+ produceCommittedTransaction(transactionalProducer, "Message
3");
+ // Fourth transaction is aborted.
+ produceAbortedTransaction(transactionalProducer, "Message 4");
+
+ records = waitedPoll(shareConsumer, 2500L, 1);
+ // Message 3 would be returned by this poll.
+ assertEquals(1, records.count());
+ record = records.iterator().next();
+ assertEquals("Message 3", new String(record.value()));
+ // We will make Message 3 available for re-consumption.
+ records.forEach(consumedRecord ->
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+ shareConsumer.commitSync();
+
+ // Precautionary poll so that the aborted transaction for
Message 4 is fetched by the consumer.
+ shareConsumer.poll(Duration.ofMillis(5000));
Review Comment:
> Since this is a poll to make sure aborted transaction for message 4 is
fetched by the consumer, it won't return m any records.
Since the `poll()` call doesn't return any record, we are forced to wait for
2.5 secs, right? Also, how do we know for sure that message 4 is fetched by the
consumer?
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6669,15 +6691,441 @@ private String assertionFailedMessage(SharePartition
sharePartition, Map<Long, L
return errorMessage.toString();
}
+ @Test
+ public void testFilterRecordBatchesFromAcquiredRecords() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ List<AcquiredRecords> acquiredRecords1 = List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short)
1)
+ );
+ List<RecordBatch> recordBatches1 = List.of(
+ memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+ memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+ );
+ assertEquals(
+ List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short)
1)),
+
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1,
recordBatches1));
+
+ List<AcquiredRecords> acquiredRecords2 = List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short)
3),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short)
3),
+ new
AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short)
3)
+ );
+ List<RecordBatch> recordBatches2 = List.of(
+ memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+ memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+ );
+ assertEquals(
+ List.of(
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short)
3),
+ new
AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short)
3)
+
+ ),
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2,
recordBatches2)
+ );
+
+ // Record batches is empty.
+ assertEquals(acquiredRecords2,
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2,
List.of()));
+
+ List<AcquiredRecords> acquiredRecords3 = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short)
1)
+ );
+ List<RecordBatch> recordBatches3 = List.of(
+ memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+ memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+ );
+
+ assertEquals(
+ List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short)
1)
+
+ ),
sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3,
recordBatches3)
+ );
+ }
+
+ @Test
+ public void testAcquireWithReadCommittedIsolationLevel() {
+ SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 5, 10).close();
+ memoryRecordsBuilder(buffer, 5, 15).close();
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ memoryRecordsBuilder(buffer, 8, 50).close();
+ memoryRecordsBuilder(buffer, 10, 58).close();
+ memoryRecordsBuilder(buffer, 5, 70).close();
+
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ FetchPartitionData fetchPartitionData = fetchPartitionData(records,
newAbortedTransactions());
+
+ // We are mocking the result of function
fetchAbortedTransactionRecordBatches. The records present at these offsets need
to be archived.
+ // We won't be utilizing the aborted transactions passed in
fetchPartitionData.
+
when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(),
fetchPartitionData.abortedTransactions.get())).thenReturn(
+ List.of(
+ memoryRecordsBuilder(5,
10).build().batches().iterator().next(),
+ memoryRecordsBuilder(10,
58).build().batches().iterator().next(),
+ memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+ )
+ );
+
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+ sharePartition.acquire(
+ MEMBER_ID,
+ 10 /* Batch size */,
+ 100,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData,
+ FetchIsolation.TXN_COMMITTED),
+ 45 /* Gap of 15 records will be added to second batch, gap of 2
records will also be added to fourth batch */);
+
+ assertEquals(List.of(
+ new
AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short)
1)
+ ), acquiredRecordsList);
+ assertEquals(75, sharePartition.nextFetchOffset());
+
+ // Checking cached state.
+ assertEquals(4, sharePartition.cachedState().size());
+ assertTrue(sharePartition.cachedState().containsKey(10L));
+ assertTrue(sharePartition.cachedState().containsKey(20L));
+ assertTrue(sharePartition.cachedState().containsKey(50L));
+ assertTrue(sharePartition.cachedState().containsKey(70L));
+ assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+ assertNotNull(sharePartition.cachedState().get(50L).offsetState());
+
+ assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset());
+ assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset());
+ assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset());
+ assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(70L).batchState());
+
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(20L).batchMemberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(70L).batchMemberId());
+
+
assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask());
+
+ Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(15L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(16L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(17L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(18L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(19L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
+
+ expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(50L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(51L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(52L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(53L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(54L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(55L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(56L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(57L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(58L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(59L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(60L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(61L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(62L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(63L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(64L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(65L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(66L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(67L, new
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(68L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(69L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(50L).offsetState());
+
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask());
+
assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask());
+
assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask());
+ }
+
+ @Test
+ public void testContainsAbortMarker() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // Record batch is not a control batch.
+ RecordBatch recordBatch = mock(RecordBatch.class);
+ when(recordBatch.isControlBatch()).thenReturn(false);
+ assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+ // Record batch is a control batch but doesn't contain any records.
+ recordBatch = mock(RecordBatch.class);
+ Iterator batchIterator = mock(Iterator.class);
+ when(batchIterator.hasNext()).thenReturn(false);
+ when(recordBatch.iterator()).thenReturn(batchIterator);
+ when(recordBatch.isControlBatch()).thenReturn(true);
+ assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+ // Record batch is a control batch which contains a record of type
ControlRecordType.ABORT.
+ recordBatch = mock(RecordBatch.class);
+ batchIterator = mock(Iterator.class);
+ when(batchIterator.hasNext()).thenReturn(true);
+ DefaultRecord record = mock(DefaultRecord.class);
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ // Buffer has to be created in a way that
ControlRecordType.parse(buffer) returns ControlRecordType.ABORT.
+ buffer.putShort((short) 5);
+ buffer.putShort(ControlRecordType.ABORT.type());
+ buffer.putInt(23432); // some field added in version 5
+ buffer.flip();
+ when(record.key()).thenReturn(buffer);
+ when(batchIterator.next()).thenReturn(record);
+ when(recordBatch.iterator()).thenReturn(batchIterator);
+ when(recordBatch.isControlBatch()).thenReturn(true);
+ assertTrue(sharePartition.containsAbortMarker(recordBatch));
+
+ // Record batch is a control batch which contains a record of type
ControlRecordType.COMMIT.
+ recordBatch = mock(RecordBatch.class);
+ batchIterator = mock(Iterator.class);
+ when(batchIterator.hasNext()).thenReturn(true);
+ record = mock(DefaultRecord.class);
+ buffer = ByteBuffer.allocate(4096);
+ // Buffer has to be created in a way that
ControlRecordType.parse(buffer) returns ControlRecordType.COMMIT.
+ buffer.putShort((short) 5);
+ buffer.putShort(ControlRecordType.COMMIT.type());
+ buffer.putInt(23432); // some field added in version 5
+ buffer.flip();
+ when(record.key()).thenReturn(buffer);
+ when(batchIterator.next()).thenReturn(record);
+ when(recordBatch.iterator()).thenReturn(batchIterator);
+ when(recordBatch.isControlBatch()).thenReturn(true);
+ assertFalse(sharePartition.containsAbortMarker(recordBatch));
+ }
+
+ @Test
+ public void
testFetchAbortedTransactionRecordBatchesForOnlyAbortedTransactions() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // Case 1 - Creating 10 transactional records in a single batch
followed by a ABORT marker record for producerId 1.
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 1, 0);
+ buffer.flip();
+ Records records = MemoryRecords.readableRecords(buffer);
+
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+ );
+ // records from 0 to 9 should be archived because they are a part of
aborted transactions.
+ List<RecordBatch> actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(1, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(9, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+
+ // Case 2: 3 individual batches each followed by a ABORT marker record
for producerId 1.
+ buffer = ByteBuffer.allocate(1024);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 0);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 2);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 4);
+ buffer.flip();
+ records = MemoryRecords.readableRecords(buffer);
+ abortedTransactions = List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(2).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(4).setProducerId(1)
+ );
+
+ actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(3, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(0, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+ assertEquals(2, actual.get(1).baseOffset());
+ assertEquals(2, actual.get(1).lastOffset());
+ assertEquals(1, actual.get(1).producerId());
+ assertEquals(4, actual.get(2).baseOffset());
+ assertEquals(4, actual.get(2).lastOffset());
+ assertEquals(1, actual.get(2).producerId());
+
+ // Case 3: The producer id of records is different, so they should not
be archived,
+ buffer = ByteBuffer.allocate(1024);
+ // We are creating 10 transactional records followed by a ABORT marker
record for producerId 2.
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 2, 0);
+ buffer.flip();
+ records = MemoryRecords.readableRecords(buffer);
+ abortedTransactions = List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+ );
+
+ actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(0, actual.size());
+ }
+
+ @Test
+ public void
testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransactions() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 0);
+ newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 2, 3);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 2, 6);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 9);
+ newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 1, 12);
+ newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 15);
+ buffer.flip();
+ Records records = MemoryRecords.readableRecords(buffer);
+
+ // Case 1 - Aborted transactions does not contain the record batch
from offsets 6-7 with producer id 2.
+ List<FetchResponseData.AbortedTransaction> abortedTransactions =
List.of(
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(9).setProducerId(1),
+ new
FetchResponseData.AbortedTransaction().setFirstOffset(15).setProducerId(1)
+ );
+
+ List<RecordBatch> actual =
sharePartition.fetchAbortedTransactionRecordBatches(records.batches(),
abortedTransactions);
+ assertEquals(3, actual.size());
+ assertEquals(0, actual.get(0).baseOffset());
+ assertEquals(1, actual.get(0).lastOffset());
+ assertEquals(1, actual.get(0).producerId());
+ assertEquals(9, actual.get(1).baseOffset());
+ assertEquals(10, actual.get(1).lastOffset());
+ assertEquals(1, actual.get(1).producerId());
+ assertEquals(15, actual.get(2).baseOffset());
+ assertEquals(16, actual.get(2).lastOffset());
+ assertEquals(1, actual.get(2).producerId());
+
+ // Case 2 - Aborted transactions contains the record batch from 4-5
with producer id 2.
Review Comment:
Should we fix this comment too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]