dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412044990
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T](
}
currentOffset = batch.nextOffset
+ val currentHighWatermark = log.highWatermark
+ if (currentOffset >= currentHighWatermark) {
+ coordinator.updateLastWrittenOffset(currentOffset)
+ }
+
+ if (currentHighWatermark > previousHighWatermark) {
+ coordinator.updateLastCommittedOffset(currentHighWatermark)
+ previousHighWatermark = currentHighWatermark
+ }
Review Comment:
I think that it works. However, it is worth pointing out that the last
committed offset could be higher than the last written offset in the state
machine until records past it are read.
##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest {
}
}
+ @Test
+ def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+ val readResult1 = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult1)
+
+ val readResult2 = logReadResult(startOffset = 2, records = Seq(
+ new SimpleRecord("k3".getBytes, "v3".getBytes),
+ new SimpleRecord("k4".getBytes, "v4".getBytes),
+ new SimpleRecord("k5".getBytes, "v5".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 2L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult2)
+
+ val readResult3 = logReadResult(startOffset = 5, records = Seq(
+ new SimpleRecord("k6".getBytes, "v6".getBytes),
+ new SimpleRecord("k7".getBytes, "v7".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 5L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult3)
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+ verify(coordinator, times(0)).updateLastWrittenOffset(0)
+ verify(coordinator, times(1)).updateLastWrittenOffset(2)
+ verify(coordinator, times(1)).updateLastWrittenOffset(5)
+ verify(coordinator, times(1)).updateLastWrittenOffset(7)
+ verify(coordinator, times(1)).updateLastCommittedOffset(0)
+ verify(coordinator, times(1)).updateLastCommittedOffset(2)
+ verify(coordinator, times(0)).updateLastCommittedOffset(5)
+ }
+ }
+
+ @Test
+ def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead():
Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(log.highWatermark).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+ verify(coordinator, times(0)).updateLastWrittenOffset(0)
+ verify(coordinator, times(0)).updateLastWrittenOffset(2)
+ verify(coordinator, times(0)).updateLastWrittenOffset(5)
+ verify(coordinator, times(0)).updateLastCommittedOffset(0)
Review Comment:
Same question here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
Review Comment:
nit: Let's add an empty line before the javadoc.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
+ * The actual state machine.
+ */
+ private S coordinator;
+
+ /**
+ * The snapshot registry backing the coordinator.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The topic partition.
+ */
+ private final TopicPartition tp;
+
+ /**
+ * The last offset written to the partition.
+ */
+ private long lastWrittenOffset;
+
+ /**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+ private long lastCommittedOffset;
+
+ ContextStateMachine(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ S coordinator,
+ TopicPartition tp
+ ) {
+ this.log = logContext.logger(ContextStateMachine.class);
+ this.coordinator = coordinator;
+ this.snapshotRegistry = snapshotRegistry;
+ this.tp = tp;
+ this.lastWrittenOffset = 0;
+ this.lastCommittedOffset = 0;
+ snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.deleteSnapshotsUpTo(0);
Review Comment:
nit: This does not seem necessary.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
+ * The actual state machine.
+ */
+ private S coordinator;
+
+ /**
+ * The snapshot registry backing the coordinator.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The topic partition.
+ */
+ private final TopicPartition tp;
+
+ /**
+ * The last offset written to the partition.
+ */
+ private long lastWrittenOffset;
+
+ /**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+ private long lastCommittedOffset;
+
+ ContextStateMachine(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ S coordinator,
+ TopicPartition tp
+ ) {
+ this.log = logContext.logger(ContextStateMachine.class);
+ this.coordinator = coordinator;
+ this.snapshotRegistry = snapshotRegistry;
+ this.tp = tp;
+ this.lastWrittenOffset = 0;
+ this.lastCommittedOffset = 0;
+ snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.deleteSnapshotsUpTo(0);
+ }
+
+ /**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+ synchronized void revertLastWrittenOffset(
+ long offset
+ ) {
+ if (offset > lastWrittenOffset) {
+ throw new IllegalStateException("New offset " + offset + " of " +
tp +
+ " must be smaller than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Revert last written offset of {} to {}.", tp, offset);
+ lastWrittenOffset = offset;
+ snapshotRegistry.revertToSnapshot(offset);
+ }
+
+ @Override
Review Comment:
nit: Javadoc to be consistent with the others?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -528,25 +452,27 @@ private void transitionTo(
switch (newState) {
case LOADING:
state = CoordinatorState.LOADING;
- snapshotRegistry = new SnapshotRegistry(logContext);
- lastWrittenOffset = 0L;
- lastCommittedOffset = 0L;
- coordinator = coordinatorShardBuilderSupplier
- .get()
- .withLogContext(logContext)
- .withSnapshotRegistry(snapshotRegistry)
- .withTime(time)
- .withTimer(timer)
- .withCoordinatorMetrics(coordinatorMetrics)
- .withTopicPartition(tp)
- .build();
+ SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
+ stateMachine = new ContextStateMachine<>(
+ logContext,
+ snapshotRegistry,
+ coordinatorShardBuilderSupplier
+ .get()
+ .withLogContext(logContext)
+ .withSnapshotRegistry(snapshotRegistry)
+ .withTime(time)
+ .withTimer(timer)
+ .withCoordinatorMetrics(coordinatorMetrics)
+ .withTopicPartition(tp)
+ .build(),
+ tp
+ );
break;
case ACTIVE:
state = CoordinatorState.ACTIVE;
- snapshotRegistry.getOrCreateSnapshot(0);
partitionWriter.registerListener(tp,
highWatermarklistener);
- coordinator.onLoaded(metadataImage);
+ stateMachine.coordinator().onLoaded(metadataImage);
Review Comment:
For convenience, I wonder if we should add `onLoaded`, `onUnloaded`, and
`onNewMetadataImage` to the new class. The benefit is that the calls would be
made within the synchronized block.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
+ * The actual state machine.
+ */
+ private S coordinator;
+
+ /**
+ * The snapshot registry backing the coordinator.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The topic partition.
+ */
+ private final TopicPartition tp;
+
+ /**
+ * The last offset written to the partition.
+ */
+ private long lastWrittenOffset;
+
+ /**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+ private long lastCommittedOffset;
+
+ ContextStateMachine(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ S coordinator,
+ TopicPartition tp
+ ) {
+ this.log = logContext.logger(ContextStateMachine.class);
+ this.coordinator = coordinator;
+ this.snapshotRegistry = snapshotRegistry;
+ this.tp = tp;
+ this.lastWrittenOffset = 0;
+ this.lastCommittedOffset = 0;
+ snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.deleteSnapshotsUpTo(0);
+ }
+
+ /**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+ synchronized void revertLastWrittenOffset(
+ long offset
+ ) {
+ if (offset > lastWrittenOffset) {
+ throw new IllegalStateException("New offset " + offset + " of " +
tp +
+ " must be smaller than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Revert last written offset of {} to {}.", tp, offset);
+ lastWrittenOffset = offset;
+ snapshotRegistry.revertToSnapshot(offset);
+ }
+
+ @Override
+ public synchronized void replay(
+ long producerId,
+ short producerEpoch,
+ U record
+ ) {
+ coordinator.replay(producerId, producerEpoch, record);
+ }
+
+ /**
+ * Updates the last written offset. This also create a new snapshot
+ * in the snapshot registry.
+ *
+ * @param offset The new last written offset.
+ */
+ @Override
+ public synchronized void updateLastWrittenOffset(Long offset) {
+ if (offset <= lastWrittenOffset) {
+ throw new IllegalStateException("New last written offset " +
offset + " of " + tp +
+ " must be greater than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Update last written offset of {} to {}.", tp, offset);
+ lastWrittenOffset = offset;
+ snapshotRegistry.getOrCreateSnapshot(offset);
+ }
+
+ /**
+ * Updates the last committed offset. This completes all the deferred
+ * events waiting on this offset. This also cleanups all the snapshots
+ * prior to this offset.
+ *
+ * @param offset The new last committed offset.
+ */
+ @Override
+ public synchronized void updateLastCommittedOffset(Long offset) {
+ if (offset < lastCommittedOffset) {
+ throw new IllegalStateException("New committed offset " + offset +
" of " + tp +
+ " must be greater than or equal to " + lastCommittedOffset +
".");
+ }
+
+ lastCommittedOffset = offset;
+ snapshotRegistry.deleteSnapshotsUpTo(offset);
+ log.debug("Update committed offset of {} to {}.", tp, offset);
+ }
+
+ /**
+ * @return The last written offset.
+ */
+ synchronized long lastWrittenOffset() {
+ return this.lastWrittenOffset;
+ }
+
+ /**
+ * @return The last committed offset.
+ */
+ synchronized long lastCommittedOffset() {
+ return this.lastCommittedOffset;
+ }
+
+ /**
+ * @return The coordinator.
+ */
+ synchronized S coordinator() {
+ return this.coordinator;
+ }
+
+ synchronized SnapshotRegistry snapshotRegistry() {
+ return this.snapshotRegistry;
+ }
Review Comment:
Is this used anywhere?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1266,18 +1284,18 @@ public void testScheduleTimer() throws
InterruptedException {
// Check initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
- assertEquals(0, ctx.lastWrittenOffset);
- assertEquals(0, ctx.lastCommittedOffset);
+ assertEquals(0, ctx.stateMachine.lastWrittenOffset());
+ assertEquals(0, ctx.stateMachine.lastCommittedOffset());
// The coordinator timer should be empty.
assertEquals(0, ctx.timer.size());
// Timer #1.
- ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
true,
+ ctx.stateMachine.coordinator().timer.schedule("timer-1", 10,
TimeUnit.MILLISECONDS, true,
Review Comment:
Is this actually correct? I thought that the timer is still in the
context... There are other cases in this file to verify.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -745,7 +670,7 @@ public void run() {
// Apply the records to the state machine.
if (result.replayRecords()) {
result.records().forEach(record ->
- context.coordinator.replay(
+ context.stateMachine.coordinator().replay(
Review Comment:
nit: Should we use `context.stateMachine.replay`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
Review Comment:
I think that we could do slightly better for the documentation. The object
is indeed never accessed concurrently; however, it is accessed by multiple
threads. Therefore, we need to synchronize the access.
I would also remove the reference to `__consumer_offsets` because the
runtime is not tied to it. We can explain the same without mentioning it.
##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest {
}
}
+ @Test
+ def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+ val readResult1 = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult1)
+
+ val readResult2 = logReadResult(startOffset = 2, records = Seq(
+ new SimpleRecord("k3".getBytes, "v3".getBytes),
+ new SimpleRecord("k4".getBytes, "v4".getBytes),
+ new SimpleRecord("k5".getBytes, "v5".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 2L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult2)
+
+ val readResult3 = logReadResult(startOffset = 5, records = Seq(
+ new SimpleRecord("k6".getBytes, "v6".getBytes),
+ new SimpleRecord("k7".getBytes, "v7".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 5L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult3)
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+ verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+ verify(coordinator, times(0)).updateLastWrittenOffset(0)
+ verify(coordinator, times(1)).updateLastWrittenOffset(2)
+ verify(coordinator, times(1)).updateLastWrittenOffset(5)
+ verify(coordinator, times(1)).updateLastWrittenOffset(7)
+ verify(coordinator, times(1)).updateLastCommittedOffset(0)
+ verify(coordinator, times(1)).updateLastCommittedOffset(2)
+ verify(coordinator, times(0)).updateLastCommittedOffset(5)
+ }
+ }
+
+ @Test
+ def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead():
Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(log.highWatermark).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
+
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+ verify(coordinator, times(0)).updateLastWrittenOffset(0)
Review Comment:
Should we rather use `verify(coordinator,
times(0)).updateLastWrittenOffset(anyLong())` instead of verifying 0, 2 and 5?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
+ * The actual state machine.
+ */
+ private S coordinator;
+
+ /**
+ * The snapshot registry backing the coordinator.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The topic partition.
+ */
+ private final TopicPartition tp;
Review Comment:
nit: We usually declare the final attributes and then the others. Could we
rearrange them?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
Review Comment:
nit: I am not a big fan of this name. How about `SnapshottableCoordinator`
or something along those lines?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
+ * The actual state machine.
+ */
+ private S coordinator;
+
+ /**
+ * The snapshot registry backing the coordinator.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The topic partition.
+ */
+ private final TopicPartition tp;
+
+ /**
+ * The last offset written to the partition.
+ */
+ private long lastWrittenOffset;
+
+ /**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+ private long lastCommittedOffset;
+
+ ContextStateMachine(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ S coordinator,
+ TopicPartition tp
+ ) {
+ this.log = logContext.logger(ContextStateMachine.class);
+ this.coordinator = coordinator;
+ this.snapshotRegistry = snapshotRegistry;
+ this.tp = tp;
+ this.lastWrittenOffset = 0;
+ this.lastCommittedOffset = 0;
+ snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.deleteSnapshotsUpTo(0);
+ }
+
+ /**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+ synchronized void revertLastWrittenOffset(
+ long offset
+ ) {
+ if (offset > lastWrittenOffset) {
+ throw new IllegalStateException("New offset " + offset + " of " +
tp +
+ " must be smaller than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Revert last written offset of {} to {}.", tp, offset);
+ lastWrittenOffset = offset;
+ snapshotRegistry.revertToSnapshot(offset);
+ }
+
+ @Override
+ public synchronized void replay(
+ long producerId,
+ short producerEpoch,
+ U record
+ ) {
+ coordinator.replay(producerId, producerEpoch, record);
+ }
+
+ /**
+ * Updates the last written offset. This also create a new snapshot
+ * in the snapshot registry.
+ *
+ * @param offset The new last written offset.
+ */
+ @Override
+ public synchronized void updateLastWrittenOffset(Long offset) {
+ if (offset <= lastWrittenOffset) {
+ throw new IllegalStateException("New last written offset " +
offset + " of " + tp +
+ " must be greater than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Update last written offset of {} to {}.", tp, offset);
Review Comment:
nit: I have noticed that we log before updating here while we log after
updating in `updateLastCommittedOffset`. It would be good to be consistent. If
you put them after, I would also use `Updated` to signal that it was done.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
Review Comment:
Should we also add a few unit tests for this class?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -906,8 +831,8 @@ public void run() {
withActiveContextOrThrow(tp, context -> {
// Execute the read operation.
response = op.generateResponse(
- context.coordinator,
- context.lastCommittedOffset
+ context.stateMachine.coordinator(),
Review Comment:
This is where I am not fully satisfied because we actually use the
coordinator without holding the lock. I think that it is not a real issue
because it is protected by the lock of the context but it is inconsistent. I
suppose that I can live with it for now.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -322,7 +335,7 @@ public void testScheduleLoading() {
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new
CompletableFuture<>();
- when(loader.load(TP, coordinator)).thenReturn(future);
+ when(loader.load(eq(TP), any())).thenReturn(future);
Review Comment:
nit: Could we get the context earlier and use `ctx.stateMachine` here? If
this is possible, we should update all of them.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -409,25 +409,10 @@ class CoordinatorContext {
int epoch;
/**
- * The snapshot registry backing the coordinator.
+ * The state machine and the metadata that can be accessed by
+ * other threads.
*/
- SnapshotRegistry snapshotRegistry;
-
- /**
- * The actual state machine.
- */
- S coordinator;
-
- /**
- * The last offset written to the partition.
- */
- long lastWrittenOffset;
-
- /**
- * The last offset committed. This represents the high
- * watermark of the partition.
- */
- long lastCommittedOffset;
+ ContextStateMachine<S, U> stateMachine;
Review Comment:
nit: If you change the name of the class, we should adapt the name here as
well.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine.
Generally, only a single thread at a
+ * time will access this object but multiple threads may access it while loading
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements
CoordinatorPlayback<U> {
+ /**
+ * The logger.
+ */
+ private final Logger log;
+ /**
+ * The actual state machine.
+ */
+ private S coordinator;
+
+ /**
+ * The snapshot registry backing the coordinator.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The topic partition.
+ */
+ private final TopicPartition tp;
+
+ /**
+ * The last offset written to the partition.
+ */
+ private long lastWrittenOffset;
+
+ /**
+ * The last offset committed. This represents the high
+ * watermark of the partition.
+ */
+ private long lastCommittedOffset;
+
+ ContextStateMachine(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ S coordinator,
+ TopicPartition tp
+ ) {
+ this.log = logContext.logger(ContextStateMachine.class);
+ this.coordinator = coordinator;
+ this.snapshotRegistry = snapshotRegistry;
+ this.tp = tp;
+ this.lastWrittenOffset = 0;
+ this.lastCommittedOffset = 0;
+ snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.deleteSnapshotsUpTo(0);
+ }
+
+ /**
+ * Reverts the last written offset. This also reverts the snapshot
+ * registry to this offset. All the changes applied after the offset
+ * are lost.
+ *
+ * @param offset The offset to revert to.
+ */
+ synchronized void revertLastWrittenOffset(
+ long offset
+ ) {
+ if (offset > lastWrittenOffset) {
+ throw new IllegalStateException("New offset " + offset + " of " +
tp +
+ " must be smaller than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Revert last written offset of {} to {}.", tp, offset);
+ lastWrittenOffset = offset;
+ snapshotRegistry.revertToSnapshot(offset);
+ }
+
+ @Override
+ public synchronized void replay(
+ long producerId,
+ short producerEpoch,
+ U record
+ ) {
+ coordinator.replay(producerId, producerEpoch, record);
+ }
+
+ /**
+ * Updates the last written offset. This also creates a new snapshot
+ * in the snapshot registry.
+ *
+ * @param offset The new last written offset.
+ */
+ @Override
+ public synchronized void updateLastWrittenOffset(Long offset) {
+ if (offset <= lastWrittenOffset) {
+ throw new IllegalStateException("New last written offset " +
offset + " of " + tp +
+ " must be greater than " + lastWrittenOffset + ".");
+ }
+
+ log.debug("Update last written offset of {} to {}.", tp, offset);
+ lastWrittenOffset = offset;
+ snapshotRegistry.getOrCreateSnapshot(offset);
+ }
+
+ /**
+ * Updates the last committed offset. This completes all the deferred
+ * events waiting on this offset. This also cleans up all the snapshots
+ * prior to this offset.
+ *
+ * @param offset The new last committed offset.
+ */
+ @Override
+ public synchronized void updateLastCommittedOffset(Long offset) {
+ if (offset < lastCommittedOffset) {
+ throw new IllegalStateException("New committed offset " + offset +
" of " + tp +
+ " must be greater than or equal to " + lastCommittedOffset +
".");
+ }
+
+ lastCommittedOffset = offset;
+ snapshotRegistry.deleteSnapshotsUpTo(offset);
+ log.debug("Update committed offset of {} to {}.", tp, offset);
+ }
+
+ /**
+ * @return The last written offset.
+ */
+ synchronized long lastWrittenOffset() {
+ return this.lastWrittenOffset;
+ }
+
+ /**
+ * @return The last committed offset.
+ */
+ synchronized long lastCommittedOffset() {
+ return this.lastCommittedOffset;
+ }
+
+ /**
+ * @return The coordinator.
+ */
+ synchronized S coordinator() {
+ return this.coordinator;
+ }
+
+ synchronized SnapshotRegistry snapshotRegistry() {
+ return this.snapshotRegistry;
+ }
+
+ /**
+ * Set the coordinator state machine.
+ *
+ * @param coordinator The coordinator.
+ *
+ * Only used for testing.
+ */
+ synchronized void setCoordinator(S coordinator) {
+ this.coordinator = coordinator;
+ }
Review Comment:
I see that you use this in tests. I wonder if we could recreate this object
instead of altering it. Would it be possible?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1659,4 +1680,117 @@ public void testPartitionLoadSensor() {
verify(runtimeMetrics,
times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
}
+
+ @Test
+ public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorShardBuilderSupplier supplier =
mock(MockCoordinatorShardBuilderSupplier.class);
+ MockCoordinatorShardBuilder builder =
mock(MockCoordinatorShardBuilder.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+ GroupCoordinatorRuntimeMetrics runtimeMetrics =
mock(GroupCoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(Time.SYSTEM)
+ .withTimer(timer)
+ .withLoader(new MockCoordinatorLoader(
+ new CoordinatorLoader.LoadSummary(
+ 1000,
+ 2000,
+ 30,
+ 3000),
+ Arrays.asList(5L, 15L, 27L),
+ Arrays.asList(5L, 15L)))
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.withTime(any())).thenReturn(builder);
+ when(builder.withTimer(any())).thenReturn(builder);
+ when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+ when(builder.withTopicPartition(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(coordinator);
+ when(supplier.get()).thenReturn(builder);
+
+ // Getting the coordinator context fails because the coordinator
+ // does not exist until scheduleLoadOperation is called.
+ assertThrows(NotCoordinatorException.class, () ->
runtime.contextOrThrow(TP));
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 0);
+
+ // Getting the coordinator context succeeds now.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+
+ // When the loading completes, the coordinator transitions to active.
+ assertEquals(ACTIVE, ctx.state);
+
+ assertEquals(27L, ctx.stateMachine.lastWrittenOffset());
+ assertEquals(15L, ctx.stateMachine.lastCommittedOffset());
+ assertFalse(ctx.stateMachine.snapshotRegistry().hasSnapshot(0L));
Review Comment:
I suppose that this does not work because the snapshot registry is not in
the context anymore.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]