Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac merged PR #14981: URL: https://github.com/apache/kafka/pull/14981 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on PR #14981: URL: https://github.com/apache/kafka/pull/14981#issuecomment-1854579152 The failed tests are not related. Merging to trunk and 3.7.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1425242119

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##
@@ -997,7 +997,7 @@ public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
 CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
     state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
-timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 3);

Review Comment:
nit: This should be `3 + 1`, isn't it?
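The arithmetic behind this nit can be sketched in isolation. The write is scheduled with `Duration.ofMillis(3)`, so advancing the clock by `3 + 1` ms is enough to pass its deadline. The `ManualTimer` and `ClockAdvanceSketch` classes below are hypothetical stand-ins written for this illustration; they are not Kafka's `MockTimer`, and the real timer may treat the exact deadline differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical manual timer for illustration only; not Kafka's MockTimer.
final class ManualTimer {
    private static final class Task {
        final long deadlineMs;
        final Runnable action;

        Task(long deadlineMs, Runnable action) {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private final List<Task> tasks = new ArrayList<>();
    private long nowMs = 0L;

    // Schedules an action to run once the clock reaches now + delayMs.
    void add(long delayMs, Runnable action) {
        tasks.add(new Task(nowMs + delayMs, action));
    }

    // Moves the clock forward, then runs every task whose deadline has been reached.
    void advanceClock(long ms) {
        nowMs += ms;
        List<Runnable> due = new ArrayList<>();
        Iterator<Task> it = tasks.iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (task.deadlineMs <= nowMs) {
                it.remove();
                due.add(task.action);
            }
        }
        due.forEach(Runnable::run);
    }
}

final class ClockAdvanceSketch {
    // Returns true if a task with the given timeout fired after advancing the clock by advanceMs.
    static boolean firesAfter(long timeoutMs, long advanceMs) {
        ManualTimer timer = new ManualTimer();
        boolean[] fired = {false};
        timer.add(timeoutMs, () -> fired[0] = true);
        timer.advanceClock(advanceMs);
        return fired[0];
    }
}
```

With a 3 ms timeout, `ClockAdvanceSketch.firesAfter(3, 3 + 1)` reports that the task fired, while `firesAfter(3, 2)` reports that it did not. Advancing by `timeout + 1` keeps the test independent of whether the timer treats the exact deadline as expired.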
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1425241345

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ##
@@ -19,16 +19,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
-import org.apache.kafka.common.errors.InvalidFetchSizeException;
-import org.apache.kafka.common.errors.KafkaStorageException;
-import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
-import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.UnknownMemberIdException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.*;

Review Comment:
nit: Let's revert this one. We usually don't do this.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
vamossagar12 commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1425209384 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { +MockTimer timer = new MockTimer(); +// The partition writer only accept on write. +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.coordinator.lastWrittenOffset()); +assertEquals(0, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Write #1. We should get a TimeoutException because the HWM will not advance. +CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3), +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + +timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1); Review Comment: Done. -- This is an automated message from the Apache Git Service. 
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424943646 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { +MockTimer timer = new MockTimer(); +// The partition writer only accept on write. +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.coordinator.lastWrittenOffset()); +assertEquals(0, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Write #1. We should get a TimeoutException because the HWM will not advance. +CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3), +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + +timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1); Review Comment: We should use `3` here to be consistent with `Duration.ofMillis(3)`. -- This is an automated message from the Apache Git Service. 
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
vamossagar12 commented on PR #14981: URL: https://github.com/apache/kafka/pull/14981#issuecomment-1853271332 Thanks for the review, @dajac. I have addressed the comments.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424473429 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) { return this; } +public Builder withDefaultWriteTimeOut(Duration timeout) { Review Comment: nit: `timeout` -> `defaultWriteTimeout`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -595,13 +609,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tpThe topic partition that the operation is applied to. - * @param opThe write operation. + * @param name The operation name. + * @param tpThe topic partition that the operation is applied to. + * @param defaultWriteTimeout The default write operation timeout + * @param opThe write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, +Duration defaultWriteTimeout, Review Comment: ditto. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ +final Duration defaultWriteTimeout; Review Comment: nit: This one should actually be `writeTimeout`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation Review Comment: nit: `.`. 
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { +MockTimer timer = new MockTimer(); +// The partition writer only accept on write. +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.coordinator.lastWrittenOffset()); +assertEquals(0, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Write #1. We should get a TimeoutException because the HWM will not advance. +CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); + +timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() * 2); Review Comment: nit: I would just do `timeout + 1` if possible. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -610,26 +626,29 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { null, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, +defaultWriteTimeout, op ); } /** * Constructor. * - * @param name The operation name. - * @param tpThe topic partition that the operation is applied to. - * @param
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
vamossagar12 commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424381464

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -719,20 +721,26 @@ public void run() {
     result.records()
 );
-timer.add(new TimerTask(timeout) {
+timer.add(new TimerTask(timeout.toMillis()) {
     @Override
     public void run() {
-        scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out")));
+        if (!future.isDone()) {
+            scheduleInternalOperation(
+                "LogAppendEvent(name=" + name + ", tp=" + tp + ")",
+                tp,
+                () -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp))
+            );
+        }
     }
 });
-context.coordinator.updateLastWrittenOffset(offset);
-
 // Add the response to the deferred queue.
 if (!future.isDone()) {
     context.deferredEventQueue.add(offset, this);
+    context.coordinator.updateLastWrittenOffset(offset);

Review Comment:
OK, yes, that was a mistake.
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424354426 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() { assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); } +@Test +public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException { +MockTimer timer = new MockTimer(); +// The partition writer only accept on write. +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.coordinator.lastWrittenOffset()); +assertEquals(0, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Write #1. We should get a TimeoutException because the HWM will not advance Review Comment: nit: `.` at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424353506

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
     assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
 }
+@Test
+public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException {

Review Comment:
nit: `TimesOut`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424352569 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -186,6 +187,7 @@ public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedE when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("consumer-group-heartbeat"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any(), Review Comment: Could we actually put the expected value for all of those? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351506

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1092,6 +1134,7 @@ private CoordinatorRuntime(
 CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier,
 Time time,
 Timer timer,
+Duration timeout,

Review Comment:
nit: `defaultWriteTimeout`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350048 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ + +final Duration timeout; Review Comment: Should it be `defaultWriteTimeout` too? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tpThe topic partition that the operation is applied to. - * @param opThe write operation. + * @param name The operation name. + * @param tpThe topic partition that the operation is applied to. + * @param timeout The write operation timeout + * @param opThe write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, +Duration timeout, Review Comment: Same question about the name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351314

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1019,6 +1055,11 @@ public void onHighWatermarkUpdated(
  */
 private final Timer timer;
+/**
+ * The write operation timeout
+ */
+private final Duration timeout;

Review Comment:
nit: `defaultWriteTimeout`?
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350465 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -698,13 +720,27 @@ public void run() { producerEpoch, result.records() ); - context.coordinator.updateLastWrittenOffset(offset); + +timer.add(new TimerTask(timeout.toMillis()) { +@Override +public void run() { +if (!future.isDone()) { +scheduleInternalOperation( +"LogAppendEvent(name=" + name + ", tp=" + tp + ")", +tp, +() -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp)) +); +} +} +}); // Add the response to the deferred queue. if (!future.isDone()) { context.deferredEventQueue.add(offset, this); + context.coordinator.updateLastWrittenOffset(offset); } else { complete(null); + Review Comment: nit: We could remove this empty line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350233 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { /** * Constructor. * - * @param name The operation name. - * @param tpThe topic partition that the operation is applied to. - * @param opThe write operation. + * @param name The operation name. + * @param tpThe topic partition that the operation is applied to. + * @param timeout The write operation timeout + * @param opThe write operation. */ CoordinatorWriteEvent( String name, TopicPartition tp, +Duration timeout, Review Comment: Same question about the name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349399

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) {
     return this;
 }
+public Builder withWriteTimeOut(Duration timeout) {

Review Comment:
nit: `withDefaultWriteTimeout`?
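The naming nits in this thread separate two things: a runtime-wide default (`defaultWriteTimeout`) and the timeout carried by one write event (`writeTimeout`). A minimal sketch of that distinction follows. `RuntimeSketch`, `WriteEvent`, and both `scheduleWrite` overloads are hypothetical names made up for the illustration; in particular, the overload without a timeout is an assumption and is not taken from the PR.

```java
import java.time.Duration;

// Hypothetical, heavily simplified stand-in for a runtime with a default write timeout.
final class RuntimeSketch {
    // Runtime-wide value, set once through the builder.
    private final Duration defaultWriteTimeout;

    private RuntimeSketch(Duration defaultWriteTimeout) {
        this.defaultWriteTimeout = defaultWriteTimeout;
    }

    static final class Builder {
        // Arbitrary fallback chosen for the sketch.
        private Duration defaultWriteTimeout = Duration.ofMillis(5);

        Builder withDefaultWriteTimeout(Duration defaultWriteTimeout) {
            this.defaultWriteTimeout = defaultWriteTimeout;
            return this;
        }

        RuntimeSketch build() {
            return new RuntimeSketch(defaultWriteTimeout);
        }
    }

    // A single write event holds the timeout that applies to that one operation.
    static final class WriteEvent {
        final String name;
        final Duration writeTimeout;

        WriteEvent(String name, Duration writeTimeout) {
            this.name = name;
            this.writeTimeout = writeTimeout;
        }
    }

    // Assumed convenience overload: falls back to the runtime default.
    WriteEvent scheduleWrite(String name) {
        return new WriteEvent(name, defaultWriteTimeout);
    }

    // The caller supplies the timeout for this operation explicitly.
    WriteEvent scheduleWrite(String name, Duration writeTimeout) {
        return new WriteEvent(name, writeTimeout);
    }
}
```

Under this reading, the builder and the runtime field use the `default` prefix because they hold a fallback, while the event field drops it because by then the value is simply the timeout in effect for that write.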
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981: URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349887 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ + Review Comment: nit: Remove empty line. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final CompletableFuture future; +/** + * Timeout value for the write operation + */ + +final Duration timeout; Review Comment: Should it be `defaultWriteTimeout` too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424345912

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -719,20 +721,26 @@ public void run() {
     result.records()
 );
-timer.add(new TimerTask(timeout) {
+timer.add(new TimerTask(timeout.toMillis()) {
     @Override
     public void run() {
-        scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out")));
+        if (!future.isDone()) {
+            scheduleInternalOperation(
+                "LogAppendEvent(name=" + name + ", tp=" + tp + ")",
+                tp,
+                () -> complete(new TimeoutException("Log append event " + name + "timed out for TopicPartition " + tp))
+            );
+        }
     }
 });
-context.coordinator.updateLastWrittenOffset(offset);
-
 // Add the response to the deferred queue.
 if (!future.isDone()) {
     context.deferredEventQueue.add(offset, this);
+    context.coordinator.updateLastWrittenOffset(offset);

Review Comment:
Why did you move this? I think that it should stay where it was and the timer must be added here, no?
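The `if (!future.isDone())` guard in the timer task above can be illustrated on its own. The sketch below is a simplified stand-in, not the runtime's code: `WriteTimeoutSketch` and its methods are hypothetical, it completes the future directly rather than going through `scheduleInternalOperation`, and it uses `java.util.concurrent.TimeoutException` so that it needs only the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a timeout task that only fails a write still in flight.
final class WriteTimeoutSketch {
    // Builds the task a timer would run when the write timeout expires.
    static Runnable timeoutTask(String name, CompletableFuture<String> future) {
        return () -> {
            // Nothing to do if the write has already completed.
            if (!future.isDone()) {
                future.completeExceptionally(
                    new TimeoutException("Write operation " + name + " timed out"));
            }
        };
    }

    // Describes the outcome of an already completed future as a string.
    static String outcome(CompletableFuture<String> future) {
        try {
            return "ok:" + future.get();
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // The write completes first, so the timer task later finds nothing to do.
    static String committedBeforeTimeout() {
        CompletableFuture<String> future = new CompletableFuture<>();
        Runnable task = timeoutTask("write#1", future);
        future.complete("response1");
        task.run();
        return outcome(future);
    }

    // The write never completes, so the timer task fails the future.
    static String neverCommitted() {
        CompletableFuture<String> future = new CompletableFuture<>();
        timeoutTask("write#1", future).run();
        return outcome(future);
    }
}
```

In this sketch `committedBeforeTimeout()` yields `ok:response1` and `neverCommitted()` yields `failed:TimeoutException`. The second case corresponds to the situation the new test exercises, where the high watermark never advances and the caller receives a timeout instead of waiting indefinitely.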
Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]
dajac commented on PR #14981: URL: https://github.com/apache/kafka/pull/14981#issuecomment-1850048978 @vamossagar12 I updated the title to match the one of the Jira.