squah-confluent commented on code in PR #20847:
URL: https://github.com/apache/kafka/pull/20847#discussion_r2505132773
##########
clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java:
##########
@@ -115,6 +130,11 @@ public void release(ByteBuffer buffer) {
cachedBuffer = buffer;
}
+ @Override
+ public long size() {
+ return cachedBuffer.capacity();
Review Comment:
`cachedBuffer` can be `null`
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -2001,6 +2013,11 @@ public void onHighWatermarkUpdated(
*/
private final ExecutorService executorService;
+ /**
+ * The maximum buffer size that the coordinator can cache.
+ */
+ private final Supplier<Integer> appendMaxBufferSizeSupplied;
Review Comment:
typo?
```suggestion
private final Supplier<Integer> appendMaxBufferSizeSupplier;
```
##########
clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java:
##########
@@ -58,6 +63,11 @@ public static BufferSupplier create() {
*/
public abstract void release(ByteBuffer buffer);
+ /**
+ * Return total size in bytes of cached buffers.
+ */
+ public abstract long size();
Review Comment:
The size implementations aren't thread safe, but they will be called from a
metrics thread.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -639,7 +648,7 @@ GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
consumerGroupSessionTimeoutMs(),
}
/**
- * The number of threads or event loops running.
+ * The number of threads for event loops running.
Review Comment:
I don't think this was a typo.
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2767,6 +2874,7 @@ public void
testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated(
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withMaxBufferSizeSupplier(maxBufferSizeSupplierMock)
Review Comment:
Is the `Supplier` mock necessary? Can we pass `() -> MAX_BUFFER_SIZE` to
`withMaxBufferSizeSupplier` directly?
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1038,6 +1038,7 @@ class KafkaConfigTest {
/** New group coordinator configs */
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.APPEND_MAX_BUFFER_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Review Comment:
```suggestion
case GroupCoordinatorConfig.APPEND_MAX_BUFFER_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, 512 * 1024 -
1)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]