dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1422418778
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -595,22 +609,25 @@ class CoordinatorWriteEvent<T> implements
CoordinatorEvent, DeferredEvent {
/**
* Constructor.
*
- * @param name The operation name.
- * @param tp The topic partition that the operation is applied to.
- * @param op The write operation.
+ * @param name The operation name.
+ * @param tp The topic partition that the operation is applied
to.
+ * @param op The write operation.
+ * @param timeout The write operation timeout
*/
CoordinatorWriteEvent(
String name,
TopicPartition tp,
- CoordinatorWriteOperation<S, T, U> op
+ CoordinatorWriteOperation<S, T, U> op,
Review Comment:
It would be better to keep the `op` as the last argument.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1259,7 +1298,8 @@ public <T> CompletableFuture<T>
scheduleTransactionalWriteOperation(
String transactionalId,
long producerId,
short producerEpoch,
- CoordinatorWriteOperation<S, T, U> op
+ CoordinatorWriteOperation<S, T, U> op,
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -698,6 +718,14 @@ public void run() {
producerEpoch,
result.records()
);
+
+ timer.add(new TimerTask(timeout) {
+ @Override
+ public void run() {
+ scheduleInternalOperation(name, tp, () ->
complete(new TimeoutException("Writing records to the log timed out")));
Review Comment:
I wonder if we could include more details in the exception (e.g. event name, etc.).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -623,14 +640,16 @@ class CoordinatorWriteEvent<T> implements
CoordinatorEvent, DeferredEvent {
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param op The write operation.
+ * @param timeout The write operation timeout
*/
CoordinatorWriteEvent(
String name,
TopicPartition tp,
String transactionalId,
long producerId,
short producerEpoch,
- CoordinatorWriteOperation<S, T, U> op
+ CoordinatorWriteOperation<S, T, U> op,
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -581,6 +589,12 @@ class CoordinatorWriteEvent<T> implements
CoordinatorEvent, DeferredEvent {
*/
final CompletableFuture<T> future;
+ /**
+ * Timeout value for the write operation
+ */
+
+ final int timeout;
Review Comment:
I wonder if we should rather use a Duration. What do you think?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1229,11 +1266,12 @@ private void withActiveContextOrThrow(
public <T> CompletableFuture<T> scheduleWriteOperation(
String name,
TopicPartition tp,
- CoordinatorWriteOperation<S, T, U> op
+ CoordinatorWriteOperation<S, T, U> op,
Review Comment:
nit: Let's keep `op` as the last param.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -92,6 +93,7 @@ public static class Builder<S extends CoordinatorShard<U>, U>
{
private CoordinatorShardBuilderSupplier<S, U>
coordinatorShardBuilderSupplier;
private Time time = Time.SYSTEM;
private Timer timer;
+ private int timeout;
Review Comment:
nit: should we call it `defaultWriteTimeout`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -950,6 +948,55 @@ public void testScheduleWriteOpWhenWriteFails() {
assertEquals(mkSet("record1", "record2"),
ctx.coordinator.coordinator().records());
}
+ @Test
+ public void testScheduleWriteOpWhenWriteTimesout() throws
InterruptedException {
+ MockTimer timer = new MockTimer();
+ MockTime time = new MockTime();
+ // The partition writer only accept on write.
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ // Loads the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0, ctx.lastWrittenOffset);
+ assertEquals(0, ctx.lastCommittedOffset);
+ assertEquals(Collections.singletonList(0L),
ctx.snapshotRegistry.epochsList());
+
+ // The partition writer times out when trying to append records.
+ doAnswer(invocation -> {
+ time.sleep(DEFAULT_WRITE_TIMEOUT * 3);
Review Comment:
You actually don't need this because the timeout will be reached if the HWM
is not advanced.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -698,6 +718,14 @@ public void run() {
producerEpoch,
result.records()
);
+
+ timer.add(new TimerTask(timeout) {
+ @Override
+ public void run() {
+ scheduleInternalOperation(name, tp, () ->
complete(new TimeoutException("Writing records to the log timed out")));
Review Comment:
I also wonder whether we could cancel the timer or skip scheduling the
internal op once the event is completed. I can think of two ways: 1) We keep
a reference to TimerTask and call `cancel` on it; 2) We check the `future`
before calling `scheduleInternalOperation`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]