dajac commented on code in PR #16215: URL: https://github.com/apache/kafka/pull/16215#discussion_r1628920984
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -626,89 +833,113 @@ private void append(
                 // If the records are empty, it was a read operation after all. In this case,
                 // the response can be returned directly iff there are no pending write operations;
                 // otherwise, the read needs to wait on the last write operation to be completed.
-                OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
-                if (pendingOffset.isPresent()) {
-                    deferredEventQueue.add(pendingOffset.getAsLong(), event);
+                if (currentBatch != null) {
+                    currentBatch.events.add(event);
                 } else {
-                    event.complete(null);
+                    OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
+                    if (pendingOffset.isPresent()) {
+                        deferredEventQueue.add(pendingOffset.getAsLong(), event);
+                    } else {
+                        event.complete(null);
+                    }
                 }
             } else {
                 // If the records are not empty, first, they are applied to the state machine,
-                // second, then are written to the partition/log, and finally, the response
-                // is put into the deferred event queue.
-                long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-                LogConfig logConfig = partitionWriter.config(tp);
-                byte magic = logConfig.recordVersion().value;
-                int maxBatchSize = logConfig.maxMessageSize();
+                // second, then are appended to the opened batch.
                 long currentTimeMs = time.milliseconds();
-                ByteBuffer buffer = bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
-                try {
-                    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-                        buffer,
-                        magic,
-                        compression,
-                        TimestampType.CREATE_TIME,
-                        0L,
+
+                // If the current write operation is transactional, the current batch
+                // is written before proceeding with it.

Review Comment:
Transactional records must be in a single, dedicated batch because the transactional metadata, such as the producer id and epoch, is stored at the batch level. In other words, transactional records cannot be accumulated into a shared batch with other writes.
The goal of this code is not to flush the batch created by the transactional write operation, but to flush the previous batch, if any. Imagine that you have a regular write: a batch is created with its records, but it is not flushed yet. Then you have a transactional write. In this case, the first batch must be flushed, and then the transactional one must be written in its own batch and flushed too.
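The flushing rule described above can be sketched as a toy model. This is a minimal illustration, not Kafka's actual API: the class `BatchingSketch` and the methods `write` and `maybeFlushCurrentBatch` are hypothetical names. It shows that a regular write appends to an open batch, while a transactional write first flushes any open batch and then goes out flushed in its own batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the batching rule discussed in the
// review: transactional records get their own batch because producer
// id/epoch metadata lives at the batch level.
public class BatchingSketch {
    static List<List<String>> flushed = new ArrayList<>();
    static List<String> currentBatch = null;

    static void write(List<String> records, boolean transactional) {
        if (transactional) {
            // Flush the previously opened batch, if any, before the
            // transactional write; the transactional records then form
            // their own batch and are flushed immediately.
            maybeFlushCurrentBatch();
            flushed.add(new ArrayList<>(records));
            return;
        }
        // Regular records accumulate in the open batch; it stays open.
        if (currentBatch == null) currentBatch = new ArrayList<>();
        currentBatch.addAll(records);
    }

    static void maybeFlushCurrentBatch() {
        if (currentBatch != null) {
            flushed.add(currentBatch);
            currentBatch = null;
        }
    }

    public static void main(String[] args) {
        write(List.of("r1", "r2"), false); // regular write: batch stays open, nothing flushed
        write(List.of("t1"), true);        // transactional write: flushes [r1, r2], then [t1]
        System.out.println(flushed);       // [[r1, r2], [t1]]
    }
}
```

The key point is that the non-transactional batch `[r1, r2]` is flushed as a side effect of the transactional write, not merged with it.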