Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-06 Thread via GitHub


dajac merged PR #14849:
URL: https://github.com/apache/kafka/pull/14849


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-05 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1416324140


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -906,8 +831,8 @@ public void run() {
 withActiveContextOrThrow(tp, context -> {
 // Execute the read operation.
 response = op.generateResponse(
-context.coordinator,
-context.lastCommittedOffset
+context.stateMachine.coordinator(),

Review Comment:
   The lock you're referring to is the SnapshottableCoordinator right? i can't 
think of a way to resolve this without making the code messier. This seems to 
require a big change in our code structure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1415597424


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1659,4 +1685,117 @@ public void testPartitionLoadSensor() {
 
 verify(runtimeMetrics, 
times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
 }
+
+@Test
+public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(Time.SYSTEM)
+.withTimer(timer)
+.withLoader(new MockCoordinatorLoader(
+new CoordinatorLoader.LoadSummary(
+1000,
+2000,
+30,
+3000),
+Arrays.asList(5L, 15L, 27L),
+Arrays.asList(5L, 15L)))
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+
+// Getting the coordinator context fails because the coordinator
+// does not exist until scheduleLoadOperation is called.
+assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the coordinator context succeeds now.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+
+// When the loading completes, the coordinator transitions to active.
+assertEquals(ACTIVE, ctx.state);
+
+assertEquals(27L, ctx.coordinator.lastWrittenOffset());
+assertEquals(15L, ctx.coordinator.lastCommittedOffset());
+assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
+assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(5L));
+assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(15L));
+assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(27L));
+}
+
+@Test
+public void 
testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(Time.SYSTEM)
+.withTimer(timer)
+.withLoader(new MockCoordinatorLoader(
+new CoordinatorLoader.LoadSummary(
+1000,
+2000,
+30,
+3000),
+Collections.emptyList(),
+Collections.emptyList()))
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.w

Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412701656


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1659,4 +1680,117 @@ public void testPartitionLoadSensor() {
 
 verify(runtimeMetrics, 
times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
 }
+
+@Test
+public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(Time.SYSTEM)
+.withTimer(timer)
+.withLoader(new MockCoordinatorLoader(
+new CoordinatorLoader.LoadSummary(
+1000,
+2000,
+30,
+3000),
+Arrays.asList(5L, 15L, 27L),
+Arrays.asList(5L, 15L)))
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+
+// Getting the coordinator context fails because the coordinator
+// does not exist until scheduleLoadOperation is called.
+assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the coordinator context succeeds now.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+
+// When the loading completes, the coordinator transitions to active.
+assertEquals(ACTIVE, ctx.state);
+
+assertEquals(27L, ctx.stateMachine.lastWrittenOffset());
+assertEquals(15L, ctx.stateMachine.lastCommittedOffset());
+assertFalse(ctx.stateMachine.snapshotRegistry().hasSnapshot(0L));

Review Comment:
   i'm not sure what you mean. this is to confirm that previous snapshots were 
deleted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412700208


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -322,7 +335,7 @@ public void testScheduleLoading() {
 when(builder.build()).thenReturn(coordinator);
 when(supplier.get()).thenReturn(builder);
 CompletableFuture future = new 
CompletableFuture<>();
-when(loader.load(TP, coordinator)).thenReturn(future);
+when(loader.load(eq(TP), any())).thenReturn(future);

Review Comment:
   i'm not sure how to go about this. The reason I had it this way was because 
until we call `runtime.scheduleLoadOperation()`, we don't have a context for 
the topic partition. but when we schedule a load operation we run 
`loader.load()`. If there's a way, I'll make the changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412627530


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine, U> implements 
CoordinatorPlayback {
+/**
+ * The logger.
+ */
+private final Logger log;
+/**
+ * The actual state machine.
+ */
+private S coordinator;
+
+/**
+ * The snapshot registry backing the coordinator.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The topic partition.
+ */
+private final TopicPartition tp;
+
+/**
+ * The last offset written to the partition.
+ */
+private long lastWrittenOffset;
+
+/**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+private long lastCommittedOffset;
+
+ContextStateMachine(
+LogContext logContext,
+SnapshotRegistry snapshotRegistry,
+S coordinator,
+TopicPartition tp
+) {
+this.log = logContext.logger(ContextStateMachine.class);
+this.coordinator = coordinator;
+this.snapshotRegistry = snapshotRegistry;
+this.tp = tp;
+this.lastWrittenOffset = 0;
+this.lastCommittedOffset = 0;
+snapshotRegistry.getOrCreateSnapshot(0);
+snapshotRegistry.deleteSnapshotsUpTo(0);
+}
+
+/**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+synchronized void revertLastWrittenOffset(
+long offset
+) {
+if (offset > lastWrittenOffset) {
+throw new IllegalStateException("New offset " + offset + " of " + 
tp +
+" must be smaller than " + lastWrittenOffset + ".");
+}
+
+log.debug("Revert last written offset of {} to {}.", tp, offset);
+lastWrittenOffset = offset;
+snapshotRegistry.revertToSnapshot(offset);
+}
+
+@Override
+public synchronized void replay(
+long producerId,
+short producerEpoch,
+U record
+) {
+coordinator.replay(producerId, producerEpoch, record);
+}
+
+/**
+ * Updates the last written offset. This also create a new snapshot
+ * in the snapshot registry.
+ *
+ * @param offset The new last written offset.
+ */
+@Override
+public synchronized void updateLastWrittenOffset(Long offset) {
+if (offset <= lastWrittenOffset) {
+throw new IllegalStateException("New last written offset " + 
offset + " of " + tp +
+" must be greater than " + lastWrittenOffset + ".");
+}
+
+log.debug("Update last written offset of {} to {}.", tp, offset);
+lastWrittenOffset = offset;
+snapshotRegistry.getOrCreateSnapshot(offset);
+}
+
+/**
+ * Updates the last committed offset. This completes all the deferred
+ * events waiting on this offset. This also cleanups all the snapshots
+ * prior to this offset.
+ *
+ * @param offset The new last committed offset.
+ */
+@Override
+public synchronized void updateLastCommittedOffset(Long offset) {
+if (offset < lastCommittedOffset) {
+throw new IllegalStateException("New committed offset " + offset + 
" of " + tp +
+" must be greater than or equal to " + lastCommittedOffset + 
".");
+}
+
+lastCommittedOffset = offset;
+snapshotRegistry

Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622438


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T](
   }
 
   currentOffset = batch.nextOffset
+  val currentHighWatermark = log.highWatermark
+  if (currentOffset >= currentHighWatermark) {
+coordinator.updateLastWrittenOffset(currentOffset)
+  }
+
+  if (currentHighWatermark > previousHighWatermark) {
+coordinator.updateLastCommittedOffset(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }

Review Comment:
   added a comment above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622265


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine, U> implements 
CoordinatorPlayback {

Review Comment:
   i didn't like it either :) thanks for the suggestion, sounds much nicer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-01 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412044990


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T](
   }
 
   currentOffset = batch.nextOffset
+  val currentHighWatermark = log.highWatermark
+  if (currentOffset >= currentHighWatermark) {
+coordinator.updateLastWrittenOffset(currentOffset)
+  }
+
+  if (currentHighWatermark > previousHighWatermark) {
+coordinator.updateLastCommittedOffset(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }

Review Comment:
   I think that it works. However, it is worth pointing out that the last 
committed offset could be higher than the last written offset in the state 
machine until records past it are read.



##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest {
 }
   }
 
+  @Test
+  def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time = Time.SYSTEM,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult1)
+
+  val readResult2 = logReadResult(startOffset = 2, records = Seq(
+new SimpleRecord("k3".getBytes, "v3".getBytes),
+new SimpleRecord("k4".getBytes, "v4".getBytes),
+new SimpleRecord("k5".getBytes, "v5".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 2L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult2)
+
+  val readResult3 = logReadResult(startOffset = 5, records = Seq(
+new SimpleRecord("k6".getBytes, "v6".getBytes),
+new SimpleRecord("k7".getBytes, "v7".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 5L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult3)
+
+  assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+  verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+  verify(coordinator, times(0)).updateLastWrittenOffset(0)
+  verify(coordinator, times(1)).updateLastWrittenOffset(2)
+  verify(coordinator, times(1)).updateLastWrittenOffset(5)
+  verify(coordinator, times(1)).updateLastWrittenOffset(7)
+  verify(coordinator, times(1)).updateLastCommittedOffset(0)
+  verify(coordinator, times(1)).updateLastCommittedOffset(2)
+  verify(coordinator, times(0)).updateLastCommittedOffset(5)
+}
+  }
+
+  @Test
+  def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): 
Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time = Time.SYSTEM,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBuf

Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-30 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1411063985


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java:
##
@@ -47,4 +47,15 @@ default void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {}
  * any post unloading operations.
  */
 default void onUnloaded() {}
+
+/**
+ * Replay a record to update the state machine.
+ *
+ * @param record The record to replay.
+ */
+default void replay(
+long producerId,
+short producerEpoch,
+U record
+) throws RuntimeException {}

Review Comment:
   ah, i misunderstood. thanks for pointing it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-30 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1410820590


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -370,6 +371,155 @@ public int size() {
  * CoordinatorContext holds all the metadata around a coordinator state 
machine.
  */
 class CoordinatorContext {
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state 
machine. The methods
+ * may be invoked by multiple threads, so they are synchronized.
+ */
+class ContextStateMachine implements CoordinatorPlayback {

Review Comment:
   I think that we are pushing the nested classes a bit too far here and it 
makes it hard to reason about the concurrency for this class because we access 
attributes outside of it.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -370,6 +371,155 @@ public int size() {
  * CoordinatorContext holds all the metadata around a coordinator state 
machine.
  */
 class CoordinatorContext {
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state 
machine. The methods
+ * may be invoked by multiple threads, so they are synchronized.
+ */
+class ContextStateMachine implements CoordinatorPlayback {
+
+/**
+ * The actual state machine.
+ */
+private S coordinator;
+
+/**
+ * The snapshot registry backing the coordinator.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The last offset written to the partition.
+ */
+private long lastWrittenOffset;
+
+/**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+private long lastCommittedOffset;
+
+ContextStateMachine(
+SnapshotRegistry snapshotRegistry,
+S coordinator, 
+long lastWrittenOffset, 
+long lastCommittedOffset
+) {
+this.coordinator = coordinator;
+this.snapshotRegistry = snapshotRegistry;
+this.lastWrittenOffset = lastWrittenOffset;
+this.lastCommittedOffset = lastCommittedOffset;
+snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+snapshotRegistry.deleteSnapshotsUpTo(lastWrittenOffset);
+}
+
+/**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the 
offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+private synchronized void revertLastWrittenOffset(
+long offset
+) {
+if (offset > lastWrittenOffset) {
+throw new IllegalStateException("New offset " + offset + " 
of " + tp +
+" must be smaller than " + lastWrittenOffset + ".");
+}
+
+log.debug("Revert last written offset of {} to {}.", tp, 
offset);
+lastWrittenOffset = offset;
+snapshotRegistry.revertToSnapshot(offset);
+}
+
+@Override
+public void replay(
+long producerId,
+short producerEpoch,
+U record
+) {
+coordinator.replay(producerId, producerEpoch, record);
+}
+
+/**
+ * Updates the last written offset. This also create a new snapshot
+ * in the snapshot registry.
+ *
+ * @param offset The new last written offset.
+ */
+@Override
+public synchronized void updateLastWrittenOffset(Long offset) {
+if (offset <= lastWrittenOffset) {
+throw new IllegalStateException("New last written offset " 
+ offset + " of " + tp +
+" must be greater than " + lastWrittenOffset + ".");
+}
+
+log.debug("Update last written offset of {} to {}.", tp, 
offset);
+lastWrittenOffset = offset;
+snapshotRegistry.getOrCreateSnapshot(offset);
+}
+
+/**
+ * Updates the last committed offset. This completes all the 
deferred
+ * events waiting on this offset. This also cleanups all the 
snapshots
+ * prior to this offset.
+ *
+ * @param offset The new last committed offset.
+ */
+@Override
+public void updateLastCommittedOffset(Long offset) {
+

Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1410029648


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }
+
+  if (currentOffset >= currentHighWatermark) {
+onLoadedBatch.accept(currentOffset)
+  }

Review Comment:
   you're right, we return
   ```
   // Required offset.
   partitionResult.lastOffset + 1
   ```
   in CoordinatorPartitionWriter#append



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409851471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java:
##
@@ -47,4 +47,11 @@ default void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {}
  * any post unloading operations.
  */
 default void onUnloaded() {}
+
+/**
+ * Replay a record to update the state machine.
+ *
+ * @param record The record to replay.
+ */
+default void replay(U record) throws RuntimeException {}

Review Comment:
   this doesn't work because CoordinatorShard no longer longer extends 
CoordinatorPlayback so it doesn't have a replay() method anymore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409265390


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1315,7 +1329,10 @@ public void scheduleLoadOperation(
 case FAILED:
 case INITIAL:
 context.transitionTo(CoordinatorState.LOADING);
-loader.load(tp, 
context.coordinator).whenComplete((summary, exception) -> {
+loader.load(
+tp,
+context

Review Comment:
   I was thinking about the actual impact of passing the context to another 
thread. One issue here is that the context is protected by a lock and the 
loader thread does not use it at all. One potential issue for instance is that 
the context could transition to Closed while the coordinator is loaded.
   
   Therefore, I wonder whether we should structure things a bit differently. I 
am thinking about introducing a new class which would hold the coordinator, 
snapshot registry, lastWrittenOffset and lastCommittedOffset. The class would 
also implement the CoordinatorPlayback interface. In the context, we would 
remove all those fields and replace them by a reference to an instance of that 
new class. I think that we would have to also synchronize the methods to ensure 
a memory barrier. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409239039


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java:
##
@@ -47,4 +47,11 @@ default void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {}
  * any post unloading operations.
  */
 default void onUnloaded() {}
+
+/**
+ * Replay a record to update the state machine.
+ *
+ * @param record The record to replay.
+ */
+default void replay(U record) throws RuntimeException {}

Review Comment:
   I think that this should not have a default implementation. It was not the 
case before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409237609


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java:
##
@@ -33,4 +33,18 @@ public interface CoordinatorPlayback {
  * @throws RuntimeException if the record can not be applied.
  */
 void replay(U record) throws RuntimeException;
+
+/**
+ * Invoke operations when a batch has been successfully loaded.
+ *
+ * @param offset the offset of the last record in the batch.
+ */
+void onBatchLoaded(Long offset);

Review Comment:
   Should we align the naming on the method that we have in the coordinator 
context? It is kind of confusing if we use different terms here, I think.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java:
##
@@ -33,4 +33,18 @@ public interface CoordinatorPlayback {
  * @throws RuntimeException if the record can not be applied.
  */
 void replay(U record) throws RuntimeException;
+
+/**
+ * Invoke operations when a batch has been successfully loaded.
+ *
+ * @param offset the offset of the last record in the batch.
+ */
+void onBatchLoaded(Long offset);
+
+/**
+ * Called when the high watermark advances.
+ *
+ * @param highWatermark the new high watermark.
+ */
+void onHighWatermarkUpdated(Long highWatermark);

Review Comment:
   Same question here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409232801


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -528,8 +528,8 @@ private void transitionTo(
 case LOADING:
 state = CoordinatorState.LOADING;
 snapshotRegistry = new SnapshotRegistry(logContext);
-lastWrittenOffset = 0L;
-lastCommittedOffset = 0L;
+updateLastWrittenOffset(0);
+updateLastCommittedOffset(0);

Review Comment:
   I think that this won't always work. For instance, think about the case 
where we go from Failed to Loading. The reset won't work because `0` is not 
larger than the current values. It may be better to just keep what was there 
and to add the first snapshot creating there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409230711


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -458,9 +458,9 @@ private CoordinatorContext(
 private void updateLastWrittenOffset(
 long offset
 ) {
-if (offset <= lastWrittenOffset) {
+if (offset < lastWrittenOffset) {

Review Comment:
   Hum... I am not sure about this because the offset should always increase. 
This is an important invariant of the system and by relaxing this we could 
potentially violate it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409228279


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }
+
+  if (currentOffset >= currentHighWatermark) {
+onLoadedBatch.accept(currentOffset)
+  }

Review Comment:
   Yes, I think so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-29 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409025050


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
* Loads the coordinator by reading all the records from the TopicPartition
* and applying them to the Replayable object.
*
-   * @param tp  The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   * @param onLoadedBatch   Invoked when a batch was successfully 
loaded.
+   * @param onHighWatermarkUpdated  Invoked when the high watermark advanced.
*/
   override def load(
 tp: TopicPartition,
-coordinator: CoordinatorPlayback[T]
+coordinator: CoordinatorPlayback[T],
+onLoadedBatch: Consumer[java.lang.Long],
+onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
   > Is it not a good practice to use consumers like this?
   
   It looks weird because all the three go together so an interface is better 
in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408532337


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -543,8 +543,14 @@ private void transitionTo(
 
 case ACTIVE:
 state = CoordinatorState.ACTIVE;
-snapshotRegistry.getOrCreateSnapshot(0);
 partitionWriter.registerListener(tp, 
highWatermarklistener);
+
+// If the partition did not contain any records, we would 
not have generated a snapshot
+// while loading.
+if (lastWrittenOffset == -1L) {
+updateLastWrittenOffset(0);
+updateLastCommittedOffset(0);
+}

Review Comment:
   i reverted this and instead created a snapshot at epoch 0 when transitioning 
to LOADING state. it doesn't make sense to create a snapshot at epoch 0 after 
loading.
   
   i also changed the exception conditions to be `>` and not `>=` since we do 
expect lastCommittedOffset and lastWrittenOffset to sometimes be updated to the 
same value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408531602


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
* Loads the coordinator by reading all the records from the TopicPartition
* and applying them to the Replayable object.
*
-   * @param tp  The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   * @param onLoadedBatch   Invoked when a batch was successfully 
loaded.
+   * @param onHighWatermarkUpdated  Invoked when the high watermark advanced.
*/
   override def load(
 tp: TopicPartition,
-coordinator: CoordinatorPlayback[T]
+coordinator: CoordinatorPlayback[T],
+onLoadedBatch: Consumer[java.lang.Long],
+onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
   i reflected your suggestion, let me know what you think



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408090316


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -543,8 +543,14 @@ private void transitionTo(
 
 case ACTIVE:
 state = CoordinatorState.ACTIVE;
-snapshotRegistry.getOrCreateSnapshot(0);
 partitionWriter.registerListener(tp, 
highWatermarklistener);
+
+// If the partition did not contain any records, we would 
not have generated a snapshot
+// while loading.
+if (lastWrittenOffset == -1L) {
+updateLastWrittenOffset(0);
+updateLastCommittedOffset(0);
+}

Review Comment:
   for both offsets, are you saying that we won't hit the case where we update 
HWM or last written offset to 0 while loading? SInce we throw an exception in 
both methods
   ```
   if (offset <= lastWrittenOffset) {
   throw new IllegalStateException("New last written offset " + 
offset + " of " + tp +
   " must be larger than " + lastWrittenOffset + ".");
   }
   ```
   Which is why I initialized them as -1L



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408072792


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }

Review Comment:
   yeah, that makes sense to me since updating hwm also deletes snapshots



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408070465


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }
+
+  if (currentOffset >= currentHighWatermark) {
+onLoadedBatch.accept(currentOffset)
+  }

Review Comment:
   currentOffset points to the nextOffset (the last offset in the batch plus 1) 
so I had this after successfully loading for an offset. Should 
lastWrittenOffset point to the next offset?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408056685


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
* Loads the coordinator by reading all the records from the TopicPartition
* and applying them to the Replayable object.
*
-   * @param tp  The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   * @param onLoadedBatch   Invoked when a batch was successfully 
loaded.
+   * @param onHighWatermarkUpdated  Invoked when the high watermark advanced.
*/
   override def load(
 tp: TopicPartition,
-coordinator: CoordinatorPlayback[T]
+coordinator: CoordinatorPlayback[T],
+onLoadedBatch: Consumer[java.lang.Long],
+onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
   i did not like the fact that we need to have a replay() method in both 
CoordinatorPlayback and CoordinatorShard. we cannot move replay() to just 
CoordinatorContext because it needs to access OffsetMetadataManager and 
GroupMetadataManager.
   
   If you think that is fine, then I can make the changes. Is it not a good 
practice to use consumers like this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-11-28 Thread via GitHub


dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1407405303


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }
+
+  if (currentOffset >= currentHighWatermark) {
+onLoadedBatch.accept(currentOffset)
+  }

Review Comment:
   `onLoadedBatch` updates the lastWrittenOffset so shouldn't we call this 
after updating `currentOffset`?



##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
 }
   }
 
+  val currentHighWatermark = log.highWatermark
+  if (currentHighWatermark > previousHighWatermark) {
+onHighWatermarkUpdated.accept(currentHighWatermark)
+previousHighWatermark = currentHighWatermark
+  }

Review Comment:
   I wonder if we should rather do this after updating the last written offset. 
At least, conceptually it makes more sense. What do you think?



##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
* Loads the coordinator by reading all the records from the TopicPartition
* and applying them to the Replayable object.
*
-   * @param tp  The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   * @param onLoadedBatch   Invoked when a batch was successfully 
loaded.
+   * @param onHighWatermarkUpdated  Invoked when the high watermark advanced.
*/
   override def load(
 tp: TopicPartition,
-coordinator: CoordinatorPlayback[T]
+coordinator: CoordinatorPlayback[T],
+onLoadedBatch: Consumer[java.lang.Long],
+onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
   This does not look good. Is there a reason why you did not use an interface 
which merges those three? In my opinion, it would be better to have an 
interface which defines:
   * replay
   * updateLastWrittenOffset
   * updateLastCommittedOffset
   
   We could re-purpose `CoordinatorPlayback` for this and introduce a new 
interface for the loader.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -543,8 +543,14 @@ private void transitionTo(
 
 case ACTIVE:
 state = CoordinatorState.ACTIVE;
-snapshotRegistry.getOrCreateSnapshot(0);
 partitionWriter.registerListener(tp, 
highWatermarklistener);
+
+// If the partition did not contain any records, we would 
not have generated a snapshot
+// while loading.
+if (lastWrittenOffset == -1L) {
+updateLastWrittenOffset(0);
+updateLastCommittedOffset(0);
+}

Review Comment:
   I was wondering whether we could just keep 0 as the default values for both 
(In LOADING state) and create the base snapshot there. Then, we don't have to 
handle a special case here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org