Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-25 Thread via GitHub


dajac merged PR #15534:
URL: https://github.com/apache/kafka/pull/15534


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-25 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1537192799


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -504,12 +518,6 @@ public void testScheduleLoadingWithFailure() {
 
 // Verify that onUnloaded is called.
 verify(coordinator, times(1)).onUnloaded();
-
-// Verify that the listener is deregistered.

Review Comment:
   yep.






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-22 Thread via GitHub


jolshan commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1536002502


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -504,12 +518,6 @@ public void testScheduleLoadingWithFailure() {
 
 // Verify that onUnloaded is called.
 verify(coordinator, times(1)).onUnloaded();
-
-// Verify that the listener is deregistered.

Review Comment:
   do we no longer unload because the listener is null?






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-22 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1535988740


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -495,6 +502,7 @@ private void transitionTo(
 
 case ACTIVE:
 state = CoordinatorState.ACTIVE;
+highWatermarklistener = new HighWatermarkListener();

Review Comment:
   That’s right. We always start with a clean state.
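
A minimal sketch of the "clean state" point, assuming a listener that only holds a pending offset. The `Listener` class, the `State` enum, and the field names are hypothetical stand-ins, not the real `CoordinatorRuntime` types; the only behaviour taken from the diff is that a new listener is created on the transition to `ACTIVE`.

```java
import java.util.concurrent.atomic.AtomicLong;

class FreshListenerSketch {
    static final long NO_OFFSET = -1L;

    // Hypothetical, simplified state machine.
    enum State { INITIAL, LOADING, ACTIVE }

    // Hypothetical stand-in for HighWatermarkListener: it only holds the pending offset.
    static class Listener {
        final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    }

    State state = State.INITIAL;
    Listener listener; // null until the first activation

    void transitionTo(State newState) {
        state = newState;
        if (newState == State.ACTIVE) {
            // A new listener per activation: nothing pending from a previous load survives.
            listener = new Listener();
        }
    }

    public static void main(String[] args) {
        FreshListenerSketch ctx = new FreshListenerSketch();
        ctx.transitionTo(State.ACTIVE);
        ctx.listener.lastHighWatermark.set(5L);   // a pending update from the first load
        ctx.transitionTo(State.LOADING);          // the partition is reloaded
        ctx.transitionTo(State.ACTIVE);
        System.out.println(ctx.listener.lastHighWatermark.get()); // prints -1
    }
}
```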






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-22 Thread via GitHub


jolshan commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1535986083


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -495,6 +502,7 @@ private void transitionTo(
 
 case ACTIVE:
 state = CoordinatorState.ACTIVE;
+highWatermarklistener = new HighWatermarkListener();

Review Comment:
   Did we change this so that we don't accidentally retain state from the previous load?






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-21 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1534979427


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -2591,6 +2599,74 @@ public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
 assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
 }
 
+@Test
+public void testHighWatermarkUpdate() {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+ManualEventProcessor processor = new ManualEventProcessor();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(processor)
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Loads the coordinator. Poll once to execute the load operation and once
+// to complete the load.
+runtime.scheduleLoadOperation(TP, 10);
+processor.poll();
+processor.poll();
+
+// Write #1.
+CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+);
+processor.poll();
+
+// Write #2.
+CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+);
+processor.poll();
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value("record1"),
+InMemoryPartitionWriter.LogEntry.value("record2")
+), writer.entries(TP));
+
+// There is no pending high watermark.
+assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+// Commit the first record.
+writer.commit(TP, 1);
+
+// We should have one pending event and the pending high watermark should be set.
+assertEquals(1, processor.size());
+assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+// Commit the second record.
+writer.commit(TP, 2);
+
+// We should still have one pending event and the pending high watermark should be updated.
+assertEquals(1, processor.size());
+assertEquals(2, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+// Poll once to process the high watermark update and complete the writes.
+processor.poll();
+
+assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

Review Comment:
   Can we add
   ```
   assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
   ```
   below to confirm the last committed offset is updated accordingly?






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-21 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1533486994


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   Good point. I added a new unit test to better cover this.






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532907721


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   Do we have a test to confirm this behavior?






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532903183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   > Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.
   
   Thanks for the correction. You're right, I misunderstood the process. Makes sense!






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532741513


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.
+processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {
+long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
+
+CoordinatorContext context = coordinators.get(tp);

Review Comment:
   In order to have better logging.






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-20 Thread via GitHub


dajac commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1532739687


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   > The first HWM advancement to h1 will set lastHighWatermark to NO_OFFSET and enqueueFirst() HWM update event.
   
   Hum... My understanding is that the code will actually set lastHighWatermark from NO_OFFSET to h1 and push the event in this case.
   
   > Before the first event runs, let's say the HWM advances to h2. this will see that lastHighWatermark is NO_OFFSET and will skip enqueueFirst().
   
   It will update lastHighWatermark to h2 and, as the previous value is not NO_OFFSET, it does not push the event this time.
   
   > I wonder if we can:
   > * keep track of highest HWM updated
   > * only enqueueFirst if the offset to update is greater than highest HWM recorded
   
   Isn't that more or less what my change does? It does not enforce that the HWM is greater than the previous one, but that should not happen.
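
The h1/h2 sequence described above can be sketched in isolation. This is a simplified model under stated assumptions, not the runtime's code: the `Deque<Runnable>` stands in for the event processor, `applied` is a hypothetical record of the offsets handed to the coordinator, and everything runs on one thread. Only the two `getAndSet` calls mirror the diff.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class HighWatermarkCoalescingSketch {
    static final long NO_OFFSET = -1L;

    // Pending high watermark; NO_OFFSET means no update event is queued.
    final AtomicLong lastHighWatermark = new AtomicLong(NO_OFFSET);
    // Hypothetical stand-in for the event processor queue.
    final Deque<Runnable> queue = new ArrayDeque<>();
    // Offsets actually applied, recorded for illustration only.
    final List<Long> applied = new ArrayList<>();

    void onHighWatermarkUpdated(long offset) {
        // Always store the newest offset; enqueue only if nothing was pending.
        if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
            queue.addFirst(() -> {
                // The event reads whatever the latest value is when it runs,
                // and resets the marker so the next update enqueues again.
                long newHighWatermark = lastHighWatermark.getAndSet(NO_OFFSET);
                applied.add(newHighWatermark);
            });
        }
    }

    void poll() {
        Runnable event = queue.pollFirst();
        if (event != null) {
            event.run();
        }
    }

    public static void main(String[] args) {
        HighWatermarkCoalescingSketch s = new HighWatermarkCoalescingSketch();
        s.onHighWatermarkUpdated(1L); // h1: NO_OFFSET -> 1, event pushed
        s.onHighWatermarkUpdated(2L); // h2: 1 -> 2, no second event
        System.out.println(s.queue.size());            // prints 1
        s.poll();
        System.out.println(s.applied);                 // prints [2]
        System.out.println(s.lastHighWatermark.get()); // prints -1
    }
}
```

In this model the single queued event applies h2, so nothing waiting on an offset between h1 and h2 is left behind.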






Re: [PR] KAFKA-16374; High watermark updates should have a higher priority [kafka]

2024-03-18 Thread via GitHub


jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1529191724


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.
+processor.enqueueFirst(new CoordinatorInternalEvent("HighWatermarkUpdate", tp, () -> {

Review Comment:
   This should be just `enqueueFirst(...)`, not `processor.enqueueFirst(...)`, right?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);
-if (context != null) {
-context.lock.lock();
-try {
-if (context.state == CoordinatorState.ACTIVE) {
-// The updated high watermark can be applied to the coordinator only if the coordinator
-// exists and is in the active state.
-log.debug("Updating high watermark of {} to {}.", tp, offset);
-context.coordinator.updateLastCommittedOffset(offset);
-context.deferredEventQueue.completeUpTo(offset);
-coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset);
-} else {
-log.debug("Ignored high watermark updated for {} to {} because the coordinator is not active.",
-tp, offset);
+if (lastHighWatermark.getAndSet(offset) == NO_OFFSET) {
+// An event to apply the new high watermark is pushed to the front of the
+// queue only if the previous value was -1L. If it was not, it means that
+// there is already an event waiting to process the last value.

Review Comment:
   Let's say there are two HWM advancements, to offset `h1` and `h2` respectively (`h1 < h2`).
   
   The first HWM advancement to `h1` will set lastHighWatermark to `NO_OFFSET` and enqueueFirst() HWM update event.
   
   Before the first event runs, let's say the HWM advances to `h2`. this will see that lastHighWatermark is `NO_OFFSET` and will skip enqueueFirst().
   
   Doesn't this mean that all write events waiting for committed offset `h1 < committed_offset <= h2` cannot complete until the HWM advances again?
   
   I wonder if we can:
   * keep track of highest HWM updated
   * only enqueueFirst if the offset to update is greater than highest HWM recorded
   
   Would this work?
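
One possible reading of the two bullets above, as a rough sketch rather than a worked-out proposal. The `Deque<Runnable>` queue, the `applied` list, and the choice to have each event read the highest offset when it runs are all assumptions made for illustration; none of them come from the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class HighestSeenSketch {
    // Highest HWM observed so far; -1 means none yet.
    final AtomicLong highestSeen = new AtomicLong(-1L);
    // Hypothetical stand-in for the event processor queue.
    final Deque<Runnable> queue = new ArrayDeque<>();
    // Offsets applied, recorded for illustration only.
    final List<Long> applied = new ArrayList<>();

    void onHighWatermarkUpdated(long offset) {
        // Atomically raise the recorded maximum and get the previous value.
        long previous = highestSeen.getAndAccumulate(offset, Math::max);
        if (offset > previous) {
            // One event per real advancement; each applies the highest value
            // known at the time it runs (an assumption of this sketch).
            queue.addFirst(() -> applied.add(highestSeen.get()));
        }
    }

    public static void main(String[] args) {
        HighestSeenSketch s = new HighestSeenSketch();
        s.onHighWatermarkUpdated(1L); // h1: enqueued
        s.onHighWatermarkUpdated(2L); // h2: enqueued
        s.onHighWatermarkUpdated(1L); // stale: skipped
        System.out.println(s.queue.size()); // prints 2
        while (!s.queue.isEmpty()) {
            s.queue.pollFirst().run();
        }
        System.out.println(s.applied);      // prints [2, 2]
    }
}
```

Under these assumptions the variant rejects stale offsets but queues one event per advancement, so it does not by itself reduce the number of high watermark events.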



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1191,30 +1212,37 @@ public void onHighWatermarkUpdated(
 long offset
 ) {
 log.debug("High watermark of {} incremented to {}.", tp, offset);
-scheduleInternalOperation("HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")", tp, () -> {
-CoordinatorContext context = coordinators.get(tp);