Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-20 Thread via GitHub


dajac merged PR #15964:
URL: https://github.com/apache/kafka/pull/15964


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-17 Thread via GitHub


dajac commented on PR #15964:
URL: https://github.com/apache/kafka/pull/15964#issuecomment-2116983828

   @jolshan Thanks for your comments. I replied to them.





Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604418654


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##
@@ -84,98 +67,28 @@ class CoordinatorPartitionWriterTest {
   }
 
   @Test
-  def testWriteRecords(): Unit = {

Review Comment:
   Right. We have many tests in CoordinatorRuntimeTest doing writes. As we 
fully validate the records now, they cover this.






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604417628


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -18,40 +18,21 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple interface to write records to Partitions/Logs. It contains the minimum
  * required for coordinators.
- *
- * @param <T> The record type.
  */
-public interface PartitionWriter<T> {
-
-    /**
-     * Serializer to translate T to bytes.
-     *
-     * @param <T> The record type.
-     */
-    interface Serializer<T> {
-        /**
-         * Serializes the key of the record.
-         */
-        byte[] serializeKey(T record);
-
-        /**
-         * Serializes the value of the record.
-         */
-        byte[] serializeValue(T record);
-    }
+public interface PartitionWriter {
 
     /**
      * Listener allowing to listen to high watermark changes. This is meant
-     * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, List)}}.
+     * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.

Review Comment:
   IntelliJ reports them as warnings. I suppose that we would get warnings 
when we generate the Javadoc too.






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604415305


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -723,30 +757,66 @@ public void run() {
 // If the records are not empty, first, they are applied to the state machine,
 // second, then are written to the partition/log, and finally, the response
 // is put into the deferred event queue.
+long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long currentTimeMs = time.milliseconds();
+ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));
+
 try {
-// Apply the records to the state machine.
-if (result.replayRecords()) {
-// We compute the offset of the record based on the last written offset. The
-// coordinator is the single writer to the underlying partition so we can
-// deduce it like this.
-for (int i = 0; i < result.records().size(); i++) {
+MemoryRecordsBuilder builder = MemoryRecords.builder(

Review Comment:
   The `builder` is used in the above loop (L801) so we need it here.






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1604414478


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1046,12 +1118,18 @@ public void run() {
 result
 );
 
-long offset = partitionWriter.appendEndTransactionMarker(
+long offset = partitionWriter.append(
 tp,
-producerId,
-producerEpoch,
-coordinatorEpoch,
-result
+VerificationGuard.SENTINEL,
+MemoryRecords.withEndTransactionMarker(
+time.milliseconds(),

Review Comment:
   `withEndTransactionMarker` takes the current time if we don't specify it. 
The reason why I set it explicitly here is to ensure that the mock time is used 
in tests.
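
A minimal sketch of the idea, assuming a hypothetical `MarkerSketch` factory rather than Kafka's actual `MemoryRecords` API: when the caller passes the timestamp explicitly, a test can inject a mock clock instead of depending on the wall clock. All names below are illustrative and do not come from the PR.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for a record factory; this is not Kafka's MemoryRecords.
final class MarkerSketch {
    final long timestampMs;

    private MarkerSketch(long timestampMs) {
        this.timestampMs = timestampMs;
    }

    // Convenience overload: falls back to the wall clock, which a test cannot control.
    static MarkerSketch create() {
        return create(System.currentTimeMillis());
    }

    // Explicit overload: the caller supplies the time, so a mock clock can be used.
    static MarkerSketch create(long timestampMs) {
        return new MarkerSketch(timestampMs);
    }
}

final class ExplicitTimestampSketch {
    // Builds a marker with whatever clock the caller injects.
    static MarkerSketch writeMarker(LongSupplier clock) {
        return MarkerSketch.create(clock.getAsLong());
    }

    public static void main(String[] args) {
        // In a test, the clock is a fixed value rather than the system time.
        LongSupplier mockClock = () -> 1_000L;
        System.out.println(writeMarker(mockClock).timestampMs);
    }
}
```

With the no-argument overload, the timestamp in the written marker would differ on every run, so a test could not assert on it.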






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603862201


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##
@@ -84,98 +67,28 @@ class CoordinatorPartitionWriterTest {
   }
 
   @Test
-  def testWriteRecords(): Unit = {

Review Comment:
   Do we have an equivalent test for the writing of the records in 
CoordinatorRuntimeTest? I didn't really notice new tests, but saw we have some 
of the builder logic there. Is it tested by checking equality between the 
records generated by the helper methods and the output from running the 
CoordinatorRuntime code?






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603853446


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -18,162 +18,29 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * An in-memory partition writer.
- *
- * @param <T> The record type.
  */
-public class InMemoryPartitionWriter<T> implements PartitionWriter<T> {
-
-    public static class LogEntry {

Review Comment:
   nice that we could just use the real memory records






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603806833


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java:
##
@@ -18,40 +18,21 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.VerificationGuard;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple interface to write records to Partitions/Logs. It contains the minimum
  * required for coordinators.
- *
- * @param <T> The record type.
  */
-public interface PartitionWriter<T> {
-
-    /**
-     * Serializer to translate T to bytes.
-     *
-     * @param <T> The record type.
-     */
-    interface Serializer<T> {
-        /**
-         * Serializes the key of the record.
-         */
-        byte[] serializeKey(T record);
-
-        /**
-         * Serializes the value of the record.
-         */
-        byte[] serializeValue(T record);
-    }
+public interface PartitionWriter {
 
     /**
      * Listener allowing to listen to high watermark changes. This is meant
-     * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, List)}}.
+     * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.

Review Comment:
   Is there a programmatic way to check if these links are broken due to 
refactoring, or do you need to do it manually?
   
   Just wondering if there is an easy way to check you did them all :)






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603801460


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -723,30 +757,66 @@ public void run() {
 // If the records are not empty, first, they are applied to the state machine,
 // second, then are written to the partition/log, and finally, the response
 // is put into the deferred event queue.
+long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long currentTimeMs = time.milliseconds();
+ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));
+
 try {
-// Apply the records to the state machine.
-if (result.replayRecords()) {
-// We compute the offset of the record based on the last written offset. The
-// coordinator is the single writer to the underlying partition so we can
-// deduce it like this.
-for (int i = 0; i < result.records().size(); i++) {
+MemoryRecordsBuilder builder = MemoryRecords.builder(

Review Comment:
   nit: is there a benefit from putting this here and not right before the 
append method? 






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603797394


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -723,30 +757,66 @@ public void run() {
 // If the records are not empty, first, they are applied to the state machine,
 // second, then are written to the partition/log, and finally, the response
 // is put into the deferred event queue.
+long prevLastWrittenOffset = context.coordinator.lastWrittenOffset();
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long currentTimeMs = time.milliseconds();
+ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));

Review Comment:
   Nice we got rid of the thread local. 👍 






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1603782316


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1046,12 +1118,18 @@ public void run() {
 result
 );
 
-long offset = partitionWriter.appendEndTransactionMarker(
+long offset = partitionWriter.append(
 tp,
-producerId,
-producerEpoch,
-coordinatorEpoch,
-result
+VerificationGuard.SENTINEL,
+MemoryRecords.withEndTransactionMarker(
+time.milliseconds(),

Review Comment:
   It seems we didn't specify this time value before. Was that a bug? I guess 
it also just gets the system time in the method.






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-15 Thread via GitHub


dajac commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1602677561


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1072,8 +1174,8 @@ public void replay(
 @Test
 public void testScheduleWriteOpWhenWriteFails() {
 MockTimer timer = new MockTimer();
-// The partition writer only accept on write.
-MockPartitionWriter writer = new MockPartitionWriter(2);
+// The partition writer only accept one write.

Review Comment:
   You got it right. A write operation produces a single batch containing all 
the records it generates. This patch does not change that; it only changes 
where the memory records are built. The next patch will add the logic to keep 
the batch open until it is full or until a linger time is reached. With that, 
records produced by many write operations will end up in the same batch.
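
The coalescing behaviour described for the follow-up patch can be sketched roughly as below. This is only an illustration under stated assumptions, not the PR's implementation: the class `CoalescingBatcherSketch`, its fields, and the byte-array record representation are all hypothetical, and the caller is assumed to pass the current time in explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: keeps one batch open across write operations and
// flushes it when it is full or when it has lingered long enough.
final class CoalescingBatcherSketch {
    private final int maxBatchBytes;
    private final long lingerMs;
    private final List<byte[]> open = new ArrayList<>();
    private final List<List<byte[]>> flushed = new ArrayList<>();
    private int openBytes = 0;
    private long openedAtMs = -1L;

    CoalescingBatcherSketch(int maxBatchBytes, long lingerMs) {
        this.maxBatchBytes = maxBatchBytes;
        this.lingerMs = lingerMs;
    }

    // Adds the records of one write operation to the open batch. If they do
    // not fit, the open batch is flushed first; the records of a single write
    // operation are never split across batches.
    void append(List<byte[]> records, long nowMs) {
        int size = 0;
        for (byte[] record : records) {
            size += record.length;
        }
        if (!open.isEmpty() && openBytes + size > maxBatchBytes) {
            flush();
        }
        if (open.isEmpty()) {
            openedAtMs = nowMs;
        }
        open.addAll(records);
        openBytes += size;
        if (openBytes >= maxBatchBytes) {
            flush();
        }
    }

    // Flushes the open batch once the linger time has elapsed.
    void maybeFlush(long nowMs) {
        if (!open.isEmpty() && nowMs - openedAtMs >= lingerMs) {
            flush();
        }
    }

    private void flush() {
        flushed.add(new ArrayList<>(open));
        open.clear();
        openBytes = 0;
        openedAtMs = -1L;
    }

    List<List<byte[]>> flushedBatches() {
        return flushed;
    }
}
```

In this sketch, two write operations that arrive within the linger window end up in the same flushed batch, whereas the current patch still produces one batch per write operation.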






Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-15 Thread via GitHub


jolshan commented on PR #15964:
URL: https://github.com/apache/kafka/pull/15964#issuecomment-2113696511

   I took a first pass to get a general understanding. I will come back 
tomorrow and take a deeper dive into some of the minor changes and let you know 
if I think of anything I missed.
   





Re: [PR] KAFKA-16770; [1/N] Coalesce records into bigger batches [kafka]

2024-05-15 Thread via GitHub


jolshan commented on code in PR #15964:
URL: https://github.com/apache/kafka/pull/15964#discussion_r1602400486


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1072,8 +1174,8 @@ public void replay(
 @Test
 public void testScheduleWriteOpWhenWriteFails() {
 MockTimer timer = new MockTimer();
-// The partition writer only accept on write.
-MockPartitionWriter writer = new MockPartitionWriter(2);
+// The partition writer only accept one write.

Review Comment:
   For my understanding: we always batched the records (two in this case) that 
were part of the same write operation. For now we aren't changing this, just 
moving the logic to the coordinator runtime to make room for the batching 
logic as a follow-up?





