dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633371246


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,339 @@ private void unload() {
             }
             timer.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
                 coordinator.onUnloaded();
             }
             coordinator = null;
         }
+
+        /**
+         * Frees the current batch.
+         */
+        private void freeCurrentBatch() {
+            // Cancel the linger timeout.
+            currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+            // Release the buffer.
+            bufferSupplier.release(currentBatch.buffer);
+
+            currentBatch = null;
+        }
+
+        /**
+         * Flushes the current (or pending) batch to the log. When the batch is written
+         * locally, a new snapshot is created in the snapshot registry and the events
+         * associated with the batch are added to the deferred event queue.
+         */
+        private void flushCurrentBatch() {
+            if (currentBatch != null) {
+                try {
+                    // Write the records to the log and update the last written offset.
+                    long offset = partitionWriter.append(
+                        tp,
+                        currentBatch.verificationGuard,
+                        currentBatch.builder.build()
+                    );
+                    coordinator.updateLastWrittenOffset(offset);
+
+                    if (offset != currentBatch.nextOffset) {
+                        throw new IllegalStateException("The state machine of coordinator " + tp + " is out of sync with the " +
+                            "underlying log. The last write returned " + offset + " while " + currentBatch.nextOffset + " was expected");

Review Comment:
   I did it in the last commit.
   
   For the record, I also played with an alternative approach. We could leverage the offset validation used by the replication layer and the raft layer to ensure that the start offset of the batch corresponds to the current end offset of the log; if it does not, the batch is rejected. This is slightly better because it ensures that the batch is not written at all if the state machine is out of sync. However, I paused it for now because it also requires adding the leader epoch to the batch so that it can be validated as well. I will look into this in detail later on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
