dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433888169


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
         );
     }
 
+    @ParameterizedTest
+    @EnumSource(value = TransactionResult.class)
+    public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Transactional write #1.
+        CompletableFuture<String> write1 = runtime.scheduleTransactionalWriteOperation(
+            "write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 5,
+            DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // The last written offset is updated.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        // The last committed offset does not change.
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        // A new snapshot is created.
+        assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        // Records have been replayed to the coordinator. They are stored in
+        // the pending set for now.
+        assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+            100L
+        ));
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+        ), writer.entries(TP));
+
+        // Complete transaction #1.
+        CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 5,
+            10,
+            result,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // The last written offset is updated.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        // The last committed offset does not change.
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        // A new snapshot is created.
+        assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        // Records have been replayed to the coordinator.
+        if (result == TransactionResult.COMMIT) {
+            // They are now in the records set if committed.
+            assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+        } else {
+            // Or they are gone if aborted.
+            assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        }
+
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+            InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result)
+        ), writer.entries(TP));
+
+        // Commit write #1.
+        writer.commit(TP, 2);
+
+        // The write is completed.
+        assertTrue(write1.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+        // Commit completion #1.
+        writer.commit(TP, 3);
+
+        // The transaction is completed.
+        assertTrue(complete1.isDone());
+        assertNull(complete1.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleTransactionCompletionWhenWriteTimesOut() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Complete #1. We should get a TimeoutException because the HWM will not advance.
+        CompletableFuture<Void> timedOutCompletion = runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 5,
+            10,
+            TransactionResult.COMMIT,
+            Duration.ofMillis(3)
+        );
+
+        timer.advanceClock(4);
+
+        assertFutureThrows(timedOutCompletion, org.apache.kafka.common.errors.TimeoutException.class);
+    }
+
+    @Test
+    public void testScheduleTransactionCompletionWhenWriteFails() {
+        MockTimer timer = new MockTimer();
+        // The partition writer accepts records but fails on markers.
+        MockPartitionWriter writer = new MockPartitionWriter(true);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Write #1. It should succeed and be applied to the coordinator.
+        runtime.scheduleTransactionalWriteOperation(
+            "write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 5,
+            DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+
+        // Verify that the state has been updated.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+
+        // Complete transaction #1. It should fail.
+        CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(

Review Comment:
   Yes, it is retriable depending on the error. The decision is left to the caller. As explained in my previous comment, we don't roll back on timeouts.
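
   For illustration, a caller-side retry could look like the sketch below. This is hypothetical and not part of the PR: the `withRetries` helper and the attempt count are invented for the example. It simply re-issues the operation when the failure is one of Kafka's `RetriableException` subclasses and surfaces everything else:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.function.Supplier;

   import org.apache.kafka.common.errors.RetriableException;

   // Hypothetical caller-side helper, not part of this PR. Since the runtime
   // does not roll back on timeouts, re-issuing the completion with the same
   // producer id and epoch is a decision the caller has to make.
   final class CompletionRetries {
       static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> op, int attemptsLeft) {
           return op.get().handle((value, error) -> {
               if (error == null) {
                   return CompletableFuture.completedFuture(value);
               }
               // Failures surfaced through a future are often wrapped in a
               // CompletionException; unwrap to inspect the real cause.
               Throwable cause = error instanceof CompletionException ? error.getCause() : error;
               if (cause instanceof RetriableException && attemptsLeft > 0) {
                   return withRetries(op, attemptsLeft - 1); // retriable: try again
               }
               return CompletableFuture.<T>failedFuture(cause); // not retriable: give up
           }).thenCompose(future -> future);
       }
   }
   ```

   A caller could then wrap the completion as `withRetries(() -> runtime.scheduleTransactionCompletion(...), 3)`, keeping the retry policy entirely outside the runtime, which is what leaving the decision to the caller means here.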



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
