Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac merged PR #14849: URL: https://github.com/apache/kafka/pull/14849 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1416324140 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -906,8 +831,8 @@ public void run() { withActiveContextOrThrow(tp, context -> { // Execute the read operation. response = op.generateResponse( -context.coordinator, -context.lastCommittedOffset +context.stateMachine.coordinator(), Review Comment: The lock you're referring to is the SnapshottableCoordinator, right? I can't think of a way to resolve this without making the code messier. It seems to require a big change to our code structure.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1415597424 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1659,4 +1685,117 @@ public void testPartitionLoadSensor() { verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); } + +@Test +public void testPartitionLoadGeneratesSnapshotAtHighWatermark() { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(Time.SYSTEM) +.withTimer(timer) +.withLoader(new MockCoordinatorLoader( +new CoordinatorLoader.LoadSummary( +1000, +2000, +30, +3000), +Arrays.asList(5L, 15L, 27L), +Arrays.asList(5L, 15L))) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.withTimer(any())).thenReturn(builder); +when(builder.withCoordinatorMetrics(any())).thenReturn(builder); +when(builder.withTopicPartition(any())).thenReturn(builder); +when(builder.build()).thenReturn(coordinator); +when(supplier.get()).thenReturn(builder); + +// Getting the coordinator context fails because the coordinator +// does not exist until scheduleLoadOperation is called. 
+assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 0); + +// Getting the coordinator context succeeds now. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + +// When the loading completes, the coordinator transitions to active. +assertEquals(ACTIVE, ctx.state); + +assertEquals(27L, ctx.coordinator.lastWrittenOffset()); +assertEquals(15L, ctx.coordinator.lastCommittedOffset()); +assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(0L)); +assertFalse(ctx.coordinator.snapshotRegistry().hasSnapshot(5L)); +assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(15L)); +assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(27L)); +} + +@Test +public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(Time.SYSTEM) +.withTimer(timer) +.withLoader(new MockCoordinatorLoader( +new CoordinatorLoader.LoadSummary( +1000, +2000, +30, +3000), +Collections.emptyList(), +Collections.emptyList())) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.w
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412701656 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1659,4 +1680,117 @@ public void testPartitionLoadSensor() { verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); } + +@Test +public void testPartitionLoadGeneratesSnapshotAtHighWatermark() { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(Time.SYSTEM) +.withTimer(timer) +.withLoader(new MockCoordinatorLoader( +new CoordinatorLoader.LoadSummary( +1000, +2000, +30, +3000), +Arrays.asList(5L, 15L, 27L), +Arrays.asList(5L, 15L))) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.withTimer(any())).thenReturn(builder); +when(builder.withCoordinatorMetrics(any())).thenReturn(builder); +when(builder.withTopicPartition(any())).thenReturn(builder); +when(builder.build()).thenReturn(coordinator); +when(supplier.get()).thenReturn(builder); + +// Getting the coordinator context fails because the coordinator +// does not exist until scheduleLoadOperation is called. 
+assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 0); + +// Getting the coordinator context succeeds now. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + +// When the loading completes, the coordinator transitions to active. +assertEquals(ACTIVE, ctx.state); + +assertEquals(27L, ctx.stateMachine.lastWrittenOffset()); +assertEquals(15L, ctx.stateMachine.lastCommittedOffset()); +assertFalse(ctx.stateMachine.snapshotRegistry().hasSnapshot(0L)); Review Comment: I'm not sure what you mean. This is to confirm that previous snapshots were deleted.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412700208 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -322,7 +335,7 @@ public void testScheduleLoading() { when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); CompletableFuture future = new CompletableFuture<>(); -when(loader.load(TP, coordinator)).thenReturn(future); +when(loader.load(eq(TP), any())).thenReturn(future); Review Comment: I'm not sure how to go about this. I wrote it this way because we don't have a context for the topic partition until we call `runtime.scheduleLoadOperation()`, yet scheduling the load operation is exactly what runs `loader.load()`, so the test cannot pass the exact argument when stubbing. If there's a way, I'll make the changes.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412627530 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java: ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. Generally, only a single thread at a + * time will access this object but multiple threads may access while loading the __consumer_offsets topic partition. + */ +class ContextStateMachine, U> implements CoordinatorPlayback { +/** + * The logger. + */ +private final Logger log; +/** + * The actual state machine. + */ +private S coordinator; + +/** + * The snapshot registry backing the coordinator. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The topic partition. + */ +private final TopicPartition tp; + +/** + * The last offset written to the partition. 
+ */ +private long lastWrittenOffset; + +/** + * The last offset committed. This represents the high + * watermark of the partition. + */ +private long lastCommittedOffset; + +ContextStateMachine( +LogContext logContext, +SnapshotRegistry snapshotRegistry, +S coordinator, +TopicPartition tp +) { +this.log = logContext.logger(ContextStateMachine.class); +this.coordinator = coordinator; +this.snapshotRegistry = snapshotRegistry; +this.tp = tp; +this.lastWrittenOffset = 0; +this.lastCommittedOffset = 0; +snapshotRegistry.getOrCreateSnapshot(0); +snapshotRegistry.deleteSnapshotsUpTo(0); +} + +/** + * Reverts the last written offset. This also reverts the snapshot + * registry to this offset. All the changes applied after the offset + * are lost. + * + * @param offset The offset to revert to. + */ +synchronized void revertLastWrittenOffset( +long offset +) { +if (offset > lastWrittenOffset) { +throw new IllegalStateException("New offset " + offset + " of " + tp + +" must be smaller than " + lastWrittenOffset + "."); +} + +log.debug("Revert last written offset of {} to {}.", tp, offset); +lastWrittenOffset = offset; +snapshotRegistry.revertToSnapshot(offset); +} + +@Override +public synchronized void replay( +long producerId, +short producerEpoch, +U record +) { +coordinator.replay(producerId, producerEpoch, record); +} + +/** + * Updates the last written offset. This also create a new snapshot + * in the snapshot registry. + * + * @param offset The new last written offset. + */ +@Override +public synchronized void updateLastWrittenOffset(Long offset) { +if (offset <= lastWrittenOffset) { +throw new IllegalStateException("New last written offset " + offset + " of " + tp + +" must be greater than " + lastWrittenOffset + "."); +} + +log.debug("Update last written offset of {} to {}.", tp, offset); +lastWrittenOffset = offset; +snapshotRegistry.getOrCreateSnapshot(offset); +} + +/** + * Updates the last committed offset. 
This completes all the deferred + * events waiting on this offset. This also cleanups all the snapshots + * prior to this offset. + * + * @param offset The new last committed offset. + */ +@Override +public synchronized void updateLastCommittedOffset(Long offset) { +if (offset < lastCommittedOffset) { +throw new IllegalStateException("New committed offset " + offset + " of " + tp + +" must be greater than or equal to " + lastCommittedOffset + "."); +} + +lastCommittedOffset = offset; +snapshotRegistry
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622438 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T]( } currentOffset = batch.nextOffset + val currentHighWatermark = log.highWatermark + if (currentOffset >= currentHighWatermark) { +coordinator.updateLastWrittenOffset(currentOffset) + } + + if (currentHighWatermark > previousHighWatermark) { +coordinator.updateLastCommittedOffset(currentHighWatermark) +previousHighWatermark = currentHighWatermark + } Review Comment: added a comment above
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412622265 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java: ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. Generally, only a single thread at a + * time will access this object but multiple threads may access while loading the __consumer_offsets topic partition. + */ +class ContextStateMachine, U> implements CoordinatorPlayback { Review Comment: I didn't like it either :) Thanks for the suggestion, it sounds much nicer.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412044990 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T]( } currentOffset = batch.nextOffset + val currentHighWatermark = log.highWatermark + if (currentOffset >= currentHighWatermark) { +coordinator.updateLastWrittenOffset(currentOffset) + } + + if (currentHighWatermark > previousHighWatermark) { +coordinator.updateLastCommittedOffset(currentHighWatermark) +previousHighWatermark = currentHighWatermark + } Review Comment: I think that it works. However, it is worth pointing out that the last committed offset could be higher than the last written offset in the state machine until records past it are read. ## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala: ## @@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest { } } + @Test + def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = new StringKeyValueDeserializer +val log = mock(classOf[UnifiedLog]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 +)) { loader => + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( +new SimpleRecord("k1".getBytes, "v1".getBytes), +new SimpleRecord("k2".getBytes, "v2".getBytes) + )) + + when(log.read( +startOffset = 0L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult1) + + val 
readResult2 = logReadResult(startOffset = 2, records = Seq( +new SimpleRecord("k3".getBytes, "v3".getBytes), +new SimpleRecord("k4".getBytes, "v4".getBytes), +new SimpleRecord("k5".getBytes, "v5".getBytes) + )) + + when(log.read( +startOffset = 2L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult2) + + val readResult3 = logReadResult(startOffset = 5, records = Seq( +new SimpleRecord("k6".getBytes, "v6".getBytes), +new SimpleRecord("k7".getBytes, "v7".getBytes) + )) + + when(log.read( +startOffset = 5L, +maxLength = 1000, +isolation = FetchIsolation.LOG_END, +minOneMessage = true + )).thenReturn(readResult3) + + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6")) + verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7")) + verify(coordinator, times(0)).updateLastWrittenOffset(0) + verify(coordinator, times(1)).updateLastWrittenOffset(2) + verify(coordinator, times(1)).updateLastWrittenOffset(5) + verify(coordinator, times(1)).updateLastWrittenOffset(7) + verify(coordinator, times(1)).updateLastCommittedOffset(0) + verify(coordinator, times(1)).updateLastCommittedOffset(2) + verify(coordinator, times(0)).updateLastCommittedOffset(5) +} + } + + @Test + def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): Unit = { +val tp = new 
TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val serde = new StringKeyValueDeserializer +val log = mock(classOf[UnifiedLog]) +val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + +TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, + replicaManager = replicaManager, + deserializer = serde, + loadBuf
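To make the loader's per-batch decision and the remark about the committed offset running ahead concrete, here is a minimal Java simulation of the two `if` statements in the quoted `CoordinatorLoaderImpl` diff. It is a sketch, not the loader itself: `RecordingPlayback` and `simulate` are hypothetical names, each batch is reduced to its `nextOffset` plus the high watermark read right after it, and `previousHighWatermark` is assumed to start at -1 (which is what the test's expectation of exactly one `updateLastCommittedOffset(0)` call suggests). The inputs 2, 5, 7 and 0, 0, 2 are chosen to mirror the batches and mocked high watermarks in `testUpdateLastWrittenOffsetOnBatchLoaded`.

```java
import java.util.ArrayList;
import java.util.List;

public class LoaderOffsetSketch {

    /** Hypothetical recorder standing in for CoordinatorPlayback: it only logs the callbacks. */
    public static final class RecordingPlayback {
        public final List<Long> writtenOffsets = new ArrayList<>();
        public final List<Long> committedOffsets = new ArrayList<>();

        void updateLastWrittenOffset(long offset) { writtenOffsets.add(offset); }
        void updateLastCommittedOffset(long offset) { committedOffsets.add(offset); }
    }

    /**
     * Applies the two checks from the quoted diff after each batch.
     * batchNextOffsets[i] is batch.nextOffset of batch i; highWatermarks[i] is the
     * high watermark read right after it. ASSUMPTION: previousHighWatermark starts at -1.
     */
    public static RecordingPlayback simulate(long[] batchNextOffsets, long[] highWatermarks) {
        RecordingPlayback playback = new RecordingPlayback();
        long previousHighWatermark = -1L;
        for (int i = 0; i < batchNextOffsets.length; i++) {
            long currentOffset = batchNextOffsets[i];
            long currentHighWatermark = highWatermarks[i];
            // Only advance the last written offset (which takes a snapshot) at or above the
            // high watermark; presumably a snapshot below it would be dropped once committed.
            if (currentOffset >= currentHighWatermark) {
                playback.updateLastWrittenOffset(currentOffset);
            }
            // Report the high watermark only when it moves forward.
            if (currentHighWatermark > previousHighWatermark) {
                playback.updateLastCommittedOffset(currentHighWatermark);
                previousHighWatermark = currentHighWatermark;
            }
        }
        return playback;
    }

    public static void main(String[] args) {
        RecordingPlayback p = simulate(new long[] {2, 5, 7}, new long[] {0, 0, 2});
        // prints "written=[2, 5, 7] committed=[0, 2]"
        System.out.println("written=" + p.writtenOffsets + " committed=" + p.committedOffsets);
        // The high watermark (5) is ahead of everything read so far (2).
        RecordingPlayback lagging = simulate(new long[] {2}, new long[] {5});
        // prints "written=[] committed=[5]"
        System.out.println("written=" + lagging.writtenOffsets + " committed=" + lagging.committedOffsets);
    }
}
```

The second run is the case pointed out in the review: the committed offset moves to 5 while the last written offset has not been updated at all, and it only catches up once a batch ending at or past 5 is read.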
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1411063985 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java: ## @@ -47,4 +47,15 @@ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {} * any post unloading operations. */ default void onUnloaded() {} + +/** + * Replay a record to update the state machine. + * + * @param record The record to replay. + */ +default void replay( +long producerId, +short producerEpoch, +U record +) throws RuntimeException {} Review Comment: Ah, I misunderstood. Thanks for pointing it out.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1410820590 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -370,6 +371,155 @@ public int size() { * CoordinatorContext holds all the metadata around a coordinator state machine. */ class CoordinatorContext { + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. The methods + * may be invoked by multiple threads, so they are synchronized. + */ +class ContextStateMachine implements CoordinatorPlayback { Review Comment: I think that we are pushing the nested classes a bit too far here and it makes it hard to reason about the concurrency for this class because we access attributes outside of it. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -370,6 +371,155 @@ public int size() { * CoordinatorContext holds all the metadata around a coordinator state machine. */ class CoordinatorContext { + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. The methods + * may be invoked by multiple threads, so they are synchronized. + */ +class ContextStateMachine implements CoordinatorPlayback { + +/** + * The actual state machine. + */ +private S coordinator; + +/** + * The snapshot registry backing the coordinator. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The last offset written to the partition. + */ +private long lastWrittenOffset; + +/** + * The last offset committed. This represents the high + * watermark of the partition. 
+ */ +private long lastCommittedOffset; + +ContextStateMachine( +SnapshotRegistry snapshotRegistry, +S coordinator, +long lastWrittenOffset, +long lastCommittedOffset +) { +this.coordinator = coordinator; +this.snapshotRegistry = snapshotRegistry; +this.lastWrittenOffset = lastWrittenOffset; +this.lastCommittedOffset = lastCommittedOffset; +snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); +snapshotRegistry.deleteSnapshotsUpTo(lastWrittenOffset); +} + +/** + * Reverts the last written offset. This also reverts the snapshot + * registry to this offset. All the changes applied after the offset + * are lost. + * + * @param offset The offset to revert to. + */ +private synchronized void revertLastWrittenOffset( +long offset +) { +if (offset > lastWrittenOffset) { +throw new IllegalStateException("New offset " + offset + " of " + tp + +" must be smaller than " + lastWrittenOffset + "."); +} + +log.debug("Revert last written offset of {} to {}.", tp, offset); +lastWrittenOffset = offset; +snapshotRegistry.revertToSnapshot(offset); +} + +@Override +public void replay( +long producerId, +short producerEpoch, +U record +) { +coordinator.replay(producerId, producerEpoch, record); +} + +/** + * Updates the last written offset. This also create a new snapshot + * in the snapshot registry. + * + * @param offset The new last written offset. + */ +@Override +public synchronized void updateLastWrittenOffset(Long offset) { +if (offset <= lastWrittenOffset) { +throw new IllegalStateException("New last written offset " + offset + " of " + tp + +" must be greater than " + lastWrittenOffset + "."); +} + +log.debug("Update last written offset of {} to {}.", tp, offset); +lastWrittenOffset = offset; +snapshotRegistry.getOrCreateSnapshot(offset); +} + +/** + * Updates the last committed offset. This completes all the deferred + * events waiting on this offset. This also cleanups all the snapshots + * prior to this offset. + * + * @param offset The new last committed offset. 
+ */ +@Override +public void updateLastCommittedOffset(Long offset) { +
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1410029648 ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T]( } } + val currentHighWatermark = log.highWatermark + if (currentHighWatermark > previousHighWatermark) { +onHighWatermarkUpdated.accept(currentHighWatermark) +previousHighWatermark = currentHighWatermark + } + + if (currentOffset >= currentHighWatermark) { +onLoadedBatch.accept(currentOffset) + } Review Comment: You're right, `CoordinatorPartitionWriter#append` returns `partitionResult.lastOffset + 1` (the "required offset").
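The off-by-one point can be checked with plain arithmetic. A Kafka record batch's `nextOffset` is its `lastOffset + 1`, which is the same value the write path returns, so the loader (`currentOffset = batch.nextOffset`) and `CoordinatorPartitionWriter#append` agree on what "last written offset" means: one past the last record. The sketch below assumes contiguous offsets inside each batch (no compaction gaps); `NextOffsetSketch` and its methods are hypothetical helpers, and the batch layout [0, 1], [2, 3, 4], [5, 6] is taken from the loader test quoted in this thread.

```java
public class NextOffsetSketch {

    /** Offset of the last record in a batch, ASSUMING offsets are contiguous within the batch. */
    public static long lastOffset(long baseOffset, int recordCount) {
        return baseOffset + recordCount - 1;
    }

    /** One past the last record: what append reports and what batch.nextOffset holds. */
    public static long requiredOffsetAfterAppend(long baseOffset, int recordCount) {
        return lastOffset(baseOffset, recordCount) + 1;
    }

    public static void main(String[] args) {
        // Batches from the loader test: [0, 1], [2, 3, 4], [5, 6].
        System.out.println(requiredOffsetAfterAppend(0, 2)); // 2
        System.out.println(requiredOffsetAfterAppend(2, 3)); // 5
        System.out.println(requiredOffsetAfterAppend(5, 2)); // 7
    }
}
```

These are exactly the values the test expects `updateLastWrittenOffset` to receive, which is why using `batch.nextOffset` on the load path is consistent with the write path.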
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1409851471 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java: ## @@ -47,4 +47,11 @@ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {} * any post unloading operations. */ default void onUnloaded() {} + +/** + * Replay a record to update the state machine. + * + * @param record The record to replay. + */ +default void replay(U record) throws RuntimeException {} Review Comment: This doesn't work because CoordinatorShard no longer extends CoordinatorPlayback, so it doesn't have a replay() method anymore.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1409265390 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1315,7 +1329,10 @@ public void scheduleLoadOperation( case FAILED: case INITIAL: context.transitionTo(CoordinatorState.LOADING); -loader.load(tp, context.coordinator).whenComplete((summary, exception) -> { +loader.load( +tp, +context Review Comment: I was thinking about the actual impact of passing the context to another thread. One issue is that the context is protected by a lock, but the loader thread does not use that lock at all. For instance, the context could transition to Closed while the coordinator is being loaded. Therefore, I wonder whether we should structure things a bit differently. I am thinking about introducing a new class that would hold the coordinator, the snapshot registry, lastWrittenOffset, and lastCommittedOffset. The class would also implement the CoordinatorPlayback interface. In the context, we would remove all those fields and replace them with a reference to an instance of that new class. I think that we would also have to synchronize the methods to ensure a memory barrier. What do you think?
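One way to picture the proposed class is the following Java sketch: a single object owns the state machine, its snapshot epochs, and both offsets, implements a playback interface, and declares every method `synchronized` so that the loader thread and later readers go through the same monitor. Everything here is hypothetical: `Playback` only approximates `CoordinatorPlayback`, an append-only list stands in for the coordinator shard, and a `TreeSet` of epochs stands in for Kafka's `SnapshotRegistry` (it tracks which snapshots exist, not their contents). It also assumes, as the assertions in `testPartitionLoadGeneratesSnapshotAtHighWatermark` imply, that committing an offset discards snapshots strictly below it; the offsets 5, 15, 27 and committed offsets 5, 15 are chosen to match that test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class PlaybackWrapperSketch {

    /** Hypothetical approximation of the playback interface discussed in this thread. */
    public interface Playback<U> {
        void replay(U record);
        void updateLastWrittenOffset(long offset);
        void updateLastCommittedOffset(long offset);
    }

    /** Hypothetical wrapper: every method synchronizes on the same monitor. */
    public static final class SynchronizedStateMachine<U> implements Playback<U> {
        private final List<U> state = new ArrayList<>();         // toy state machine: replayed records
        private final TreeSet<Long> snapshots = new TreeSet<>(); // toy stand-in for SnapshotRegistry
        private long lastWrittenOffset = 0L;
        private long lastCommittedOffset = 0L;

        public SynchronizedStateMachine() {
            snapshots.add(0L); // first snapshot at offset 0
        }

        @Override
        public synchronized void replay(U record) {
            state.add(record);
        }

        @Override
        public synchronized void updateLastWrittenOffset(long offset) {
            if (offset <= lastWrittenOffset) {
                throw new IllegalStateException("New last written offset " + offset
                    + " must be greater than " + lastWrittenOffset + ".");
            }
            lastWrittenOffset = offset;
            snapshots.add(offset); // one snapshot per written offset
        }

        @Override
        public synchronized void updateLastCommittedOffset(long offset) {
            if (offset < lastCommittedOffset) {
                throw new IllegalStateException("New committed offset " + offset
                    + " must be greater than or equal to " + lastCommittedOffset + ".");
            }
            lastCommittedOffset = offset;
            snapshots.headSet(offset, false).clear(); // drop snapshots strictly below the committed offset
        }

        public synchronized long lastWrittenOffset() { return lastWrittenOffset; }
        public synchronized long lastCommittedOffset() { return lastCommittedOffset; }
        public synchronized boolean hasSnapshot(long offset) { return snapshots.contains(offset); }
    }

    /** Runs a "load" on a separate thread using the offsets from the runtime test. */
    public static SynchronizedStateMachine<String> loadLikeTheTest() {
        SynchronizedStateMachine<String> machine = new SynchronizedStateMachine<>();
        Thread loader = new Thread(() -> {
            machine.replay("r1");
            machine.updateLastWrittenOffset(5);
            machine.updateLastCommittedOffset(5);
            machine.replay("r2");
            machine.updateLastWrittenOffset(15);
            machine.updateLastCommittedOffset(15);
            machine.replay("r3");
            machine.updateLastWrittenOffset(27);
        }, "loader");
        loader.start();
        try {
            loader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the loader", e);
        }
        return machine;
    }

    public static void main(String[] args) {
        SynchronizedStateMachine<String> m = loadLikeTheTest();
        // prints "27 15 false false true true"
        System.out.println(m.lastWrittenOffset() + " " + m.lastCommittedOffset() + " "
            + m.hasSnapshot(0) + " " + m.hasSnapshot(5) + " " + m.hasSnapshot(15) + " " + m.hasSnapshot(27));
    }
}
```

In this demo `join()` alone already establishes happens-before; the `synchronized` methods are what matter when another thread reads the offsets while the loader is still running.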
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1409239039 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java: ## @@ -47,4 +47,11 @@ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {} * any post unloading operations. */ default void onUnloaded() {} + +/** + * Replay a record to update the state machine. + * + * @param record The record to replay. + */ +default void replay(U record) throws RuntimeException {} Review Comment: I think that this should not have a default implementation. It was not the case before.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1409237609 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java: ## @@ -33,4 +33,18 @@ public interface CoordinatorPlayback { * @throws RuntimeException if the record can not be applied. */ void replay(U record) throws RuntimeException; + +/** + * Invoke operations when a batch has been successfully loaded. + * + * @param offset the offset of the last record in the batch. + */ +void onBatchLoaded(Long offset); Review Comment: Should we align the naming with the methods that we have in the coordinator context? Using different terms here is a bit confusing, I think. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java: ## @@ -33,4 +33,18 @@ public interface CoordinatorPlayback { * @throws RuntimeException if the record can not be applied. */ void replay(U record) throws RuntimeException; + +/** + * Invoke operations when a batch has been successfully loaded. + * + * @param offset the offset of the last record in the batch. + */ +void onBatchLoaded(Long offset); + +/** + * Called when the high watermark advances. + * + * @param highWatermark the new high watermark. + */ +void onHighWatermarkUpdated(Long highWatermark); Review Comment: Same question here.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409232801

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -528,8 +528,8 @@ private void transitionTo(
             case LOADING:
                 state = CoordinatorState.LOADING;
                 snapshotRegistry = new SnapshotRegistry(logContext);
-                lastWrittenOffset = 0L;
-                lastCommittedOffset = 0L;
+                updateLastWrittenOffset(0);
+                updateLastCommittedOffset(0);

Review Comment:
I think that this won't always work. For instance, think about the case where we go from Failed to Loading. The reset won't work because `0` is not larger than the current values. It may be better to just keep what was there and to add the creation of the first snapshot there.
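The Failed-to-Loading case can be made concrete with a minimal sketch. It uses a hypothetical `OffsetTracker`, not the runtime's actual context class, whose update method applies the strict-increase check (an `offset <= lastWrittenOffset` update throws). Once the offset has advanced, a reset routed through the update method throws. Assigning the field directly during the transition works.

```java
// Hypothetical, simplified offset tracker illustrating the reset problem.
class OffsetTracker {
    private long lastWrittenOffset = 0L;

    long lastWrittenOffset() {
        return lastWrittenOffset;
    }

    // Enforces the invariant that the last written offset strictly increases.
    void updateLastWrittenOffset(long offset) {
        if (offset <= lastWrittenOffset) {
            throw new IllegalStateException("New last written offset " + offset
                + " must be larger than " + lastWrittenOffset + ".");
        }
        lastWrittenOffset = offset;
    }

    // A transition such as FAILED -> LOADING starts over, so it assigns the
    // field directly instead of going through the monotonic update above.
    void resetForLoading() {
        lastWrittenOffset = 0L;
    }
}
```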
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409230711

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -458,9 +458,9 @@ private CoordinatorContext(
         private void updateLastWrittenOffset(
             long offset
         ) {
-            if (offset <= lastWrittenOffset) {
+            if (offset < lastWrittenOffset) {

Review Comment:
Hum... I am not sure about this because the offset should always increase. This is an important invariant of the system, and by relaxing it we could potentially violate it.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409228279

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
           }
         }
+        val currentHighWatermark = log.highWatermark
+        if (currentHighWatermark > previousHighWatermark) {
+          onHighWatermarkUpdated.accept(currentHighWatermark)
+          previousHighWatermark = currentHighWatermark
+        }
+
+        if (currentOffset >= currentHighWatermark) {
+          onLoadedBatch.accept(currentOffset)
+        }

Review Comment:
Yes, I think so.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1409025050

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
    * Loads the coordinator by reading all the records from the TopicPartition
    * and applying them to the Replayable object.
    *
-   * @param tp The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp                     The TopicPartition to read from.
+   * @param coordinator            The object to apply records to.
+   * @param onLoadedBatch          Invoked when a batch was successfully loaded.
+   * @param onHighWatermarkUpdated Invoked when the high watermark advanced.
    */
   override def load(
     tp: TopicPartition,
-    coordinator: CoordinatorPlayback[T]
+    coordinator: CoordinatorPlayback[T],
+    onLoadedBatch: Consumer[java.lang.Long],
+    onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
> Is it not a good practice to use consumers like this?

It looks odd because all three go together, so an interface is better in this case.
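The interface-based alternative could look roughly like the following sketch. `Playback`, `ToyContext`, and `ToyLoader` are hypothetical names. The method names follow the reviewer's list (`replay`, `updateLastWrittenOffset`, `updateLastCommittedOffset`), and offsets are simplified so that record *i* sits at offset *i*.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface grouping the three callbacks the loader needs,
// named after the corresponding coordinator context methods.
interface Playback<U> {
    void replay(U record);
    void updateLastWrittenOffset(long offset);
    void updateLastCommittedOffset(long offset);
}

// Hypothetical context: replay would be forwarded to the shard (a list stands
// in for it here), while the context keeps the offset bookkeeping.
class ToyContext implements Playback<String> {
    final List<String> shardState = new ArrayList<>();
    long lastWrittenOffset = 0L;
    long lastCommittedOffset = 0L;

    @Override public void replay(String record) { shardState.add(record); }
    @Override public void updateLastWrittenOffset(long offset) { lastWrittenOffset = offset; }
    @Override public void updateLastCommittedOffset(long offset) { lastCommittedOffset = offset; }
}

// The loader takes a single Playback instead of a playback plus two Consumers.
class ToyLoader {
    static <U> void load(List<U> records, long highWatermark, Playback<U> playback) {
        for (U record : records) {
            playback.replay(record);
        }
        long nextOffset = records.size(); // records assumed to start at offset 0
        playback.updateLastWrittenOffset(nextOffset);
        // The committed offset never exceeds what has been written.
        playback.updateLastCommittedOffset(Math.min(highWatermark, nextOffset));
    }
}
```

One named type keeps the three related callbacks together. The loader signature stays short, and each callback gets a descriptive method name instead of an anonymous `Consumer[java.lang.Long]`.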
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408532337

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -543,8 +543,14 @@ private void transitionTo(
             case ACTIVE:
                 state = CoordinatorState.ACTIVE;
-                snapshotRegistry.getOrCreateSnapshot(0);
                 partitionWriter.registerListener(tp, highWatermarklistener);
+
+                // If the partition did not contain any records, we would not have generated a snapshot
+                // while loading.
+                if (lastWrittenOffset == -1L) {
+                    updateLastWrittenOffset(0);
+                    updateLastCommittedOffset(0);
+                }

Review Comment:
I reverted this and instead created a snapshot at epoch 0 when transitioning to the LOADING state. It doesn't make sense to create a snapshot at epoch 0 after loading. I also changed the exception conditions to be `>` and not `>=`, since we do expect lastCommittedOffset and lastWrittenOffset to sometimes be updated to the same value.
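Here is a minimal sketch of creating the base snapshot when entering LOADING. It uses a hypothetical `ToySnapshots` registry (a `TreeMap` from epoch to a stand-in state value; Kafka's `SnapshotRegistry` has a richer API) and a hypothetical `ToyCoordinator`. The offsets are reset by plain assignment, and the snapshot at epoch 0 exists before any record is replayed. As a result, an empty partition needs no special case when it becomes ACTIVE.

```java
import java.util.TreeMap;

// Hypothetical toy registry: epoch -> stand-in for the state at that epoch.
class ToySnapshots {
    final TreeMap<Long, Integer> snapshots = new TreeMap<>();

    void getOrCreate(long epoch, int state) {
        snapshots.putIfAbsent(epoch, state);
    }
}

// Hypothetical coordinator showing only the LOADING transition.
class ToyCoordinator {
    ToySnapshots registry;
    long lastWrittenOffset;
    long lastCommittedOffset;
    int state;

    void transitionToLoading() {
        registry = new ToySnapshots();   // fresh registry, like a new SnapshotRegistry
        state = 0;
        lastWrittenOffset = 0L;          // plain assignment, no monotonic check
        lastCommittedOffset = 0L;
        registry.getOrCreate(0L, state); // base snapshot before any replay
    }
}
```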
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408531602

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
    * Loads the coordinator by reading all the records from the TopicPartition
    * and applying them to the Replayable object.
    *
-   * @param tp The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp                     The TopicPartition to read from.
+   * @param coordinator            The object to apply records to.
+   * @param onLoadedBatch          Invoked when a batch was successfully loaded.
+   * @param onHighWatermarkUpdated Invoked when the high watermark advanced.
    */
   override def load(
     tp: TopicPartition,
-    coordinator: CoordinatorPlayback[T]
+    coordinator: CoordinatorPlayback[T],
+    onLoadedBatch: Consumer[java.lang.Long],
+    onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
I reflected your suggestion; let me know what you think.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408090316

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -543,8 +543,14 @@ private void transitionTo(
             case ACTIVE:
                 state = CoordinatorState.ACTIVE;
-                snapshotRegistry.getOrCreateSnapshot(0);
                 partitionWriter.registerListener(tp, highWatermarklistener);
+
+                // If the partition did not contain any records, we would not have generated a snapshot
+                // while loading.
+                if (lastWrittenOffset == -1L) {
+                    updateLastWrittenOffset(0);
+                    updateLastCommittedOffset(0);
+                }

Review Comment:
For both offsets, are you saying that we won't hit the case where we update the HWM or the last written offset to 0 while loading? We throw an exception in both methods:

```
if (offset <= lastWrittenOffset) {
    throw new IllegalStateException("New last written offset " + offset + " of " + tp +
        " must be larger than " + lastWrittenOffset + ".");
}
```

That is why I initialized them as -1L.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408072792

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
           }
         }
+        val currentHighWatermark = log.highWatermark
+        if (currentHighWatermark > previousHighWatermark) {
+          onHighWatermarkUpdated.accept(currentHighWatermark)
+          previousHighWatermark = currentHighWatermark
+        }

Review Comment:
Yeah, that makes sense to me, since updating the HWM also deletes snapshots.
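Advancing the committed offset prunes older snapshots, so the order of the two updates matters. The sketch below uses a hypothetical `ToyRuntimeContext`, not the real runtime. Committing an offset deletes every snapshot below it and is rejected if it exceeds the last written offset. The last written offset, and its snapshot, therefore have to be updated first.

```java
import java.util.TreeMap;

// Hypothetical context: snapshots keyed by offset, pruned on commit.
class ToyRuntimeContext {
    final TreeMap<Long, String> snapshots = new TreeMap<>();
    long lastWrittenOffset = 0L;
    long lastCommittedOffset = 0L;

    ToyRuntimeContext() {
        snapshots.put(0L, "base"); // base snapshot at offset 0
    }

    // Strictly increasing; takes a snapshot at the new offset.
    void updateLastWrittenOffset(long offset) {
        if (offset <= lastWrittenOffset) {
            throw new IllegalStateException("written offset must increase");
        }
        lastWrittenOffset = offset;
        snapshots.put(offset, "state@" + offset);
    }

    // May repeat the same value, but must not go backwards or pass the written offset.
    void updateLastCommittedOffset(long offset) {
        if (offset < lastCommittedOffset || offset > lastWrittenOffset) {
            throw new IllegalStateException("committed offset out of range");
        }
        lastCommittedOffset = offset;
        // headMap(offset) is a live view of keys strictly below offset;
        // clearing it removes those snapshots from the map.
        snapshots.headMap(offset).clear();
    }
}
```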
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408070465

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
           }
         }
+        val currentHighWatermark = log.highWatermark
+        if (currentHighWatermark > previousHighWatermark) {
+          onHighWatermarkUpdated.accept(currentHighWatermark)
+          previousHighWatermark = currentHighWatermark
+        }
+
+        if (currentOffset >= currentHighWatermark) {
+          onLoadedBatch.accept(currentOffset)
+        }

Review Comment:
currentOffset points to the next offset (the last offset in the batch plus 1), so I placed this call after an offset had been successfully loaded. Should lastWrittenOffset point to the next offset?
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1408056685

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
    * Loads the coordinator by reading all the records from the TopicPartition
    * and applying them to the Replayable object.
    *
-   * @param tp The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp                     The TopicPartition to read from.
+   * @param coordinator            The object to apply records to.
+   * @param onLoadedBatch          Invoked when a batch was successfully loaded.
+   * @param onHighWatermarkUpdated Invoked when the high watermark advanced.
    */
   override def load(
     tp: TopicPartition,
-    coordinator: CoordinatorPlayback[T]
+    coordinator: CoordinatorPlayback[T],
+    onLoadedBatch: Consumer[java.lang.Long],
+    onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
I did not like the fact that we need to have a replay() method in both CoordinatorPlayback and CoordinatorShard. We cannot move replay() to just CoordinatorContext because it needs to access OffsetMetadataManager and GroupMetadataManager. If you think that is fine, then I can make the changes. Is it not a good practice to use consumers like this?
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1407405303

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
           }
         }
+        val currentHighWatermark = log.highWatermark
+        if (currentHighWatermark > previousHighWatermark) {
+          onHighWatermarkUpdated.accept(currentHighWatermark)
+          previousHighWatermark = currentHighWatermark
+        }
+
+        if (currentOffset >= currentHighWatermark) {
+          onLoadedBatch.accept(currentOffset)
+        }

Review Comment:
`onLoadedBatch` updates the lastWrittenOffset, so shouldn't we call this after updating `currentOffset`?

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -148,6 +156,16 @@ class CoordinatorLoaderImpl[T](
           }
         }
+        val currentHighWatermark = log.highWatermark
+        if (currentHighWatermark > previousHighWatermark) {
+          onHighWatermarkUpdated.accept(currentHighWatermark)
+          previousHighWatermark = currentHighWatermark
+        }

Review Comment:
I wonder whether we should do this after updating the last written offset instead. Conceptually, at least, it makes more sense. What do you think?

## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ##
@@ -55,17 +56,21 @@ class CoordinatorLoaderImpl[T](
    * Loads the coordinator by reading all the records from the TopicPartition
    * and applying them to the Replayable object.
    *
-   * @param tp The TopicPartition to read from.
-   * @param coordinator The object to apply records to.
+   * @param tp                     The TopicPartition to read from.
+   * @param coordinator            The object to apply records to.
+   * @param onLoadedBatch          Invoked when a batch was successfully loaded.
+   * @param onHighWatermarkUpdated Invoked when the high watermark advanced.
    */
   override def load(
     tp: TopicPartition,
-    coordinator: CoordinatorPlayback[T]
+    coordinator: CoordinatorPlayback[T],
+    onLoadedBatch: Consumer[java.lang.Long],
+    onHighWatermarkUpdated: Consumer[java.lang.Long]

Review Comment:
This does not look good. Is there a reason why you did not use an interface that merges those three? In my opinion, it would be better to have an interface that defines:
* replay
* updateLastWrittenOffset
* updateLastCommittedOffset

We could re-purpose `CoordinatorPlayback` for this and introduce a new interface for the loader.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -543,8 +543,14 @@ private void transitionTo(
             case ACTIVE:
                 state = CoordinatorState.ACTIVE;
-                snapshotRegistry.getOrCreateSnapshot(0);
                 partitionWriter.registerListener(tp, highWatermarklistener);
+
+                // If the partition did not contain any records, we would not have generated a snapshot
+                // while loading.
+                if (lastWrittenOffset == -1L) {
+                    updateLastWrittenOffset(0);
+                    updateLastCommittedOffset(0);
+                }

Review Comment:
I was wondering whether we could just keep 0 as the default value for both (in the LOADING state) and create the base snapshot there. Then we don't have to handle a special case here.
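Putting the suggested ordering together, a loader loop could look like the sketch below. `Batch`, `OffsetListener`, and `ToyLoadLoop` are hypothetical. The high watermark observed after each batch is passed in as a list instead of being read from a log, and the `currentOffset >= hwm` guard mirrors the diff under discussion. In each iteration, `currentOffset` is advanced to the next offset first, then the last written offset is updated, and only then is the committed offset updated.

```java
import java.util.List;

// Hypothetical batch covering offsets [baseOffset, lastOffset].
final class Batch {
    final long baseOffset;
    final long lastOffset;

    Batch(long baseOffset, long lastOffset) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }
}

// Hypothetical callbacks, named after the coordinator context methods.
interface OffsetListener {
    void updateLastWrittenOffset(long offset);
    void updateLastCommittedOffset(long offset);
}

final class ToyLoadLoop {
    // highWatermarks.get(i) is the high watermark observed after reading batch i.
    static void load(List<Batch> batches, List<Long> highWatermarks, OffsetListener listener) {
        long currentOffset = 0L;          // next offset to read
        long previousHighWatermark = -1L;
        for (int i = 0; i < batches.size(); i++) {
            // ... replay the records of batches.get(i) here ...
            currentOffset = batches.get(i).lastOffset + 1; // advance first
            long hwm = highWatermarks.get(i);
            // Only once the read position has caught up with the HWM is it safe
            // to publish offsets, because committed must never exceed written.
            if (currentOffset >= hwm) {
                listener.updateLastWrittenOffset(currentOffset);
                if (hwm > previousHighWatermark) {
                    listener.updateLastCommittedOffset(hwm);
                    previousHighWatermark = hwm;
                }
            }
        }
    }
}
```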