jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628278619


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,330 @@ private void unload() {
             }
             timer.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
                 coordinator.onUnloaded();
             }
             coordinator = null;
         }
+
+        /**
+         * Frees the current batch.
+         */
+        private void freeCurrentBatch() {
+            // Cancel the linger timeout.
+            currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+            // Release the buffer.
+            bufferSupplier.release(currentBatch.buffer);
+
+            currentBatch = null;
+        }
+
+        /**
+         * Writes the current (or pending) batch to the log. When the batch is written
+         * locally, a new snapshot is created in the snapshot registry and the events
+         * associated with the batch are added to the deferred event queue.
+         */
+        private void writeCurrentBatch() {
+            if (currentBatch != null) {
+                try {
+                    // Write the records to the log and update the last written offset.
+                    long offset = partitionWriter.append(
+                        tp,
+                        currentBatch.verificationGuard,
+                        currentBatch.builder.build()
+                    );
+                    coordinator.updateLastWrittenOffset(offset);
+
+                    // Add all the pending deferred events to the deferred event queue.
+                    for (DeferredEvent event : currentBatch.events) {
+                        deferredEventQueue.add(offset, event);
+                    }
+
+                    // Free up the current batch.
+                    freeCurrentBatch();
+                } catch (Throwable t) {
+                    failCurrentBatch(t);
+                }
+            }
+        }
+
+        /**
+         * Writes the current batch if it is transactional or if it has past the append linger time.
+         */
+        private void maybeWriteCurrentBatch(long currentTimeMs) {
+            if (currentBatch != null) {
+                if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+                    writeCurrentBatch();
+                }
+            }
+        }
+
+        /**
+         * Fails the current batch, reverts to the snapshot to the base/start offset of the
+         * batch, fails all the associated events.
+         */
+        private void failCurrentBatch(Throwable t) {
+            if (currentBatch != null) {
+                coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+                for (DeferredEvent event : currentBatch.events) {
+                    event.complete(t);
+                }
+                freeCurrentBatch();
+            }
+        }
+
+        /**
+         * Allocates a new batch if none already exists.
+         */
+        private void maybeAllocateNewBatch(
+            long producerId,
+            short producerEpoch,
+            VerificationGuard verificationGuard,
+            long currentTimeMs
+        ) {
+            if (currentBatch == null) {
+                LogConfig logConfig = partitionWriter.config(tp);
+                byte magic = logConfig.recordVersion().value;
+                int maxBatchSize = logConfig.maxMessageSize();
+                long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+                ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+                MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+                    buffer,
+                    magic,
+                    compression,
+                    TimestampType.CREATE_TIME,
+                    0L,
+                    currentTimeMs,
+                    producerId,
+                    producerEpoch,
+                    0,
+                    producerId != RecordBatch.NO_PRODUCER_ID,
+                    false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                    maxBatchSize
+                );
+
+                Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+                if (appendLingerMs > 0) {
+                    lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) {
+                        @Override
+                        public void run() {
+                            scheduleInternalOperation("FlushBatch", tp, () -> {

Review Comment:
   Do we cancel this task in the right way so there isn't a race when we call `maybeWriteCurrentBatch`?
   It seems like we only cancel this after the write is complete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to