dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1412044990


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -153,6 +154,15 @@ class CoordinatorLoaderImpl[T](
               }
 
               currentOffset = batch.nextOffset
+              val currentHighWatermark = log.highWatermark
+              if (currentOffset >= currentHighWatermark) {
+                coordinator.updateLastWrittenOffset(currentOffset)
+              }
+
+              if (currentHighWatermark > previousHighWatermark) {
+                coordinator.updateLastCommittedOffset(currentHighWatermark)
+                previousHighWatermark = currentHighWatermark
+              }

Review Comment:
   I think that it works. However, it is worth pointing out that the last 
committed offset could be higher than the last written offset in the state 
machine until records past it are read.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest {
     }
   }
 
+  @Test
+  def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult1)
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 2L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult2)
+
+      val readResult3 = logReadResult(startOffset = 5, records = Seq(
+        new SimpleRecord("k6".getBytes, "v6".getBytes),
+        new SimpleRecord("k7".getBytes, "v7".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 5L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult3)
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator, times(0)).updateLastWrittenOffset(0)
+      verify(coordinator, times(1)).updateLastWrittenOffset(2)
+      verify(coordinator, times(1)).updateLastWrittenOffset(5)
+      verify(coordinator, times(1)).updateLastWrittenOffset(7)
+      verify(coordinator, times(1)).updateLastCommittedOffset(0)
+      verify(coordinator, times(1)).updateLastCommittedOffset(2)
+      verify(coordinator, times(0)).updateLastCommittedOffset(5)
+    }
+  }
+
+  @Test
+  def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): 
Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator, times(0)).updateLastWrittenOffset(0)
+      verify(coordinator, times(0)).updateLastWrittenOffset(2)
+      verify(coordinator, times(0)).updateLastWrittenOffset(5)
+      verify(coordinator, times(0)).updateLastCommittedOffset(0)

Review Comment:
   Same question here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**

Review Comment:
   nit: Let's add an empty line before the javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**
+     * The actual state machine.
+     */
+    private S coordinator;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    ContextStateMachine(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(ContextStateMachine.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.deleteSnapshotsUpTo(0);

Review Comment:
   nit: This does not seem necessary.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**
+     * The actual state machine.
+     */
+    private S coordinator;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    ContextStateMachine(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(ContextStateMachine.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.deleteSnapshotsUpTo(0);
+    }
+
+    /**
+     * Reverts the last written offset. This also reverts the snapshot
+     * registry to this offset. All the changes applied after the offset
+     * are lost.
+     *
+     * @param offset The offset to revert to.
+     */
+    synchronized void revertLastWrittenOffset(
+        long offset
+    ) {
+        if (offset > lastWrittenOffset) {
+            throw new IllegalStateException("New offset " + offset + " of " + 
tp +
+                " must be smaller than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Revert last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.revertToSnapshot(offset);
+    }
+
+    @Override

Review Comment:
   nit: Javadoc to be consistent with the others?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -528,25 +452,27 @@ private void transitionTo(
             switch (newState) {
                 case LOADING:
                     state = CoordinatorState.LOADING;
-                    snapshotRegistry = new SnapshotRegistry(logContext);
-                    lastWrittenOffset = 0L;
-                    lastCommittedOffset = 0L;
-                    coordinator = coordinatorShardBuilderSupplier
-                        .get()
-                        .withLogContext(logContext)
-                        .withSnapshotRegistry(snapshotRegistry)
-                        .withTime(time)
-                        .withTimer(timer)
-                        .withCoordinatorMetrics(coordinatorMetrics)
-                        .withTopicPartition(tp)
-                        .build();
+                    SnapshotRegistry snapshotRegistry = new 
SnapshotRegistry(logContext);
+                    stateMachine = new ContextStateMachine<>(
+                        logContext,
+                        snapshotRegistry,
+                        coordinatorShardBuilderSupplier
+                            .get()
+                            .withLogContext(logContext)
+                            .withSnapshotRegistry(snapshotRegistry)
+                            .withTime(time)
+                            .withTimer(timer)
+                            .withCoordinatorMetrics(coordinatorMetrics)
+                            .withTopicPartition(tp)
+                            .build(),
+                        tp
+                    );
                     break;
 
                 case ACTIVE:
                     state = CoordinatorState.ACTIVE;
-                    snapshotRegistry.getOrCreateSnapshot(0);
                     partitionWriter.registerListener(tp, 
highWatermarklistener);
-                    coordinator.onLoaded(metadataImage);
+                    stateMachine.coordinator().onLoaded(metadataImage);

Review Comment:
   For convenience, I wonder if we should add `onLoaded`, `onUnloaded`, and 
`onNewMetadataImage` to the new class. The benefit is that the calls would be 
made within the `synchronized` block.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**
+     * The actual state machine.
+     */
+    private S coordinator;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    ContextStateMachine(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(ContextStateMachine.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.deleteSnapshotsUpTo(0);
+    }
+
+    /**
+     * Reverts the last written offset. This also reverts the snapshot
+     * registry to this offset. All the changes applied after the offset
+     * are lost.
+     *
+     * @param offset The offset to revert to.
+     */
+    synchronized void revertLastWrittenOffset(
+        long offset
+    ) {
+        if (offset > lastWrittenOffset) {
+            throw new IllegalStateException("New offset " + offset + " of " + 
tp +
+                " must be smaller than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Revert last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.revertToSnapshot(offset);
+    }
+
+    @Override
+    public synchronized void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) {
+        coordinator.replay(producerId, producerEpoch, record);
+    }
+
+    /**
+     * Updates the last written offset. This also create a new snapshot
+     * in the snapshot registry.
+     *
+     * @param offset The new last written offset.
+     */
+    @Override
+    public synchronized void updateLastWrittenOffset(Long offset) {
+        if (offset <= lastWrittenOffset) {
+            throw new IllegalStateException("New last written offset " + 
offset + " of " + tp +
+                " must be greater than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Update last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.getOrCreateSnapshot(offset);
+    }
+
+    /**
+     * Updates the last committed offset. This completes all the deferred
+     * events waiting on this offset. This also cleanups all the snapshots
+     * prior to this offset.
+     *
+     * @param offset The new last committed offset.
+     */
+    @Override
+    public synchronized void updateLastCommittedOffset(Long offset) {
+        if (offset < lastCommittedOffset) {
+            throw new IllegalStateException("New committed offset " + offset + 
" of " + tp +
+                " must be greater than or equal to " + lastCommittedOffset + 
".");
+        }
+
+        lastCommittedOffset = offset;
+        snapshotRegistry.deleteSnapshotsUpTo(offset);
+        log.debug("Update committed offset of {} to {}.", tp, offset);
+    }
+
+    /**
+     * @return The last written offset.
+     */
+    synchronized long lastWrittenOffset() {
+        return this.lastWrittenOffset;
+    }
+
+    /**
+     * @return The last committed offset.
+     */
+    synchronized long lastCommittedOffset() {
+        return this.lastCommittedOffset;
+    }
+
+    /**
+     * @return The coordinator.
+     */
+    synchronized S coordinator() {
+        return this.coordinator;
+    }
+
+    synchronized SnapshotRegistry snapshotRegistry() {
+        return this.snapshotRegistry;
+    }

Review Comment:
   Is this used anywhere?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1266,18 +1284,18 @@ public void testScheduleTimer() throws 
InterruptedException {
 
         // Check initial state.
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
-        assertEquals(0, ctx.lastWrittenOffset);
-        assertEquals(0, ctx.lastCommittedOffset);
+        assertEquals(0, ctx.stateMachine.lastWrittenOffset());
+        assertEquals(0, ctx.stateMachine.lastCommittedOffset());
 
         // The coordinator timer should be empty.
         assertEquals(0, ctx.timer.size());
 
         // Timer #1.
-        ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, 
true,
+        ctx.stateMachine.coordinator().timer.schedule("timer-1", 10, 
TimeUnit.MILLISECONDS, true,

Review Comment:
   Is this actually correct? I thought that the timer is still in the 
context... There are other cases in this file to verify.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -745,7 +670,7 @@ public void run() {
                             // Apply the records to the state machine.
                             if (result.replayRecords()) {
                                 result.records().forEach(record ->
-                                    context.coordinator.replay(
+                                    context.stateMachine.coordinator().replay(

Review Comment:
   nit: Should we use `context.stateMachine.replay`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.

Review Comment:
   I think that we could do slightly better for the documentation. The object 
is indeed never accessed concurrently; however, it is accessed by multiple 
threads. Therefore, we need to synchronize the access.
   
   I would also remove the reference to `__consumer_offsets` because the 
runtime is not tied to it. We can explain the same without mentioning it.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##########
@@ -366,6 +370,110 @@ class CoordinatorLoaderImplTest {
     }
   }
 
+  @Test
+  def testUpdateLastWrittenOffsetOnBatchLoaded(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(0L).thenReturn(0L).thenReturn(2L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult1)
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 2L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult2)
+
+      val readResult3 = logReadResult(startOffset = 5, records = Seq(
+        new SimpleRecord("k6".getBytes, "v6".getBytes),
+        new SimpleRecord("k7".getBytes, "v7".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 5L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult3)
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator, times(0)).updateLastWrittenOffset(0)
+      verify(coordinator, times(1)).updateLastWrittenOffset(2)
+      verify(coordinator, times(1)).updateLastWrittenOffset(5)
+      verify(coordinator, times(1)).updateLastWrittenOffset(7)
+      verify(coordinator, times(1)).updateLastCommittedOffset(0)
+      verify(coordinator, times(1)).updateLastCommittedOffset(2)
+      verify(coordinator, times(0)).updateLastCommittedOffset(5)
+    }
+  }
+
+  @Test
+  def testUpdateLastWrittenOffsetAndUpdateLastCommittedOffsetNoRecordsRead(): 
Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(log.highWatermark).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(0L))
+
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+
+      verify(coordinator, times(0)).updateLastWrittenOffset(0)

Review Comment:
   Should we rather use `verify(coordinator, 
times(0)).updateLastWrittenOffset(anyLong())` instead of verifying 0, 2 and 5?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**
+     * The actual state machine.
+     */
+    private S coordinator;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;

Review Comment:
   nit: We usually declare the final attributes and then the others. Could we 
rearrange them?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {

Review Comment:
   nit: I am not a big fan of this name. How about `SnapshottableCoordinator` 
or something along those lines?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**
+     * The actual state machine.
+     */
+    private S coordinator;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    ContextStateMachine(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(ContextStateMachine.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.deleteSnapshotsUpTo(0);
+    }
+
+    /**
+     * Reverts the last written offset. This also reverts the snapshot
+     * registry to this offset. All the changes applied after the offset
+     * are lost.
+     *
+     * @param offset The offset to revert to.
+     */
+    synchronized void revertLastWrittenOffset(
+        long offset
+    ) {
+        if (offset > lastWrittenOffset) {
+            throw new IllegalStateException("New offset " + offset + " of " + 
tp +
+                " must be smaller than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Revert last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.revertToSnapshot(offset);
+    }
+
+    @Override
+    public synchronized void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) {
+        coordinator.replay(producerId, producerEpoch, record);
+    }
+
+    /**
+     * Updates the last written offset. This also creates a new snapshot
+     * in the snapshot registry.
+     *
+     * @param offset The new last written offset.
+     */
+    @Override
+    public synchronized void updateLastWrittenOffset(Long offset) {
+        if (offset <= lastWrittenOffset) {
+            throw new IllegalStateException("New last written offset " + 
offset + " of " + tp +
+                " must be greater than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Update last written offset of {} to {}.", tp, offset);

Review Comment:
   nit: I have noticed that we log before updating here while we log after 
updating in `updateLastCommittedOffset`. It would be good to be consistent. If 
you put them after, I would also use `Updated` to signal that it was done.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*

Review Comment:
   Should we also add a few unit tests for this class?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -906,8 +831,8 @@ public void run() {
                 withActiveContextOrThrow(tp, context -> {
                     // Execute the read operation.
                     response = op.generateResponse(
-                        context.coordinator,
-                        context.lastCommittedOffset
+                        context.stateMachine.coordinator(),

Review Comment:
   This is where I am not fully satisfied because we actually use the 
coordinator without holding the lock. I think that it is not a real issue 
because it is protected by the lock of the context but it is inconsistent. I 
suppose that I can live with it for now.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -322,7 +335,7 @@ public void testScheduleLoading() {
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
         CompletableFuture<CoordinatorLoader.LoadSummary> future = new 
CompletableFuture<>();
-        when(loader.load(TP, coordinator)).thenReturn(future);
+        when(loader.load(eq(TP), any())).thenReturn(future);

Review Comment:
   nit: Could we get the context earlier and use `ctx.stateMachine` here? If 
this is possible, we should update all of them.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -409,25 +409,10 @@ class CoordinatorContext {
         int epoch;
 
         /**
-         * The snapshot registry backing the coordinator.
+         * The state machine and the metadata that can be accessed by
+         * other threads.
          */
-        SnapshotRegistry snapshotRegistry;
-
-        /**
-         * The actual state machine.
-         */
-        S coordinator;
-
-        /**
-         * The last offset written to the partition.
-         */
-        long lastWrittenOffset;
-
-        /**
-         * The last offset committed. This represents the high
-         * watermark of the partition.
-         */
-        long lastCommittedOffset;
+        ContextStateMachine<S, U> stateMachine;

Review Comment:
   nit: If you change the name of the class, we should adapt the name here as 
well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+/**
+ * ContextStateMachine is a wrapper on top of the coordinator state machine. 
Generally, only a single thread at a
+ * time will access this object but multiple threads may access while loading 
the __consumer_offsets topic partition.
+ */
+class ContextStateMachine<S extends CoordinatorShard<U>, U> implements 
CoordinatorPlayback<U> {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+    /**
+     * The actual state machine.
+     */
+    private S coordinator;
+
+    /**
+     * The snapshot registry backing the coordinator.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The topic partition.
+     */
+    private final TopicPartition tp;
+
+    /**
+     * The last offset written to the partition.
+     */
+    private long lastWrittenOffset;
+
+    /**
+     * The last offset committed. This represents the high
+     * watermark of the partition.
+     */
+    private long lastCommittedOffset;
+
+    ContextStateMachine(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        S coordinator,
+        TopicPartition tp
+    ) {
+        this.log = logContext.logger(ContextStateMachine.class);
+        this.coordinator = coordinator;
+        this.snapshotRegistry = snapshotRegistry;
+        this.tp = tp;
+        this.lastWrittenOffset = 0;
+        this.lastCommittedOffset = 0;
+        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.deleteSnapshotsUpTo(0);
+    }
+
+    /**
+     * Reverts the last written offset. This also reverts the snapshot
+     * registry to this offset. All the changes applied after the offset
+     * are lost.
+     *
+     * @param offset The offset to revert to.
+     */
+    synchronized void revertLastWrittenOffset(
+        long offset
+    ) {
+        if (offset > lastWrittenOffset) {
+            throw new IllegalStateException("New offset " + offset + " of " + 
tp +
+                " must be smaller than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Revert last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.revertToSnapshot(offset);
+    }
+
+    @Override
+    public synchronized void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) {
+        coordinator.replay(producerId, producerEpoch, record);
+    }
+
+    /**
+     * Updates the last written offset. This also creates a new snapshot
+     * in the snapshot registry.
+     *
+     * @param offset The new last written offset.
+     */
+    @Override
+    public synchronized void updateLastWrittenOffset(Long offset) {
+        if (offset <= lastWrittenOffset) {
+            throw new IllegalStateException("New last written offset " + 
offset + " of " + tp +
+                " must be greater than " + lastWrittenOffset + ".");
+        }
+
+        log.debug("Update last written offset of {} to {}.", tp, offset);
+        lastWrittenOffset = offset;
+        snapshotRegistry.getOrCreateSnapshot(offset);
+    }
+
+    /**
+     * Updates the last committed offset. This completes all the deferred
+     * events waiting on this offset. This also cleans up all the snapshots
+     * prior to this offset.
+     *
+     * @param offset The new last committed offset.
+     */
+    @Override
+    public synchronized void updateLastCommittedOffset(Long offset) {
+        if (offset < lastCommittedOffset) {
+            throw new IllegalStateException("New committed offset " + offset + 
" of " + tp +
+                " must be greater than or equal to " + lastCommittedOffset + 
".");
+        }
+
+        lastCommittedOffset = offset;
+        snapshotRegistry.deleteSnapshotsUpTo(offset);
+        log.debug("Update committed offset of {} to {}.", tp, offset);
+    }
+
+    /**
+     * @return The last written offset.
+     */
+    synchronized long lastWrittenOffset() {
+        return this.lastWrittenOffset;
+    }
+
+    /**
+     * @return The last committed offset.
+     */
+    synchronized long lastCommittedOffset() {
+        return this.lastCommittedOffset;
+    }
+
+    /**
+     * @return The coordinator.
+     */
+    synchronized S coordinator() {
+        return this.coordinator;
+    }
+
+    synchronized SnapshotRegistry snapshotRegistry() {
+        return this.snapshotRegistry;
+    }
+
+    /**
+     * Set the coordinator state machine.
+     *
+     * @param coordinator      The coordinator.
+     *
+     * Only used for testing.
+     */
+    synchronized void setCoordinator(S coordinator) {
+        this.coordinator = coordinator;
+    }

Review Comment:
   I see that you use this in tests. I wonder if we could recreate this object 
instead of altering it. Would it be possible?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -1659,4 +1680,117 @@ public void testPartitionLoadSensor() {
 
         verify(runtimeMetrics, 
times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
     }
+
+    @Test
+    public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(Time.SYSTEM)
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader(
+                    new CoordinatorLoader.LoadSummary(
+                        1000,
+                        2000,
+                        30,
+                        3000),
+                    Arrays.asList(5L, 15L, 27L),
+                    Arrays.asList(5L, 15L)))
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Getting the coordinator context fails because the coordinator
+        // does not exist until scheduleLoadOperation is called.
+        assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the coordinator context succeeds now.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+
+        // When the loading completes, the coordinator transitions to active.
+        assertEquals(ACTIVE, ctx.state);
+
+        assertEquals(27L, ctx.stateMachine.lastWrittenOffset());
+        assertEquals(15L, ctx.stateMachine.lastCommittedOffset());
+        assertFalse(ctx.stateMachine.snapshotRegistry().hasSnapshot(0L));

Review Comment:
   I suppose that this does not work because the snapshot registry is not in 
the context anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to