dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1628920984


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -626,89 +833,113 @@ private void append(
                 // If the records are empty, it was a read operation after all. In this case,
                 // the response can be returned directly iff there are no pending write operations;
                 // otherwise, the read needs to wait on the last write operation to be completed.
-                OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
-                if (pendingOffset.isPresent()) {
-                    deferredEventQueue.add(pendingOffset.getAsLong(), event);
+                if (currentBatch != null) {
+                    currentBatch.events.add(event);
                 } else {
-                    event.complete(null);
+                    OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
+                    if (pendingOffset.isPresent()) {
+                        deferredEventQueue.add(pendingOffset.getAsLong(), event);
+                    } else {
+                        event.complete(null);
+                    }
                 }
             } else {
                 // If the records are not empty, first, they are applied to the state machine,
-                // second, then are written to the partition/log, and finally, the response
-                // is put into the deferred event queue.
-                long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-                LogConfig logConfig = partitionWriter.config(tp);
-                byte magic = logConfig.recordVersion().value;
-                int maxBatchSize = logConfig.maxMessageSize();
+                // second, then are appended to the opened batch.
                 long currentTimeMs = time.milliseconds();
-                ByteBuffer buffer = bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
 
-                try {
-                    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-                        buffer,
-                        magic,
-                        compression,
-                        TimestampType.CREATE_TIME,
-                        0L,
+                // If the current write operation is transactional, the current batch
+                // is written before proceeding with it.

Review Comment:
   Transactional records must be in a single batch because the transactional metadata, such as the producer id and epoch, are stored at the batch level. In other words, transactional records cannot be batched together with other records.
   
   The goal of this code is not to flush the batch created by the write operation when it is transactional, but to flush the previous batch, if any. Imagine that you have a regular write: a batch is created with the records, but it is not flushed yet. Then, you have a transactional write. In this case, the first batch must be flushed, and then the transactional one must be flushed too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
