dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1422418778


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -595,22 +609,25 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
         /**
          * Constructor.
          *
-         * @param name  The operation name.
-         * @param tp    The topic partition that the operation is applied to.
-         * @param op    The write operation.
+         * @param name      The operation name.
+         * @param tp        The topic partition that the operation is applied to.
+         * @param op        The write operation.
+         * @param timeout   The write operation timeout
          */
         CoordinatorWriteEvent(
             String name,
             TopicPartition tp,
-            CoordinatorWriteOperation<S, T, U> op
+            CoordinatorWriteOperation<S, T, U> op,

Review Comment:
   It would be better to keep the `op` as the last argument.
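   A minimal sketch of the suggested ordering. It uses hypothetical stand-ins: `WriteOp` for `CoordinatorWriteOperation`, a `String` partition instead of `TopicPartition`, and an assumed 5 s default. With the operation last, the (often multi-line) lambda closes the call, and an overload without the timeout can delegate to the one that takes it:

```java
import java.time.Duration;

// Hypothetical stand-in for CoordinatorWriteOperation.
@FunctionalInterface
interface WriteOp<T> {
    T generate();
}

final class WriteScheduler {
    // Assumed default, for illustration only.
    static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(5);

    // Overload without a timeout: delegates using the default.
    static <T> String scheduleWrite(String name, String tp, WriteOp<T> op) {
        return scheduleWrite(name, tp, DEFAULT_WRITE_TIMEOUT, op);
    }

    // The new timeout parameter goes before op, so op stays last in every overload.
    static <T> String scheduleWrite(String name, String tp, Duration timeout, WriteOp<T> op) {
        return name + "@" + tp + " (timeout " + timeout.toMillis() + " ms): " + op.generate();
    }
}
```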



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1259,7 +1298,8 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
         String transactionalId,
         long producerId,
         short producerEpoch,
-        CoordinatorWriteOperation<S, T, U> op
+        CoordinatorWriteOperation<S, T, U> op,

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -698,6 +718,14 @@ public void run() {
                                 producerEpoch,
                                 result.records()
                             );
+
+                            timer.add(new TimerTask(timeout) {
+                                @Override
+                                public void run() {
+                                    scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out")));

Review Comment:
   I wonder if we could include more details in the exception (e.g. the event name).
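   One possible shape for a more descriptive error, assuming the event name, partition and timeout are the useful details. It uses `java.util.concurrent.TimeoutException` as a stand-in for whichever `TimeoutException` the PR imports, and `writeTimeout` is a hypothetical helper name:

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

final class WriteTimeouts {
    // Hypothetical helper: builds the timeout error with enough context to tell
    // which write timed out (event name, partition, configured timeout).
    static TimeoutException writeTimeout(String eventName, String tp, Duration timeout) {
        return new TimeoutException(String.format(
            "Writing records to the log for event %s on partition %s timed out after %d ms.",
            eventName, tp, timeout.toMillis()));
    }
}
```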



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -623,14 +640,16 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
          * @param producerId        The producer id.
          * @param producerEpoch     The producer epoch.
          * @param op                The write operation.
+         * @param timeout           The write operation timeout
          */
         CoordinatorWriteEvent(
             String name,
             TopicPartition tp,
             String transactionalId,
             long producerId,
             short producerEpoch,
-            CoordinatorWriteOperation<S, T, U> op
+            CoordinatorWriteOperation<S, T, U> op,

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -581,6 +589,12 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
          */
         final CompletableFuture<T> future;
 
+        /**
+         * Timeout value for the write operation
+         */
+
+        final int timeout;

Review Comment:
   I wonder if we should use a `Duration` instead. What do you think?
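   A sketch of the field typed as `java.time.Duration`, assuming the timer in the diff takes a delay in milliseconds (so `toMillis()` is applied where the `TimerTask` is created). `WriteEventSketch` is a hypothetical class, and rejecting non-positive timeouts is an assumed choice, not something the PR specifies:

```java
import java.time.Duration;
import java.util.Objects;

final class WriteEventSketch {
    final String name;
    // Duration makes the unit explicit, unlike a bare int.
    final Duration writeTimeout;

    WriteEventSketch(String name, Duration writeTimeout) {
        this.name = Objects.requireNonNull(name);
        Objects.requireNonNull(writeTimeout);
        // Assumed validation: a zero or negative timeout would fire immediately.
        if (writeTimeout.isZero() || writeTimeout.isNegative()) {
            throw new IllegalArgumentException("writeTimeout must be positive: " + writeTimeout);
        }
        this.writeTimeout = writeTimeout;
    }

    // Conversion for a timer API that expects a delay in milliseconds.
    long timerDelayMs() {
        return writeTimeout.toMillis();
    }
}
```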



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1229,11 +1266,12 @@ private void withActiveContextOrThrow(
     public <T> CompletableFuture<T> scheduleWriteOperation(
         String name,
         TopicPartition tp,
-        CoordinatorWriteOperation<S, T, U> op
+        CoordinatorWriteOperation<S, T, U> op,

Review Comment:
   nit: Let's keep `op` as the last param.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -92,6 +93,7 @@ public static class Builder<S extends CoordinatorShard<U>, U> {
         private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
         private Time time = Time.SYSTEM;
         private Timer timer;
+        private int timeout;

Review Comment:
   nit: should we call it `defaultWriteTimeout`?
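   A sketch of the builder with the suggested name, combined with the `Duration` idea from the earlier comment. `RuntimeSketch` is a hypothetical stand-in for `CoordinatorRuntime`, and failing `build()` with `IllegalArgumentException` when the value is missing is an assumption:

```java
import java.time.Duration;

final class RuntimeSketch {
    // The default applied to writes whose caller does not pass a timeout.
    final Duration defaultWriteTimeout;

    private RuntimeSketch(Duration defaultWriteTimeout) {
        this.defaultWriteTimeout = defaultWriteTimeout;
    }

    static final class Builder {
        private Duration defaultWriteTimeout;

        Builder withDefaultWriteTimeout(Duration defaultWriteTimeout) {
            this.defaultWriteTimeout = defaultWriteTimeout;
            return this;
        }

        RuntimeSketch build() {
            // Assumed behaviour: the default must be configured explicitly.
            if (defaultWriteTimeout == null) {
                throw new IllegalArgumentException("Default write timeout must be set.");
            }
            return new RuntimeSketch(defaultWriteTimeout);
        }
    }
}
```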



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -950,6 +948,55 @@ public void testScheduleWriteOpWhenWriteFails() {
         assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
     }
 
+    @Test
+    public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException {
+        MockTimer timer = new MockTimer();
+        MockTime time = new MockTime();
+        // The partition writer only accepts one write.
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.lastWrittenOffset);
+        assertEquals(0, ctx.lastCommittedOffset);
+        assertEquals(Collections.singletonList(0L), ctx.snapshotRegistry.epochsList());
+
+        // The partition writer times out when trying to append records.
+        doAnswer(invocation -> {
+            time.sleep(DEFAULT_WRITE_TIMEOUT * 3);

Review Comment:
   You actually don't need this because the timeout will be reached if the HWM is not advanced.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -698,6 +718,14 @@ public void run() {
                                 producerEpoch,
                                 result.records()
                             );
+
+                            timer.add(new TimerTask(timeout) {
+                                @Override
+                                public void run() {
+                                    scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out")));

Review Comment:
   I also wonder whether we could cancel the timer, or skip scheduling the internal op, if the event is already completed. I can think of two ways: 1) we keep a reference to the `TimerTask` and call `cancel` on it; 2) we check the `future` before calling `scheduleInternalOperation`.
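   A sketch of both options, using a JDK `ScheduledExecutorService` as a stand-in for the runtime's timer and completing the future directly instead of going through `scheduleInternalOperation`. `DeferredWrite` and its methods are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical deferred write whose future is completed either by the commit
// path or by the timeout task, whichever happens first.
final class DeferredWrite {
    final CompletableFuture<String> future = new CompletableFuture<>();
    // Option 1: keep a handle on the timer task so it can be cancelled.
    private volatile ScheduledFuture<?> timeoutTask;

    void armTimeout(ScheduledExecutorService timer, long timeoutMs) {
        timeoutTask = timer.schedule(() -> {
            // Option 2: skip the timeout work if the write already completed.
            if (future.isDone()) return;
            future.completeExceptionally(
                new TimeoutException("Writing records to the log timed out"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
    }

    void complete(String result) {
        // complete() returns true only for the call that actually completes the future.
        if (future.complete(result)) {
            ScheduledFuture<?> task = timeoutTask;
            if (task != null) task.cancel(false); // Option 1: drop the pending timeout.
        }
    }
}
```

   In this sketch, option 2 alone already prevents a spurious timeout, since `CompletableFuture` keeps the first completion; option 1 additionally avoids running the timeout task at all for writes that complete in time.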



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
