mumrah commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r572957037



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * records will get committed while others will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is thrown some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {

Review comment:
       Since these two `append` methods are mostly identical, can we use a single private method with `isAtomic` as an argument? Otherwise, it seems we will need to update both methods in most cases as we add features and fix bugs.
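       The suggestion above could look roughly like the sketch below. It is a toy model, not the actual `BatchAccumulator`: the class name `AppendSketch`, the public name `appendAtomic`, the record-count limit `maxBatchRecords`, and the helper `batchWithRoomFor` are all assumptions made for illustration, and locking, memory allocation and byte-size accounting are left out. Only `append` and the `isAtomic` flag come from the comment itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for BatchAccumulator. Every name here is hypothetical
// except append and isAtomic, which come from the review comment.
public class AppendSketch {
    private final int epoch;
    private final int maxBatchRecords; // assumed limit, counted in records rather than bytes
    private final List<List<String>> batches = new ArrayList<>();
    private long nextOffset = 0;

    public AppendSketch(int epoch, int maxBatchRecords) {
        this.epoch = epoch;
        this.maxBatchRecords = maxBatchRecords;
    }

    // Non-atomic variant: records may be spread over several batches.
    public Long append(int epoch, List<String> records) {
        return append(epoch, records, false);
    }

    // Atomic variant: all records must land in the same batch.
    public Long appendAtomic(int epoch, List<String> records) {
        return append(epoch, records, true);
    }

    // Shared implementation, so a fix or feature is written only once.
    private Long append(int epoch, List<String> records, boolean isAtomic) {
        if (epoch != this.epoch) {
            return Long.MAX_VALUE; // offset that can never become committed
        }
        if (records.isEmpty()) {
            throw new IllegalArgumentException("records must not be empty");
        }
        if (isAtomic && records.size() > maxBatchRecords) {
            // The real code would throw its own exception type here.
            throw new IllegalArgumentException("atomic append does not fit in one batch");
        }
        if (isAtomic) {
            // Reserve room for the whole list up front.
            batchWithRoomFor(records.size()).addAll(records);
        } else {
            // Reserve room one record at a time; a new batch starts only when the current one is full.
            for (String record : records) {
                batchWithRoomFor(1).add(record);
            }
        }
        nextOffset += records.size();
        return nextOffset - 1; // offset of the last appended record
    }

    // Returns the current batch if it can hold `count` more records, otherwise starts a new one.
    private List<String> batchWithRoomFor(int count) {
        if (batches.isEmpty() || batches.get(batches.size() - 1).size() + count > maxBatchRecords) {
            batches.add(new ArrayList<>());
        }
        return batches.get(batches.size() - 1);
    }

    public List<Integer> batchSizes() {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> batch : batches) {
            sizes.add(batch.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        AppendSketch acc = new AppendSketch(1, 3);
        System.out.println(acc.append(1, Arrays.asList("a", "b", "c", "d")));
        System.out.println(acc.appendAtomic(1, Arrays.asList("e", "f", "g")));
        System.out.println(acc.batchSizes());
    }
}
```

       In this sketch the two public methods differ only in the flag they pass, so the epoch check and offset bookkeeping live in one place.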

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * records will get committed while others will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is thrown some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
+            return Long.MAX_VALUE;
+        }
+
+        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
+
+        appendLock.lock();
+        try {
+            maybeCompleteDrain();
+
+            for (T record : records) {
+                BatchBuilder<T> batch = maybeAllocateBatch(

Review comment:
       Do we need to create a batch for each record? What is the overhead associated with a batch?



