chia7712 commented on code in PR #20847:
URL: https://github.com/apache/kafka/pull/20847#discussion_r2557281553
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -109,6 +110,10 @@ public class GroupCoordinatorConfig {
public static final CompressionType
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC =
"Compression codec for the offsets topic - compression may be used to achieve
\"atomic\" commits.";
+ public static final String APPEND_MAX_BUFFER_SIZE_CONFIG =
"group.coordinator.append.max.buffer.size";
+ public static final int APPEND_MAX_BUFFER_SIZE_DEFAULT = 1024 * 1024 +
Records.LOG_OVERHEAD;
+ public static final String APPEND_MAX_BUFFER_SIZE_DOC = "The largest
buffer size allowed by GroupCoordinator (It is recommended not to exceed the
maximum allowed message size).";
Review Comment:
ditto
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java:
##########
@@ -82,6 +83,10 @@ public class ShareCoordinatorConfig {
public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 *
60 * 1000; // 5 minutes
public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The
duration in milliseconds that the share coordinator will wait between force
snapshotting share partitions which are not being updated.";
+ public static final String APPEND_MAX_BUFFER_SIZE_CONFIG =
"share.coordinator.append.max.buffer.size";
+ public static final int APPEND_MAX_BUFFER_SIZE_DEFAULT = 1024 * 1024 +
Records.LOG_OVERHEAD;
+ public static final String APPEND_MAX_BUFFER_SIZE_DOC = "The largest
buffer size allowed by ShareCoordinator (It is recommended not to exceed the
maximum allowed message size).";
Review Comment:
`share.coordinator.append.max.buffer.size` CAN NOT be larger than message
size, right? If so, we should highlight that limit.
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2127,6 +2146,10 @@ private CoordinatorRuntime(
this.compression = compression;
this.appendLingerMs = appendLingerMs;
this.executorService = executorService;
+ this.appendMaxBufferSizeSupplier = appendMaxBufferSizeSupplier;
+ this.runtimeMetrics.registerAppendBufferSizeGauge(
+ () -> coordinators.values().stream().mapToLong(c ->
c.bufferSupplier.size()).sum()
Review Comment:
This appears to create an implicit call chain between `CoordinatorRuntime`
and `CoordinatorRuntimeMetricsImpl`. Perhaps, `CoordinatorRuntimeMetricsImpl`
could maintain a `AtomicLong` variable, and we could update its value via
`freeCurrentBatch`. For example:
```java
if (currentBatch.builder.buffer().capacity() <= maxBufferSize) {
var before = bufferSupplier.size();
bufferSupplier.release(currentBatch.builder.buffer());
runtimeMetrics.recordAppendBufferSize(bufferSupplier.size()
- before);
} else if (currentBatch.buffer.capacity() <= maxBufferSize) {
var before = bufferSupplier.size();
bufferSupplier.release(currentBatch.buffer);
runtimeMetrics.recordAppendBufferSize(bufferSupplier.size()
- before);
} else {
runtimeMetrics.recordAppendBufferDiscarded();
}
```
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -772,13 +782,15 @@ private void freeCurrentBatch() {
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
- // Release the buffer only if it is not larger than the
maxBatchSize.
- int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+ // Release the buffer only if it is not larger than the max buffer
size.
+ int maxBufferSize = appendMaxBufferSizeSupplier.get();
Review Comment:
Should `maybeAllocateNewBatch` also adopt `appendMaxBufferSizeSupplier`
instead of message size?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]