Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]
dajac merged PR #15534:
URL: https://github.com/apache/kafka/pull/15534

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1537192799

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
@@ -504,12 +518,6 @@ public void testScheduleLoadingWithFailure() {
         // Verify that onUnloaded is called.
         verify(coordinator, times(1)).onUnloaded();
-
-        // Verify that the listener is deregistered.

Review Comment:
yep.
jolshan commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1536002502

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
@@ -504,12 +518,6 @@ public void testScheduleLoadingWithFailure() {
         // Verify that onUnloaded is called.
         verify(coordinator, times(1)).onUnloaded();
-
-        // Verify that the listener is deregistered.

Review Comment:
do we no longer unload because the listener is null?
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1535988740

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -495,6 +502,7 @@ private void transitionTo(
             case ACTIVE:
                 state = CoordinatorState.ACTIVE;
+                highWatermarklistener = new HighWatermarkListener();

Review Comment:
That’s right. We always start with a clean state.
jolshan commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1535986083

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -495,6 +502,7 @@ private void transitionTo(
             case ACTIVE:
                 state = CoordinatorState.ACTIVE;
+                highWatermarklistener = new HighWatermarkListener();

Review Comment:
did we change this because we want to not accidentally retain state from the previous load?
jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1534979427

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
@@ -2591,6 +2599,74 @@ public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
         assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
     }
+    @Test
+    public void testHighWatermarkUpdate() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ManualEventProcessor processor = new ManualEventProcessor();
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Write #1.
+        CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+        );
+        processor.poll();
+
+        // Write #2.
+        CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+        );
+        processor.poll();
+
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value("record1"),
+            InMemoryPartitionWriter.LogEntry.value("record2")
+        ), writer.entries(TP));
+
+        // There is no pending high watermark.
+        assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Commit the first record.
+        writer.commit(TP, 1);
+
+        // We should have one pending event and the pending high watermark should be set.
+        assertEquals(1, processor.size());
+        assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Commit the second record.
+        writer.commit(TP, 2);
+
+        // We should still have one pending event and the pending high watermark should be updated.
+        assertEquals(1, processor.size());
+        assertEquals(2, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Poll once to process the high watermark update and complete the writes.
+        processor.poll();
+
+        assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

Review Comment:
can we add
```
assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
```
below to confirm the last committed offset is updated accordingly?
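In the test above, the single poll that processes the high watermark event is also what completes `write1` and `write2` (the runtime calls `context.deferredEventQueue.completeUpTo(offset)` when it applies the new value). A minimal, hypothetical sketch of that deferred completion, assuming each write waits until the high watermark reaches the log end offset after its own records (record1 at offset 0 waits for HWM 1, record2 at offset 1 waits for HWM 2). `DeferredCompletionSketch`, `defer` and `size` are illustrative names; this is not Kafka's `DeferredEventQueue` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of deferred write completion; not Kafka's DeferredEventQueue.
class DeferredCompletionSketch {
    // Writes keyed by the offset that must be committed before they can complete.
    private final TreeMap<Long, List<Runnable>> waiting = new TreeMap<>();

    // Defers `response` until the high watermark reaches `offset`.
    CompletableFuture<String> defer(long offset, String response) {
        CompletableFuture<String> future = new CompletableFuture<>();
        waiting.computeIfAbsent(offset, k -> new ArrayList<>())
               .add(() -> future.complete(response));
        return future;
    }

    // Completes every write waiting for an offset <= highWatermark.
    void completeUpTo(long highWatermark) {
        NavigableMap<Long, List<Runnable>> ready = waiting.headMap(highWatermark, true);
        for (List<Runnable> writes : ready.values()) {
            writes.forEach(Runnable::run);
        }
        ready.clear(); // clearing the view also removes these entries from `waiting`
    }

    // Number of distinct offsets that still have writes waiting on them.
    int size() {
        return waiting.size();
    }

    public static void main(String[] args) {
        DeferredCompletionSketch queue = new DeferredCompletionSketch();
        // Assumed mapping: write#1 waits for HWM 1, write#2 waits for HWM 2.
        CompletableFuture<String> write1 = queue.defer(1, "response1");
        CompletableFuture<String> write2 = queue.defer(2, "response2");

        queue.completeUpTo(1);
        System.out.println(write1.isDone() + " " + write2.isDone()); // true false

        queue.completeUpTo(2);
        System.out.println(write2.join() + " " + queue.size()); // response2 0
    }
}
```

If the two commits are coalesced into a single update to 2, as in the test, one `completeUpTo(2)` call releases both writes at once.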
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1533486994

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
Good point. I added a new unit test to better cover this.
jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532907721

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
Do we have a test to confirm this behavior?
jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532903183

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
> Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.

Thanks for the correction. You're right, I misunderstood the process. Makes sense!
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532741513

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.
+            processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {
+                long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
+
+                CoordinatorContext context = coordinators.get(tp);

Review Comment:
In order to have better logging.
dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532739687

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
> The first HWM advancement to h1 will set lastHighWatermark to NO_OFFSET and enqueueFirst() HWM update event.

Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.

> Before the first event runs, let's say the HWM advances to h2. this will see that lastHighWatermark is NO_OFFSET and will skip enqueueFirst().

It will update lastHighWatermark to h2 and, as the previous value is not NO_OFFSET, it does not push the event this time.

> I wonder if we can:
> * keep track of highest HWM updated
> * only enqueueFirst if the offset to update is greater than highest HWM recorded

Isn't it more or less what my change does? It does not enforce that the HWM is greater than the previous one though but this should not happen.
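dajac's walk-through above (the update to h1 moves the slot from `NO_OFFSET` and schedules an event; the update to h2 only overwrites the slot) can be traced with a minimal single-threaded sketch. It assumes a plain `ArrayDeque` stands in for the event processor, with `addFirst` playing the role of `enqueueFirst`; the class, field and method names are hypothetical, and only the `getAndSet`/`NO_OFFSET` handshake follows the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-partition sketch of coalesced high watermark updates.
class CoalescingHighWatermarkSketch {
    static final long NO_OFFSET = -1L;

    // Stand-in for the event queue; not safe for concurrent use.
    final Deque<Runnable> events = new ArrayDeque<>();
    // Latest high watermark not yet applied, or NO_OFFSET when no update event is queued.
    final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    // Value the "coordinator" has applied so far.
    long appliedHighWatermark = NO_OFFSET;

    // Called whenever the high watermark advances.
    void onHighWatermarkUpdated(long offset) {
        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
            // No update event is pending: put one ahead of already-queued writes.
            events.addFirst(() -> {
                // Read the latest value when the event runs and reset the slot.
                long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
                appliedHighWatermark = newHighWatermark;
            });
        }
        // Otherwise an event is already queued and will pick up `offset`.
    }

    // Runs the event at the head of the queue, if any.
    void pollOne() {
        Runnable event = events.pollFirst();
        if (event != null) {
            event.run();
        }
    }

    public static void main(String[] args) {
        CoalescingHighWatermarkSketch runtime = new CoalescingHighWatermarkSketch();
        runtime.events.addLast(() -> System.out.println("queued write"));

        runtime.onHighWatermarkUpdated(1); // NO_OFFSET -> 1: pushes an event to the front
        runtime.onHighWatermarkUpdated(2); // 1 -> 2: no new event
        System.out.println(runtime.events.size() + " " + runtime.lastHighWatermark.get()); // 2 2

        runtime.pollOne(); // the update event runs before the queued write
        System.out.println(runtime.appliedHighWatermark + " " + runtime.lastHighWatermark.get()); // 2 -1
    }
}
```

Because the event reads the slot with `getAndSet(NO_OFFSET)` when it runs, rather than capturing `offset` when it is created, a later update either lands before the reset (and is picked up by the queued event) or after it (and sees `NO_OFFSET`, so it schedules a fresh event). In this sketch no advancement is dropped, which is the point dajac makes in reply to the h1/h2 concern.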
jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1529191724

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.
+            processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {

Review Comment:
this should be just `enqueueFirst(...)`, not `processor.enqueueFirst(...)` right?
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);
-            if (context != null) {
-                context.lock.lock();
-                try {
-                    if (context.state == CoordinatorState.ACTIVE) {
-                        // The updated high watermark can be applied to the coordinator only if the coordinator
-                        // exists and is in the active state.
-                        log.debug("Updating high watermark of {} to {}.", tp, offset);
-                        context.coordinator.updateLastCommittedOffset(offset);
-                        context.deferredEventQueue.completeUpTo(offset);
-                        coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-                    } else {
-                        log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-                            tp, offset);
+        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+            // An event to apply the new high watermark is pushed to the front of the
+            // queue only if the previous value was -1L. If it was not, it means that
+            // there is already an event waiting to process the last value.

Review Comment:
Let's say there are two HWM advancements, to offsets `h1` and `h2` respectively (`h1 < h2`).

The first HWM advancement to `h1` will set lastHighWatermark to `NO_OFFSET` and enqueueFirst() an HWM update event.

Before the first event runs, let's say the HWM advances to `h2`. This will see that lastHighWatermark is `NO_OFFSET` and will skip enqueueFirst().

Doesn't this mean that all write events waiting for a committed offset `h1 < committed_offset <= h2` cannot complete until the HWM advances again?

I wonder if we can:
* keep track of highest HWM updated
* only enqueueFirst if the offset to update is greater than highest HWM recorded

Would this work?
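The suggestion above could be approximated, as a hypothetical variant rather than the merged code, by making the pending slot monotonic: `AtomicLong.getAndUpdate` with `Math.max` keeps the larger of the pending value and the new offset, while the `NO_OFFSET` check still decides whether an event is scheduled. A counter stands in for `enqueueFirst(...)`; `MonotonicPendingHighWatermark`, `eventsScheduled` and `drain` are illustrative names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical variant: the pending high watermark only ever moves forward.
class MonotonicPendingHighWatermark {
    static final long NO_OFFSET = -1L;

    final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    // Counts how many update events would have been enqueued.
    int eventsScheduled = 0;

    void onHighWatermarkUpdated(long offset) {
        // Keep the larger value. NO_OFFSET (-1) is below every valid offset,
        // so the first update after a reset always replaces it.
        long previous = lastHighWatermark.getAndUpdate(prev -> Math.max(prev, offset));
        if (previous == NO_OFFSET) {
            eventsScheduled++; // where the runtime would enqueue the update event
        }
    }

    // What the update event would do when it runs: take the value and reset the slot.
    long drain() {
        return lastHighWatermark.getAndSet(NO_OFFSET);
    }

    public static void main(String[] args) {
        MonotonicPendingHighWatermark listener = new MonotonicPendingHighWatermark();
        listener.onHighWatermarkUpdated(5);
        listener.onHighWatermarkUpdated(3); // out of order: does not lower the pending value
        listener.onHighWatermarkUpdated(7);
        System.out.println(listener.eventsScheduled + " " + listener.drain()); // 1 7
    }
}
```

This only guards values pending at the same time: once `drain()` resets the slot, a lower offset would be accepted again, so rejecting values below one already applied would need an extra check when the event runs. As dajac notes in his reply, the merged change does not enforce this because the high watermark is not expected to move backwards.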
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
         long offset
     ) {
         log.debug("High watermark of {} incremented to {}.", tp, offset);
-        scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-            CoordinatorContext context = coordinators.get(tp);