dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424473429
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -135,6 +138,11 @@ public Builder<S, U> withTimer(Timer timer) { return this; } + public Builder<S, U> withDefaultWriteTimeOut(Duration timeout) { Review Comment: nit: `timeout` -> `defaultWriteTimeout`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -595,13 +609,15 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tp The topic partition that the operation is applied to. - * @param op The write operation. + * @param name The operation name. + * @param tp The topic partition that the operation is applied to. + * @param defaultWriteTimeout The default write operation timeout + * @param op The write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, + Duration defaultWriteTimeout, Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -581,6 +590,11 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture<T> future; + /** + * Timeout value for the write operation + */ + final Duration defaultWriteTimeout; Review Comment: nit: This one should actually be `writeTimeout`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -581,6 +590,11 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture<T> future; + /** + * Timeout value for the write operation Review Comment: nit: this Javadoc sentence is missing its trailing period (`.`).
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ########## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } + @Test + public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { + MockTimer timer = new MockTimer(); + // The partition writer only accept on write. + MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + // Loads the coordinator. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + + // Write #1. We should get a TimeoutException because the HWM will not advance. + CompletableFuture<String> timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + + timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() * 2); Review Comment: nit: I would just do `timeout + 1` if possible. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -610,26 +626,29 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent { null, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + defaultWriteTimeout, op ); } /** * Constructor. * - * @param name The operation name. - * @param tp The topic partition that the operation is applied to. - * @param transactionalId The transactional id. - * @param producerId The producer id. - * @param producerEpoch The producer epoch. - * @param op The write operation. + * @param name The operation name. + * @param tp The topic partition that the operation is applied to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @param defaultWriteTimeout The write operation timeout + * @param op The write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, String transactionalId, long producerId, short producerEpoch, + Duration defaultWriteTimeout, Review Comment: ditto. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -703,6 +723,18 @@ public void run() { // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + timer.add(new TimerTask(defaultWriteTimeout.toMillis()) { + @Override + public void run() { + if (!future.isDone()) { + scheduleInternalOperation( + "LogAppendEvent(name=" + name + ", tp=" + tp + ")", + tp, + () -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp)) Review Comment: Another thing, we should use `org.apache.kafka.common.errors.TimeoutException`. Could we also ensure this in the test `testScheduleWriteOpWhenWriteTimesOut`? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -703,6 +723,18 @@ public void run() { // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + timer.add(new TimerTask(defaultWriteTimeout.toMillis()) { + @Override + public void run() { + if (!future.isDone()) { + scheduleInternalOperation( + "LogAppendEvent(name=" + name + ", tp=" + tp + ")", Review Comment: nit: `WriteTimeout`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ########## @@ -703,6 +723,18 @@ public void run() { // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + timer.add(new TimerTask(defaultWriteTimeout.toMillis()) { + @Override + public void run() { + if (!future.isDone()) { + scheduleInternalOperation( + "LogAppendEvent(name=" + name + ", tp=" + tp + ")", + tp, + () -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp)) Review Comment: nit: `"CoordinatorWriteEvent " + name + " timed out after " + timeoutMs + "ms"`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ########## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } + @Test + public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { + MockTimer timer = new MockTimer(); + // The partition writer only accept on write. 
+ MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + // Loads the coordinator. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0, ctx.coordinator.lastWrittenOffset()); + assertEquals(0, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + + // Write #1. We should get a TimeoutException because the HWM will not advance. + CompletableFuture<String> timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, Review Comment: nit: Could we actually use a different timeout here to ensure that the default is not used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org