Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-13 Thread via GitHub


dajac merged PR #14981:
URL: https://github.com/apache/kafka/pull/14981


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-13 Thread via GitHub


dajac commented on PR #14981:
URL: https://github.com/apache/kafka/pull/14981#issuecomment-1854579152

   The failed tests are not related. Merging to trunk and 3.7.



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-13 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1425242119


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -997,7 +997,7 @@ public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
 CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
 state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
 
-timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 3);

Review Comment:
   nit: This should be `3 + 1`.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -997,7 +997,7 @@ public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
 CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
 state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
 
-timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 3);

Review Comment:
   nit: This should be `3 + 1`, isn't it?
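   For readers following the arithmetic: the write is scheduled with a 3 ms timeout, so the clock has to move past 3 ms before the timeout task can run, hence `3 + 1`. The sketch below illustrates this with a hand-rolled timer. `ManualTimer` and its methods are hypothetical stand-ins written for this note, not Kafka's `MockTimer`, and whether a real timer fires exactly at the deadline may differ, which is one reason to advance one millisecond past it.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a mock timer; this is not Kafka's MockTimer. */
public class ManualTimer {
    private static final class Task {
        final long deadlineMs;
        final Runnable action;

        Task(long deadlineMs, Runnable action) {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private long nowMs = 0L;
    private final List<Task> pending = new ArrayList<>();

    /** Registers an action that becomes due once the given delay has elapsed. */
    public void add(Duration delay, Runnable action) {
        pending.add(new Task(nowMs + delay.toMillis(), action));
    }

    /** Moves the clock forward, then runs every task whose deadline has been reached. */
    public void advanceClock(long ms) {
        nowMs += ms;
        List<Task> due = new ArrayList<>();
        pending.removeIf(task -> {
            if (task.deadlineMs <= nowMs) {
                due.add(task);
                return true;
            }
            return false;
        });
        due.forEach(task -> task.action.run());
    }

    public static void main(String[] args) {
        Duration writeTimeout = Duration.ofMillis(3);
        boolean[] timedOut = {false};

        ManualTimer timer = new ManualTimer();
        timer.add(writeTimeout, () -> timedOut[0] = true);

        timer.advanceClock(writeTimeout.toMillis() - 1); // 2 ms elapsed: not due yet
        System.out.println(timedOut[0]);                 // prints "false"

        timer.advanceClock(2);                           // 4 ms elapsed in total, i.e. timeout + 1
        System.out.println(timedOut[0]);                 // prints "true"
    }
}
```

   Advancing by `DEFAULT_WRITE_TIMEOUT.toMillis() + 3` would also pass, but it ties the test to a value the write was not scheduled with.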




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-13 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1425241345


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -19,16 +19,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
-import org.apache.kafka.common.errors.InvalidFetchSizeException;
-import org.apache.kafka.common.errors.KafkaStorageException;
-import org.apache.kafka.common.errors.NotCoordinatorException;
-import org.apache.kafka.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
-import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.UnknownMemberIdException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.*;

Review Comment:
   nit: Let's revert this one. We usually don't do this.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-13 Thread via GitHub


vamossagar12 commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1425209384


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
+MockTimer timer = new MockTimer();
+// The partition writer only accept on write.
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.coordinator.lastWrittenOffset());
+assertEquals(0, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+// Write #1. We should get a TimeoutException because the HWM will not advance.
+CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
+state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);

Review Comment:
   Done.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424943646


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
+MockTimer timer = new MockTimer();
+// The partition writer only accept on write.
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.coordinator.lastWrittenOffset());
+assertEquals(0, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+// Write #1. We should get a TimeoutException because the HWM will not advance.
+CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
+state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() + 1);

Review Comment:
   We should use `3` here to be consistent with `Duration.ofMillis(3)`.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on PR #14981:
URL: https://github.com/apache/kafka/pull/14981#issuecomment-1853271332

   Thanks for the review, @dajac. I have addressed the comments.



Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424473429


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) {
 return this;
 }
 
+public Builder withDefaultWriteTimeOut(Duration timeout) {

Review Comment:
   nit: `timeout` -> `defaultWriteTimeout`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -595,13 +609,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tp    The topic partition that the operation is applied to.
- * @param op    The write operation.
+ * @param name                  The operation name.
+ * @param tp                    The topic partition that the operation is applied to.
+ * @param defaultWriteTimeout   The default write operation timeout
+ * @param op                    The write operation.
  */
 CoordinatorWriteEvent(
 String name,
 TopicPartition tp,
+Duration defaultWriteTimeout,

Review Comment:
   ditto.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+final Duration defaultWriteTimeout;

Review Comment:
   nit: This one should actually be `writeTimeout`.
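   The naming distinction in these nits can be read as follows: the runtime owns a fallback (`defaultWriteTimeout`), while each write event carries the timeout that applies to that one write (`writeTimeout`). A minimal sketch of that split is below. The class names `SketchRuntime` and `WriteEvent` and the no-timeout overload of `scheduleWrite` are assumptions made for illustration; they are not the signatures in `CoordinatorRuntime`.

```java
import java.time.Duration;

public class TimeoutNaming {
    /** Hypothetical event: it carries the timeout that applies to this one write. */
    public static final class WriteEvent {
        public final String name;
        public final Duration writeTimeout;

        public WriteEvent(String name, Duration writeTimeout) {
            this.name = name;
            this.writeTimeout = writeTimeout;
        }
    }

    /** Hypothetical runtime: it owns the fallback used when the caller passes no timeout. */
    public static final class SketchRuntime {
        private final Duration defaultWriteTimeout;

        public SketchRuntime(Duration defaultWriteTimeout) {
            this.defaultWriteTimeout = defaultWriteTimeout;
        }

        /** Assumed convenience overload: falls back to the runtime-wide default. */
        public WriteEvent scheduleWrite(String name) {
            return scheduleWrite(name, defaultWriteTimeout);
        }

        /** The caller-supplied value becomes the event's own writeTimeout. */
        public WriteEvent scheduleWrite(String name, Duration writeTimeout) {
            return new WriteEvent(name, writeTimeout);
        }
    }

    public static void main(String[] args) {
        SketchRuntime runtime = new SketchRuntime(Duration.ofMillis(5000));
        System.out.println(runtime.scheduleWrite("write#1").writeTimeout.toMillis());
        System.out.println(runtime.scheduleWrite("write#2", Duration.ofMillis(3)).writeTimeout.toMillis());
    }
}
```

   Under this reading, calling the event's field `defaultWriteTimeout` would be misleading, because by the time the event exists the value is no longer a default.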



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +590,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation

Review Comment:
   nit: `.`.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException {
+MockTimer timer = new MockTimer();
+// The partition writer only accept on write.
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.coordinator.lastWrittenOffset());
+assertEquals(0, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+// Write #1. We should get a TimeoutException because the HWM will not advance.
+CompletableFuture timedOutWrite = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));
+
+timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() * 2);

Review Comment:
   nit: I would just do `timeout + 1` if possible.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -610,26 +626,29 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
 null,
 RecordBatch.NO_PRODUCER_ID,
 RecordBatch.NO_PRODUCER_EPOCH,
+defaultWriteTimeout,
 op
 );
 }
 
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tp    The topic partition that the operation is applied to.
- * @param 

Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


vamossagar12 commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424381464


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -719,20 +721,26 @@ public void run() {
 result.records()
 );
 
-timer.add(new TimerTask(timeout) {
+timer.add(new TimerTask(timeout.toMillis()) {
 @Override
 public void run() {
-scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out")));
+if (!future.isDone()) {
+scheduleInternalOperation(
+"LogAppendEvent(name=" + name + ", tp=" + tp + ")",
+tp,
+() -> complete(new TimeoutException("Log append event " + name + " timed out for TopicPartition " + tp))
+);
+}
 }
 });
 
-context.coordinator.updateLastWrittenOffset(offset);
-
 // Add the response to the deferred queue.
 if (!future.isDone()) {
 context.deferredEventQueue.add(offset, this);
+context.coordinator.updateLastWrittenOffset(offset);

Review Comment:
   OK, yes, that was a mistake.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424354426


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException {
+MockTimer timer = new MockTimer();
+// The partition writer only accept on write.
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.coordinator.lastWrittenOffset());
+assertEquals(0, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+// Write #1. We should get a TimeoutException because the HWM will not advance

Review Comment:
   nit: `.` at the end.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424353506


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
 assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
 }
 
+@Test
+public void testScheduleWriteOpWhenWriteTimesout() throws InterruptedException {

Review Comment:
   nit: `TimesOut`?




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424352569


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -186,6 +187,7 @@ public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedE
 when(runtime.scheduleWriteOperation(
 ArgumentMatchers.eq("consumer-group-heartbeat"),
 ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.any(),

Review Comment:
   Could we actually put the expected value for all of those?




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351506


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1092,6 +1134,7 @@ private CoordinatorRuntime(
 CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier,
 Time time,
 Timer timer,
+Duration timeout,

Review Comment:
   nit: `defaultWriteTimeout`?




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350048


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+
+final Duration timeout;

Review Comment:
   Should it be `defaultWriteTimeout` too?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tp    The topic partition that the operation is applied to.
- * @param op    The write operation.
+ * @param name      The operation name.
+ * @param tp        The topic partition that the operation is applied to.
+ * @param timeout   The write operation timeout
+ * @param op        The write operation.
  */
 CoordinatorWriteEvent(
 String name,
 TopicPartition tp,
+Duration timeout,

Review Comment:
   Same question about the name.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424351314


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1019,6 +1055,11 @@ public void onHighWatermarkUpdated(
  */
 private final Timer timer;
 
+/**
+ * The write operation timeout
+ */
+private final Duration timeout;

Review Comment:
   nit: `defaultWriteTimeout`?




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350465


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -698,13 +720,27 @@ public void run() {
 producerEpoch,
 result.records()
 );
-context.coordinator.updateLastWrittenOffset(offset);
+
+timer.add(new TimerTask(timeout.toMillis()) {
+@Override
+public void run() {
+if (!future.isDone()) {
+scheduleInternalOperation(
+"LogAppendEvent(name=" + name + ", tp=" + tp + ")",
+tp,
+() -> complete(new TimeoutException("Log append event " + name + " timed out for TopicPartition " + tp))
+);
+}
+}
+});
 
 // Add the response to the deferred queue.
 if (!future.isDone()) {
 context.deferredEventQueue.add(offset, this);
+context.coordinator.updateLastWrittenOffset(offset);
 } else {
 complete(null);
+

Review Comment:
   nit: We could remove this empty line.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424350233


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -595,13 +611,15 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
 /**
  * Constructor.
  *
- * @param name  The operation name.
- * @param tp    The topic partition that the operation is applied to.
- * @param op    The write operation.
+ * @param name      The operation name.
+ * @param tp        The topic partition that the operation is applied to.
+ * @param timeout   The write operation timeout
+ * @param op        The write operation.
  */
 CoordinatorWriteEvent(
 String name,
 TopicPartition tp,
+Duration timeout,

Review Comment:
   Same question about the name.




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349399


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -135,6 +138,11 @@ public Builder withTimer(Timer timer) {
 return this;
 }
 
+public Builder withWriteTimeOut(Duration timeout) {

Review Comment:
   nit: `withDefaultWriteTimeout`?




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424349887


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+

Review Comment:
   nit: Remove empty line.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -581,6 +591,12 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent {
  */
 final CompletableFuture future;
 
+/**
+ * Timeout value for the write operation
+ */
+
+final Duration timeout;

Review Comment:
   Should it be `defaultWriteTimeout` too?




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-12 Thread via GitHub


dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424345912


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -719,20 +721,26 @@ public void run() {
 result.records()
 );
 
-timer.add(new TimerTask(timeout) {
+timer.add(new TimerTask(timeout.toMillis()) {
 @Override
 public void run() {
-scheduleInternalOperation(name, tp, () -> complete(new TimeoutException("Writing records to the log timed out")));
+if (!future.isDone()) {
+scheduleInternalOperation(
+"LogAppendEvent(name=" + name + ", tp=" + tp + ")",
+tp,
+() -> complete(new TimeoutException("Log append event " + name + " timed out for TopicPartition " + tp))
+);
+}
 }
 });
 
-context.coordinator.updateLastWrittenOffset(offset);
-
 // Add the response to the deferred queue.
 if (!future.isDone()) {
 context.deferredEventQueue.add(offset, this);
+context.coordinator.updateLastWrittenOffset(offset);

Review Comment:
   Why did you move this? I think that it should stay where it was and the 
timer must be added here, no?
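
   Separately from where the offset update goes, the hunk above guards the timeout with `future.isDone()`, so a timer that fires after the write has already completed does nothing. The sketch below shows that guard in isolation using only `CompletableFuture`. `WriteTimeoutGuard` and `onTimeout` are hypothetical names for this note; the real code schedules an internal operation rather than completing the future directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class WriteTimeoutGuard {
    /**
     * Called when the timer fires. Returns true if this call failed the write
     * with a TimeoutException, false if the write had already completed.
     */
    public static boolean onTimeout(CompletableFuture<String> future, String name) {
        if (future.isDone()) {
            return false; // already committed or failed: leave the result untouched
        }
        return future.completeExceptionally(
            new TimeoutException("Write " + name + " timed out"));
    }

    public static void main(String[] args) {
        // A write that completed before the timer fired keeps its response.
        CompletableFuture<String> committed = new CompletableFuture<>();
        committed.complete("response1");
        System.out.println(onTimeout(committed, "write#1")); // prints "false"
        System.out.println(committed.join());                // prints "response1"

        // A write that is still pending is failed by the timer.
        CompletableFuture<String> stuck = new CompletableFuture<>();
        System.out.println(onTimeout(stuck, "write#2"));      // prints "true"
        System.out.println(stuck.isCompletedExceptionally()); // prints "true"
    }
}
```

   `completeExceptionally` is itself a no-op on a completed future, so the explicit check mainly avoids scheduling needless work.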




Re: [PR] KAFKA-15237: Implement write operation timeout [kafka]

2023-12-11 Thread via GitHub


dajac commented on PR #14981:
URL: https://github.com/apache/kafka/pull/14981#issuecomment-1850048978

   @vamossagar12 I updated the title to match the one of the Jira.

