dajac commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1410820590


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -370,6 +371,155 @@ public int size() {
      * CoordinatorContext holds all the metadata around a coordinator state machine.
      */
     class CoordinatorContext {
+
+        /**
+         * ContextStateMachine is a wrapper on top of the coordinator state machine. The methods
+         * may be invoked by multiple threads, so they are synchronized.
+         */
+        class ContextStateMachine implements CoordinatorPlayback<U> {

Review Comment:
   I think that we are pushing the nested classes a bit too far here and it makes it hard to reason about the concurrency for this class because we access attributes outside of it.
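
   A minimal sketch of one way to address this, assuming the wrapper is pulled out into a standalone class: it owns every field it mutates, guards them with its own monitor, and receives collaborators explicitly instead of reaching into the enclosing context. `StandaloneOffsetTracker` and its `LongConsumer` commit callback are hypothetical names used only for illustration.

   ```java
   import java.util.function.LongConsumer;

   // Hypothetical sketch: a top-level class rather than an inner class of the
   // context, so it cannot read or write the context's fields implicitly.
   // Everything it mutates is declared here and guarded by this object's monitor.
   final class StandaloneOffsetTracker {
       // Collaborator handed in explicitly instead of captured from an outer class.
       private final LongConsumer onCommit;
       private long lastWrittenOffset;
       private long lastCommittedOffset;

       StandaloneOffsetTracker(LongConsumer onCommit) {
           this.onCommit = onCommit;
       }

       synchronized void updateLastWrittenOffset(long offset) {
           if (offset <= lastWrittenOffset) {
               throw new IllegalStateException("New last written offset " + offset
                   + " must be greater than " + lastWrittenOffset + ".");
           }
           lastWrittenOffset = offset;
       }

       void updateLastCommittedOffset(long offset) {
           synchronized (this) {
               if (offset < lastCommittedOffset) {
                   throw new IllegalStateException("New committed offset " + offset
                       + " must be greater than or equal to " + lastCommittedOffset + ".");
               }
               lastCommittedOffset = offset;
           }
           // Notify outside this object's lock so locks are never nested here; the
           // owner of the callback decides which lock guards the state it touches.
           // If commits race, notifications may arrive out of order, so the callback
           // should tolerate an older offset (a "complete everything up to" step does).
           onCommit.accept(offset);
       }

       synchronized long lastWrittenOffset() {
           return lastWrittenOffset;
       }

       synchronized long lastCommittedOffset() {
           return lastCommittedOffset;
       }
   }
   ```

   The trade-off in this sketch is that the callback runs outside the tracker's lock, so the tracker never holds its own lock while the owner acquires another; the owner of the callback stays responsible for guarding whatever the callback touches.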



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -370,6 +371,155 @@ public int size() {
      * CoordinatorContext holds all the metadata around a coordinator state machine.
      */
     class CoordinatorContext {
+
+        /**
+         * ContextStateMachine is a wrapper on top of the coordinator state machine. The methods
+         * may be invoked by multiple threads, so they are synchronized.
+         */
+        class ContextStateMachine implements CoordinatorPlayback<U> {
+
+            /**
+             * The actual state machine.
+             */
+            private S coordinator;
+
+            /**
+             * The snapshot registry backing the coordinator.
+             */
+            private final SnapshotRegistry snapshotRegistry;
+
+            /**
+             * The last offset written to the partition.
+             */
+            private long lastWrittenOffset;
+
+            /**
+             * The last offset committed. This represents the high
+             * watermark of the partition.
+             */
+            private long lastCommittedOffset;
+
+            ContextStateMachine(
+                SnapshotRegistry snapshotRegistry,
+                S coordinator, 
+                long lastWrittenOffset, 
+                long lastCommittedOffset
+            ) {
+                this.coordinator = coordinator;
+                this.snapshotRegistry = snapshotRegistry;
+                this.lastWrittenOffset = lastWrittenOffset;
+                this.lastCommittedOffset = lastCommittedOffset;
+                snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+                snapshotRegistry.deleteSnapshotsUpTo(lastWrittenOffset);
+            }
+
+            /**
+             * Reverts the last written offset. This also reverts the snapshot
+             * registry to this offset. All the changes applied after the offset
+             * are lost.
+             *
+             * @param offset The offset to revert to.
+             */
+            private synchronized void revertLastWrittenOffset(
+                long offset
+            ) {
+                if (offset > lastWrittenOffset) {
+                    throw new IllegalStateException("New offset " + offset + " of " + tp +
+                        " must be smaller than " + lastWrittenOffset + ".");
+                }
+
+                log.debug("Revert last written offset of {} to {}.", tp, offset);
+                lastWrittenOffset = offset;
+                snapshotRegistry.revertToSnapshot(offset);
+            }
+
+            @Override
+            public void replay(
+                long producerId,
+                short producerEpoch,
+                U record
+            ) {
+                coordinator.replay(producerId, producerEpoch, record);
+            }
+
+            /**
+             * Updates the last written offset. This also creates a new snapshot
+             * in the snapshot registry.
+             *
+             * @param offset The new last written offset.
+             */
+            @Override
+            public synchronized void updateLastWrittenOffset(Long offset) {
+                if (offset <= lastWrittenOffset) {
+                    throw new IllegalStateException("New last written offset " + offset + " of " + tp +
+                        " must be greater than " + lastWrittenOffset + ".");
+                }
+
+                log.debug("Update last written offset of {} to {}.", tp, offset);
+                lastWrittenOffset = offset;
+                snapshotRegistry.getOrCreateSnapshot(offset);
+            }
+
+            /**
+             * Updates the last committed offset. This completes all the deferred
+             * events waiting on this offset. This also cleans up all the snapshots
+             * prior to this offset.
+             *
+             * @param offset The new last committed offset.
+             */
+            @Override
+            public void updateLastCommittedOffset(Long offset) {
+                synchronized (this) {
+                    if (offset < lastCommittedOffset) {
+                        throw new IllegalStateException("New committed offset " + offset + " of " + tp +
+                            " must be greater than or equal to " + lastCommittedOffset + ".");
+                    }
+
+                    lastCommittedOffset = offset;
+                    deferredEventQueue.completeUpTo(offset);
+                    snapshotRegistry.deleteSnapshotsUpTo(offset);

Review Comment:
   Accessing `deferredEventQueue` here is weird, no? It is owned by the CoordinatorContext, so it should be accessed while holding the lock of the context. We don't respect this here. I suppose that the same would apply to `coordinatorMetrics`.
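
   A minimal sketch of the ownership rule described above, assuming the context's own monitor stands in for the context lock: the committed offset and the deferred queue both live in the context, and the offset only advances through a context method that holds that lock while completing deferred events. `SketchContext`, `SimpleDeferredQueue`, and `deferUntil` are hypothetical, not the runtime's actual classes or methods.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical deferred event queue: each event waits for a committed offset.
   // It is deliberately not thread-safe; its owner must guard every access.
   final class SimpleDeferredQueue {
       private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();

       void add(long offset, Runnable event) {
           pending.computeIfAbsent(offset, k -> new ArrayList<>()).add(event);
       }

       void completeUpTo(long offset) {
           // Live view of every entry whose offset is <= the given offset.
           Map<Long, List<Runnable>> readyView = pending.headMap(offset, true);
           List<Runnable> ready = new ArrayList<>();
           readyView.values().forEach(ready::addAll);
           // Clearing the view removes those entries from the backing map.
           readyView.clear();
           // Run after removal so an event that defers new work does not modify
           // the map while it is being iterated.
           ready.forEach(Runnable::run);
       }

       int size() {
           int count = 0;
           for (List<Runnable> events : pending.values()) {
               count += events.size();
           }
           return count;
       }
   }

   // Hypothetical context: it owns both the committed offset and the queue, and
   // touches them only while holding its own monitor.
   final class SketchContext {
       private final SimpleDeferredQueue deferredEventQueue = new SimpleDeferredQueue();
       private long lastCommittedOffset;

       synchronized void deferUntil(long offset, Runnable event) {
           if (offset <= lastCommittedOffset) {
               event.run(); // already committed, nothing to wait for
           } else {
               deferredEventQueue.add(offset, event);
           }
       }

       synchronized void updateLastCommittedOffset(long offset) {
           if (offset < lastCommittedOffset) {
               throw new IllegalStateException("New committed offset " + offset
                   + " must be greater than or equal to " + lastCommittedOffset + ".");
           }
           lastCommittedOffset = offset;
           deferredEventQueue.completeUpTo(offset);
       }

       synchronized int pendingEvents() {
           return deferredEventQueue.size();
       }
   }
   ```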



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java:
##########
@@ -47,4 +47,15 @@ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {}
      * any post unloading operations.
      */
     default void onUnloaded() {}
+
+    /**
+     * Replay a record to update the state machine.
+     *
+     * @param record The record to replay.
+     */
+    default void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) throws RuntimeException {}

Review Comment:
   I still don't get why we need a default implementation here. I think that it should be:
   ```
   void replay(
           long producerId,
           short producerEpoch,
           U record
   ) throws RuntimeException;
   ```
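
   A short illustration of the difference, using a hypothetical `SketchShard` interface rather than the PR's `CoordinatorShard`: with `replay` declared abstract, a class that forgets to implement it fails to compile, whereas a no-op default would let it compile and silently ignore every replayed record.

   ```java
   // Hypothetical shard interface mirroring the suggestion: replay has no
   // default body, so the compiler rejects any implementation that omits it.
   interface SketchShard<U> {
       // Optional hooks can keep no-op defaults.
       default void onUnloaded() {}

       void replay(long producerId, short producerEpoch, U record) throws RuntimeException;
   }

   // A concrete shard must implement replay; with a no-op default it could
   // forget to, and every replayed record would be dropped without an error.
   final class CountingShard implements SketchShard<String> {
       private int replayed;

       @Override
       public void replay(long producerId, short producerEpoch, String record) {
           replayed++;
       }

       int replayed() {
           return replayed;
       }
   }
   ```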



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -370,6 +371,155 @@ public int size() {
      * CoordinatorContext holds all the metadata around a coordinator state machine.
      */
     class CoordinatorContext {
+
+        /**
+         * ContextStateMachine is a wrapper on top of the coordinator state machine. The methods
+         * may be invoked by multiple threads, so they are synchronized.
+         */
+        class ContextStateMachine implements CoordinatorPlayback<U> {
+
+            /**
+             * The actual state machine.
+             */
+            private S coordinator;
+
+            /**
+             * The snapshot registry backing the coordinator.
+             */
+            private final SnapshotRegistry snapshotRegistry;
+
+            /**
+             * The last offset written to the partition.
+             */
+            private long lastWrittenOffset;
+
+            /**
+             * The last offset committed. This represents the high
+             * watermark of the partition.
+             */
+            private long lastCommittedOffset;
+
+            ContextStateMachine(
+                SnapshotRegistry snapshotRegistry,
+                S coordinator, 
+                long lastWrittenOffset, 
+                long lastCommittedOffset

Review Comment:
   It seems that we could just remove the `lastWrittenOffset` and `lastCommittedOffset` parameters and initialize both offsets to zero.
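
   A sketch of what the suggested constructor could look like, assuming the offsets always start at zero when the state machine is created. `FakeSnapshotRegistry` and `ZeroInitStateMachine` are hypothetical stand-ins so the example is self-contained; they are not Kafka's classes.

   ```java
   import java.util.TreeSet;

   // Hypothetical stand-in for SnapshotRegistry that only records which offsets
   // have a snapshot; it is not Kafka's class.
   final class FakeSnapshotRegistry {
       private final TreeSet<Long> snapshots = new TreeSet<>();

       void getOrCreateSnapshot(long offset) {
           snapshots.add(offset);
       }

       boolean hasSnapshot(long offset) {
           return snapshots.contains(offset);
       }
   }

   // Sketch of the suggested constructor: the offsets are no longer parameters
   // and simply start at zero.
   final class ZeroInitStateMachine<S> {
       private final FakeSnapshotRegistry snapshotRegistry;
       private final S coordinator;
       private long lastWrittenOffset = 0L;
       private long lastCommittedOffset = 0L;

       ZeroInitStateMachine(FakeSnapshotRegistry snapshotRegistry, S coordinator) {
           this.snapshotRegistry = snapshotRegistry;
           this.coordinator = coordinator;
           // Mirrors the PR's constructor, which creates a snapshot at the initial
           // last written offset; with zero-initialized offsets that is offset 0.
           // The PR's deleteSnapshotsUpTo call is left out, assuming a fresh
           // registry has nothing below offset 0 to delete.
           snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
       }

       S coordinator() {
           return coordinator;
       }

       synchronized long lastWrittenOffset() {
           return lastWrittenOffset;
       }

       synchronized long lastCommittedOffset() {
           return lastCommittedOffset;
       }
   }
   ```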



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -370,6 +371,155 @@ public int size() {
      * CoordinatorContext holds all the metadata around a coordinator state machine.
      */
     class CoordinatorContext {
+
+        /**
+         * ContextStateMachine is a wrapper on top of the coordinator state machine. The methods
+         * may be invoked by multiple threads, so they are synchronized.
+         */
+        class ContextStateMachine implements CoordinatorPlayback<U> {
+
+            /**
+             * The actual state machine.
+             */
+            private S coordinator;
+
+            /**
+             * The snapshot registry backing the coordinator.
+             */
+            private final SnapshotRegistry snapshotRegistry;
+
+            /**
+             * The last offset written to the partition.
+             */
+            private long lastWrittenOffset;
+
+            /**
+             * The last offset committed. This represents the high
+             * watermark of the partition.
+             */
+            private long lastCommittedOffset;
+
+            ContextStateMachine(
+                SnapshotRegistry snapshotRegistry,
+                S coordinator, 
+                long lastWrittenOffset, 
+                long lastCommittedOffset
+            ) {
+                this.coordinator = coordinator;
+                this.snapshotRegistry = snapshotRegistry;
+                this.lastWrittenOffset = lastWrittenOffset;
+                this.lastCommittedOffset = lastCommittedOffset;
+                snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+                snapshotRegistry.deleteSnapshotsUpTo(lastWrittenOffset);
+            }
+
+            /**
+             * Reverts the last written offset. This also reverts the snapshot
+             * registry to this offset. All the changes applied after the offset
+             * are lost.
+             *
+             * @param offset The offset to revert to.
+             */
+            private synchronized void revertLastWrittenOffset(
+                long offset
+            ) {
+                if (offset > lastWrittenOffset) {
+                    throw new IllegalStateException("New offset " + offset + " of " + tp +
+                        " must be smaller than " + lastWrittenOffset + ".");
+                }
+
+                log.debug("Revert last written offset of {} to {}.", tp, offset);
+                lastWrittenOffset = offset;
+                snapshotRegistry.revertToSnapshot(offset);
+            }
+
+            @Override
+            public void replay(

Review Comment:
   Why don't we synchronize this one?
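
   A self-contained sketch of why the question matters, under stated assumptions: if `replay` is not synchronized while `revertLastWrittenOffset` is, a revert on one thread could roll the state back while a replay on another thread is still mutating it. In this sketch all three methods share the object's monitor. `SyncedStateMachine`, its record list, and its offset-to-size map are hypothetical stand-ins for the coordinator and `SnapshotRegistry`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeMap;

   // Hypothetical sketch: replay and the offset/snapshot operations share one
   // monitor, so a revert can never run halfway through a replay (or vice versa).
   final class SyncedStateMachine {
       // Stand-in for the coordinator's state: the records replayed so far.
       private final List<String> records = new ArrayList<>();
       // Stand-in for the snapshot registry: offset -> number of records at that offset.
       private final TreeMap<Long, Integer> snapshots = new TreeMap<>();
       private long lastWrittenOffset = 0L;

       SyncedStateMachine() {
           snapshots.put(0L, 0);
       }

       synchronized void replay(long producerId, short producerEpoch, String record) {
           records.add(record);
       }

       synchronized void updateLastWrittenOffset(long offset) {
           if (offset <= lastWrittenOffset) {
               throw new IllegalStateException("New last written offset " + offset
                   + " must be greater than " + lastWrittenOffset + ".");
           }
           lastWrittenOffset = offset;
           snapshots.put(offset, records.size());
       }

       synchronized void revertLastWrittenOffset(long offset) {
           Integer size = snapshots.get(offset);
           if (offset > lastWrittenOffset || size == null) {
               throw new IllegalStateException("Cannot revert to offset " + offset + ".");
           }
           lastWrittenOffset = offset;
           // Drop every record replayed after the snapshot, and every later snapshot.
           records.subList(size, records.size()).clear();
           snapshots.tailMap(offset, false).clear();
       }

       synchronized List<String> records() {
           return new ArrayList<>(records);
       }

       synchronized long lastWrittenOffset() {
           return lastWrittenOffset;
       }
   }
   ```

   Because replay and the offset updates hold the same monitor here, the snapshot taken at an offset always reflects a complete set of replayed records.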



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
