This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb9280eceea KAFKA-19519 Introduce group/share.coordinator.cached.buffer.max.bytes config (#20847)
cb9280eceea is described below
commit cb9280eceea37be40fbf46b7b9fab17dda0e1faf
Author: Lan Ding <[email protected]>
AuthorDate: Thu Dec 11 22:13:02 2025 +0800
KAFKA-19519 Introduce group/share.coordinator.cached.buffer.max.bytes config (#20847)
**Changes**
1. New Dynamic Configurations
- `group.coordinator.cached.buffer.max.bytes`: Largest cached buffer size allowed by GroupCoordinator
- `share.coordinator.cached.buffer.max.bytes`: Largest cached buffer size allowed by ShareCoordinator
Both configurations default to `1 * 1024 * 1024 + Records.LOG_OVERHEAD` bytes, with a minimum value of `512 * 1024` (a dynamic-update sketch follows below).
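Since both are dynamic broker configurations (`Records.LOG_OVERHEAD` is 12 bytes, so the default works out to 1,048,588 bytes and the minimum to 524,288), they can be updated without a broker restart. A minimal Java sketch of such an update, mirroring the `incrementalAlterConfigs` call in the `DynamicConfigChangeTest` diff below; the bootstrap address and broker id are illustrative placeholders:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;

public class SetCachedBufferMaxBytes {
    public static void main(String[] args) throws Exception {
        // Illustrative connection details; substitute your own cluster's.
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("group.coordinator.cached.buffer.max.bytes", "2097152"), // 2 MiB
                AlterConfigOp.OpType.SET
            );
            admin.incrementalAlterConfigs(Map.of(broker, List.of(op))).all().get();
        }
    }
}
```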
2. Extended CoordinatorRuntime Builder Interface
Added a `withCachedBufferMaxBytesSupplier(Supplier<Integer> cachedBufferMaxBytesSupplier)` method to allow different coordinator implementations to supply their buffer size configuration, as sketched below.
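A minimal sketch of the resulting wiring, modeled on the test setups later in this diff; the `MockCoordinatorShard`/`String` type parameters are the test stand-ins and every other builder argument is elided:

```java
// The cap mirrors the new default: 1 MiB plus the record-batch log overhead.
Supplier<Integer> cachedBufferMaxBytes = () -> 1024 * 1024 + Records.LOG_OVERHEAD;

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
    new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
        // ... timer, partition writer, event processor, metrics, serializer, etc. ...
        .withCachedBufferMaxBytesSupplier(cachedBufferMaxBytes)
        .build(); // build() now rejects a missing supplier with IllegalArgumentException
```

Passing a supplier rather than a fixed value is what lets a dynamic config change take effect on the next buffer release.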
3. New Monitoring Metrics
- `batch-buffer-cache-size-bytes`: Current total size in bytes of the append buffers being held in the coordinator's cache
- `batch-buffer-cache-discard-count`: Count of oversized append buffers that were discarded instead of being cached upon release (see the probe sketch below)
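A hedged sketch of probing the new gauges from a `Metrics` registry; `group-coordinator-metrics` is the group used by the group coordinator runtime (the share coordinator registers under its own group), and the helper class name is illustrative:

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;

public class BufferCacheMetricsProbe {
    // Returns the gauge value, or null until the runtime has registered it.
    static Object bufferCacheSizeBytes(Metrics metrics) {
        MetricName name = metrics.metricName(
            "batch-buffer-cache-size-bytes", "group-coordinator-metrics");
        KafkaMetric metric = metrics.metric(name);
        return metric == null ? null : metric.metricValue();
    }
}
```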
Reviewers: Sushant Mahajan <[email protected]>, David Jacot <[email protected]>, Sean Quah <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 58 +++++--
.../common/runtime/CoordinatorRuntimeMetrics.java | 12 ++
.../runtime/CoordinatorRuntimeMetricsImpl.java | 46 +++++-
.../runtime/CoordinatorRuntimeMetricsImplTest.java | 30 +++-
.../common/runtime/CoordinatorRuntimeTest.java | 174 +++++++++++++--------
.../scala/kafka/server/DynamicBrokerConfig.scala | 6 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 19 +++
.../kafka/server/DynamicConfigChangeTest.scala | 51 +++++-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
docs/ops.html | 20 +++
docs/upgrade.html | 13 ++
.../coordinator/group/GroupCoordinatorConfig.java | 27 ++++
.../coordinator/group/GroupCoordinatorService.java | 1 +
.../group/GroupCoordinatorConfigTest.java | 3 +
.../coordinator/share/ShareCoordinatorConfig.java | 29 +++-
.../coordinator/share/ShareCoordinatorService.java | 1 +
.../share/ShareCoordinatorTestConfig.java | 1 +
17 files changed, 406 insertions(+), 86 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 58252561c39..92afee2cc7e 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -42,7 +42,6 @@ import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.server.common.TransactionVersion;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
-import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -66,6 +65,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.lang.Math.min;
@@ -120,6 +120,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
private Compression compression;
private OptionalInt appendLingerMs;
private ExecutorService executorService;
+ private Supplier<Integer> cachedBufferMaxBytesSupplier;
public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@@ -196,6 +197,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
return this;
}
+ public Builder<S, U> withCachedBufferMaxBytesSupplier(Supplier<Integer> cachedBufferMaxBytesSupplier) {
+ this.cachedBufferMaxBytesSupplier = cachedBufferMaxBytesSupplier;
+ return this;
+ }
+
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
@@ -228,6 +234,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
if (executorService == null)
throw new IllegalArgumentException("ExecutorService must be set.");
+ if (cachedBufferMaxBytesSupplier == null)
+ throw new IllegalArgumentException("Cached buffer max bytes supplier must be set.");
return new CoordinatorRuntime<>(
logPrefix,
@@ -244,7 +252,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
serializer,
compression,
appendLingerMs,
- executorService
+ executorService,
+ cachedBufferMaxBytesSupplier
);
}
}
@@ -479,11 +488,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
final long appendTimeMs;
- /**
- * The max batch size.
- */
- final int maxBatchSize;
-
/**
* The verification guard associated to the batch if it is
* transactional.
@@ -521,7 +525,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
Logger log,
long baseOffset,
long appendTimeMs,
- int maxBatchSize,
VerificationGuard verificationGuard,
ByteBuffer buffer,
MemoryRecordsBuilder builder,
@@ -530,7 +533,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.baseOffset = baseOffset;
this.nextOffset = baseOffset;
this.appendTimeMs = appendTimeMs;
- this.maxBatchSize = maxBatchSize;
this.verificationGuard = verificationGuard;
this.buffer = buffer;
this.builder = builder;
@@ -603,6 +605,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
BufferSupplier bufferSupplier;
+ /**
+ * The cached buffer size.
+ */
+ AtomicLong cachedBufferSize;
+
/**
* The current (or pending) batch.
*/
@@ -641,6 +648,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
defaultWriteTimeout
);
this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
+ this.cachedBufferSize = new AtomicLong(0);
}
/**
@@ -772,13 +780,20 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
- // Release the buffer only if it is not larger than the maxBatchSize.
- int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+ // Release the buffer only if it is not larger than the cachedBufferMaxBytes.
+ int cachedBufferMaxBytes = cachedBufferMaxBytesSupplier.get();
- if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+ if (currentBatch.builder.buffer().capacity() <= cachedBufferMaxBytes) {
bufferSupplier.release(currentBatch.builder.buffer());
- } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+ cachedBufferSize.set(currentBatch.builder.buffer().capacity());
+ } else if (currentBatch.buffer.capacity() <= cachedBufferMaxBytes) {
bufferSupplier.release(currentBatch.buffer);
+ cachedBufferSize.set(currentBatch.buffer.capacity());
+ // If the builder expands the buffer beyond the cachedBufferMaxBytes, that should also increase the discard counter.
+ runtimeMetrics.recordBufferCacheDiscarded();
+ } else {
+ runtimeMetrics.recordBufferCacheDiscarded();
+ cachedBufferSize.set(0L);
}
currentBatch = null;
@@ -901,8 +916,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
long currentTimeMs
) {
if (currentBatch == null) {
- LogConfig logConfig = partitionWriter.config(tp);
- int maxBatchSize = logConfig.maxMessageSize();
+ int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
@@ -953,7 +967,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
log,
prevLastWrittenOffset,
currentTimeMs,
- maxBatchSize,
verificationGuard,
buffer,
builder,
@@ -2064,6 +2077,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
private final ExecutorService executorService;
+ /**
+ * The maximum buffer size that the coordinator can cache.
+ */
+ private final Supplier<Integer> cachedBufferMaxBytesSupplier;
+
/**
* Atomic boolean indicating whether the runtime is running.
*/
@@ -2092,6 +2110,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
* @param compression The compression codec.
* @param appendLingerMs The append linger time in ms.
* @param executorService The executor service.
+ * @param cachedBufferMaxBytesSupplier The cached buffer max bytes supplier.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private CoordinatorRuntime(
@@ -2109,7 +2128,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
Serializer<U> serializer,
Compression compression,
OptionalInt appendLingerMs,
- ExecutorService executorService
+ ExecutorService executorService,
+ Supplier<Integer> cachedBufferMaxBytesSupplier
) {
this.logPrefix = logPrefix;
this.log = logContext.logger(CoordinatorRuntime.class);
@@ -2127,6 +2147,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.compression = compression;
this.appendLingerMs = appendLingerMs;
this.executorService = executorService;
+ this.cachedBufferMaxBytesSupplier = cachedBufferMaxBytesSupplier;
+ this.runtimeMetrics.registerBufferCacheSizeGauge(
+ () -> coordinators.values().stream().mapToLong(c -> c.cachedBufferSize.get()).sum()
+ );
}
/**
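In short, the release path above now asks the supplier for the cap on every release, caches whichever buffer still fits, and counts a discard whenever the expanded builder buffer (or both buffers) exceeds the cap. A simplified restatement of that decision, lifted out of its class for readability (field names as in the hunks above):

```java
// Sketch of the batch-release caching decision introduced by this commit.
int cachedBufferMaxBytes = cachedBufferMaxBytesSupplier.get();

if (currentBatch.builder.buffer().capacity() <= cachedBufferMaxBytes) {
    // The builder's (possibly expanded) buffer still fits: cache it.
    bufferSupplier.release(currentBatch.builder.buffer());
    cachedBufferSize.set(currentBatch.builder.buffer().capacity());
} else if (currentBatch.buffer.capacity() <= cachedBufferMaxBytes) {
    // The builder outgrew the cap but the original allocation fits:
    // cache the original and count the oversized expansion as a discard.
    bufferSupplier.release(currentBatch.buffer);
    cachedBufferSize.set(currentBatch.buffer.capacity());
    runtimeMetrics.recordBufferCacheDiscarded();
} else {
    // Neither buffer fits under the cap: cache nothing.
    runtimeMetrics.recordBufferCacheDiscarded();
    cachedBufferSize.set(0L);
}
```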
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
index 5b9b9254230..c537a2ea413 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
@@ -86,4 +86,16 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
* @param sizeSupplier The size supplier.
*/
void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier);
+
+ /**
+ * Register the cached buffer size gauge.
+ *
+ * @param bufferCacheSizeSupplier The buffer cache size supplier.
+ */
+ void registerBufferCacheSizeGauge(Supplier<Long> bufferCacheSizeSupplier);
+
+ /**
+ * Called when a buffer is discarded upon release instead of being cached.
+ */
+ void recordBufferCacheDiscarded();
}
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index 8382090639c..93bc9848201 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -69,6 +69,16 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
public static final String BATCH_FLUSH_TIME_METRIC_NAME = "batch-flush-time-ms";
+ /**
+ * The buffer cache size metric name.
+ */
public static final String BATCH_BUFFER_CACHE_SIZE_METRIC_NAME = "batch-buffer-cache-size-bytes";
+
+ /**
+ * The buffer cache discard count metric name.
+ */
public static final String BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME = "batch-buffer-cache-discard-count";
+
/**
* Metric to count the number of partitions in Loading state.
*/
@@ -92,6 +102,17 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
private final MetricName eventQueueSize;
+ /**
+ * Metric to count the size of the cached buffers.
+ */
+ private final MetricName bufferCacheSize;
+
+ /**
+ * Metric to count the number of over-sized append buffers that were discarded.
+ */
+ private final MetricName bufferCacheDiscardCount;
+ private final AtomicLong bufferCacheDiscardCounter = new AtomicLong(0);
+
/**
* The Kafka metrics registry.
*/
@@ -156,9 +177,20 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
this.eventQueueSize = kafkaMetricName("event-queue-size", "The event accumulator queue size.");
+ this.bufferCacheSize = kafkaMetricName(
+ BATCH_BUFFER_CACHE_SIZE_METRIC_NAME,
+ "The current total size in bytes of the append buffers being held in the coordinator's cache."
+ );
+
+ this.bufferCacheDiscardCount = kafkaMetricName(
+ BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME,
+ "The count of over-sized append buffers that were discarded instead of being cached upon release."
+ );
+
metrics.addMetric(numPartitionsLoading, (Gauge<Long>) (config, now) -> numPartitionsLoadingCounter.get());
metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get());
metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get());
+ metrics.addMetric(bufferCacheDiscardCount, (Gauge<Long>) (config, now) -> bufferCacheDiscardCounter.get());
this.partitionLoadSensor = metrics.sensor(this.metricsGroup + "-PartitionLoadTime");
this.partitionLoadSensor.add(
@@ -252,7 +284,9 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
numPartitionsLoading,
numPartitionsActive,
numPartitionsFailed,
- eventQueueSize
+ eventQueueSize,
+ bufferCacheSize,
+ bufferCacheDiscardCount
).forEach(metrics::removeMetric);
metrics.removeSensor(partitionLoadSensor.name());
@@ -340,4 +374,14 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
metrics.addMetric(eventQueueSize, (Gauge<Long>) (config, now) -> (long) sizeSupplier.get());
}
+
+ @Override
+ public void registerBufferCacheSizeGauge(Supplier<Long> bufferCacheSizeSupplier) {
+ metrics.addMetric(bufferCacheSize, (Gauge<Long>) (config, now) -> bufferCacheSizeSupplier.get());
+ }
+
+ @Override
+ public void recordBufferCacheDiscarded() {
+ bufferCacheDiscardCounter.incrementAndGet();
+ }
}
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index b243f12466b..82601be18ba 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -33,6 +33,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.stream.IntStream;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_BUFFER_CACHE_SIZE_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
@@ -88,7 +90,9 @@ public class CoordinatorRuntimeMetricsImplTest {
kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
kafkaMetricName(metrics, "batch-flush-time-ms-p99"),
kafkaMetricName(metrics, "batch-flush-time-ms-p999"),
- kafkaMetricName(metrics, "batch-flush-rate")
+ kafkaMetricName(metrics, "batch-flush-rate"),
+ kafkaMetricName(metrics, BATCH_BUFFER_CACHE_SIZE_METRIC_NAME),
+ kafkaMetricName(metrics, BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME)
);
}
@@ -100,6 +104,7 @@ public class CoordinatorRuntimeMetricsImplTest {
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+ runtimeMetrics.registerBufferCacheSizeGauge(() -> 0L);
expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
}
@@ -118,6 +123,7 @@ public class CoordinatorRuntimeMetricsImplTest {
Set<MetricName> metricNames;
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+ runtimeMetrics.registerBufferCacheSizeGauge(() -> 0L);
ArgumentCaptor<String> sensorCaptor = ArgumentCaptor.forClass(String.class);
verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
@@ -138,6 +144,7 @@ public class CoordinatorRuntimeMetricsImplTest {
Set<MetricName> otherMetricNames;
try (CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
+ otherRuntimeMetrics.registerBufferCacheSizeGauge(() -> 0L);
ArgumentCaptor<String> sensorCaptor = ArgumentCaptor.forClass(String.class);
verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
@@ -185,7 +192,6 @@ public class CoordinatorRuntimeMetricsImplTest {
}
}
-
@Test
public void testPartitionLoadSensorMetrics() {
Time time = new MockTime();
@@ -233,6 +239,26 @@ public class CoordinatorRuntimeMetricsImplTest {
}
}
+ @Test
+ public void testBatchBufferCacheSize() {
+ Metrics metrics = new Metrics();
+
+ try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
+ runtimeMetrics.registerBufferCacheSizeGauge(() -> 5L);
+ assertMetricGauge(metrics, kafkaMetricName(metrics, BATCH_BUFFER_CACHE_SIZE_METRIC_NAME), 5);
+ }
+ }
+
+ @Test
+ public void testBatchBufferCacheDiscardCount() {
+ Metrics metrics = new Metrics();
+
+ try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
+ runtimeMetrics.recordBufferCacheDiscarded();
+ assertMetricGauge(metrics, kafkaMetricName(metrics, BATCH_BUFFER_CACHE_DISCARD_COUNT_METRIC_NAME), 1);
+ }
+ }
+
@ParameterizedTest
@ValueSource(strings = {
EVENT_QUEUE_TIME_METRIC_NAME,
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 0e78a8e1c3f..00609c94b4f 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
@@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -63,6 +63,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -102,6 +103,7 @@ public class CoordinatorRuntimeTest {
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
+ private static final int CACHED_BUFFER_MAX_BYTES = 1024 * 1024 + Records.LOG_OVERHEAD;
@Test
public void testScheduleLoading() {
@@ -125,6 +127,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -197,6 +200,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -249,6 +253,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -305,6 +310,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -378,6 +384,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -434,6 +441,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -490,6 +498,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -534,6 +543,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -584,6 +594,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
doThrow(new KafkaException("error")).when(coordinator).onUnloaded();
@@ -640,6 +651,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -728,6 +740,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -850,6 +863,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Scheduling a write fails with a NotCoordinatorException because the coordinator
@@ -875,6 +889,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -904,6 +919,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -965,6 +981,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1018,6 +1035,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1056,6 +1074,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
TopicPartition coordinator0 = new TopicPartition("__consumer_offsets", 0);
@@ -1128,6 +1147,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -1224,6 +1244,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -1287,6 +1308,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -1415,6 +1437,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1477,6 +1500,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1545,6 +1569,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1646,6 +1671,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1732,6 +1758,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1791,6 +1818,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule a read. It fails because the coordinator does not exist.
@@ -1817,6 +1845,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1864,6 +1893,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
TopicPartition coordinator0 = new TopicPartition("__consumer_offsets", 0);
@@ -1918,6 +1948,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(executorService)
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -1991,6 +2022,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
@@ -2056,6 +2088,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2099,6 +2132,7 @@ public class CoordinatorRuntimeTest {
public void testRescheduleTimer() throws InterruptedException {
MockTimer timer = new MockTimer();
ManualEventProcessor processor = new ManualEventProcessor();
+
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
@@ -2112,6 +2146,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2188,6 +2223,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2261,6 +2297,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2322,6 +2359,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2397,6 +2435,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2441,6 +2480,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator.
@@ -2493,6 +2533,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
runtime.scheduleLoadOperation(TP, 10);
@@ -2551,6 +2592,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2631,6 +2673,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2689,6 +2732,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2748,6 +2792,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -2794,6 +2839,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(0))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -2869,6 +2915,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Load the coordinator.
@@ -2948,6 +2995,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(0))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -3022,6 +3070,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -3093,6 +3142,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(serializer)
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3127,17 +3177,11 @@ public class CoordinatorRuntimeTest {
}
@Test
- public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+ public void testCoordinatorDoNotRetainBufferLargeThanCachedBufferMaxBytes() {
MockTimer timer = new MockTimer();
- InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
- @Override
- public LogConfig config(TopicPartition tp) {
- return new LogConfig(Map.of(
- TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
- ));
- }
- };
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false);
StringSerializer serializer = new StringSerializer();
+ CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
@@ -3148,10 +3192,11 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(mockWriter)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
- .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(serializer)
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3163,7 +3208,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
- // Generate a record larger than the maxBatchSize.
+ // Generate a record larger than the cachedBufferMaxBytes.
List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
// Write #1.
@@ -3177,20 +3222,15 @@ public class CoordinatorRuntimeTest {
// Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+ verify(runtimeMetrics, times(1)).recordBufferCacheDiscarded();
}
@Test
- public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+ public void testCoordinatorRetainExpandedBufferLessOrEqualToCachedBufferMaxBytes() {
MockTimer timer = new MockTimer();
- InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
- @Override
- public LogConfig config(TopicPartition tp) {
- return new LogConfig(Map.of(
- TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
- ));
- }
- };
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false);
StringSerializer serializer = new StringSerializer();
+ int cachedBufferMaxBytes = 1024 * 1024 * 1024; // 1GB
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
@@ -3205,6 +3245,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(serializer)
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> cachedBufferMaxBytes)
.build();
// Schedule the loading.
@@ -3216,11 +3257,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
- // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
- List<String> records = new ArrayList<>();
- for (int i = 0; i < 1000000; i++) {
- records.add("record-" + i);
- }
+ // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < cachedBufferMaxBytes
+ List<String> records = List.of("A".repeat(INITIAL_BUFFER_SIZE + 1024));
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3232,30 +3270,19 @@ public class CoordinatorRuntimeTest {
assertFalse(write1.isCompletedExceptionally());
int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
- int maxBatchSize = mockWriter.config(TP).maxMessageSize();
- assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+ assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= cachedBufferMaxBytes);
// Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
}
@Test
- public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+ public void testBufferShrinkWhenCachedBufferMaxBytesReducedBelowBatchSize() {
MockTimer timer = new MockTimer();
- var mockWriter = new InMemoryPartitionWriter(false) {
- private LogConfig config = new LogConfig(Map.of(
- TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
- ));
-
- @Override
- public LogConfig config(TopicPartition tp) {
- return config;
- }
-
- public void updateConfig(LogConfig newConfig) {
- this.config = newConfig;
- }
- };
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false);
+ Supplier<Integer> maxBufferSizeSupplierMock = mock(Supplier.class);
+ CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
+ when(maxBufferSizeSupplierMock.get()).thenReturn(CACHED_BUFFER_MAX_BYTES);
StringSerializer serializer = new StringSerializer();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
@@ -3267,10 +3294,11 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(mockWriter)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
- .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(serializer)
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(maxBufferSizeSupplierMock)
.build();
// Schedule the loading.
@@ -3282,10 +3310,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
- List<String> records = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
- records.add("record-" + i);
- }
+ List<String> records = List.of("A".repeat(INITIAL_BUFFER_SIZE + 1024));
// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3297,18 +3322,15 @@ public class CoordinatorRuntimeTest {
assertFalse(write1.isCompletedExceptionally());
int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
- int maxBatchSize = mockWriter.config(TP).maxMessageSize();
- assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
+ assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize <= CACHED_BUFFER_MAX_BYTES);
ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
- assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+ assertTrue(cachedBuffer.capacity() > INITIAL_BUFFER_SIZE && cachedBuffer.capacity() < CACHED_BUFFER_MAX_BYTES);
// ctx.bufferSupplier.get(1); will clear cachedBuffer in bufferSupplier. Use release to put it back to bufferSupplier
ctx.bufferSupplier.release(cachedBuffer);
- // Reduce max message size below initial buffer size.
- mockWriter.updateConfig(new LogConfig(
- Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66))));
- assertEquals(INITIAL_BUFFER_SIZE - 66, mockWriter.config(TP).maxMessageSize());
+ // Reduce max buffer size below batch size.
+ when(maxBufferSizeSupplierMock.get()).thenReturn(batchSize - 66);
// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3316,8 +3338,9 @@ public class CoordinatorRuntimeTest {
);
assertFalse(write2.isCompletedExceptionally());
- // Verify that there is no cached buffer since the cached buffer size is greater than new maxMessageSize.
+ // Verify that there is no cached buffer since the cached buffer size is greater than new cached buffer max bytes.
assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+ verify(runtimeMetrics, times(1)).recordBufferCacheDiscarded();
// Write #3.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
@@ -3325,8 +3348,9 @@ public class CoordinatorRuntimeTest {
);
assertFalse(write3.isCompletedExceptionally());
- // Verify that the cached buffer size is equals to new maxMessageSize that less than INITIAL_BUFFER_SIZE.
- assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity());
+ // Verify that the cached buffer size is equals to initial buffer size.
+ assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+ verify(runtimeMetrics, times(2)).recordBufferCacheDiscarded();
}
@Test
@@ -3348,6 +3372,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3483,6 +3508,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3535,6 +3561,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3621,6 +3648,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3719,6 +3747,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -3854,6 +3883,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.empty())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -3962,6 +3992,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.empty())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -4151,6 +4182,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.empty())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -4312,6 +4344,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4427,6 +4460,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4476,6 +4510,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4585,6 +4620,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4681,6 +4717,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4768,6 +4805,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(serializer)
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4839,6 +4877,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -4952,6 +4991,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -5063,6 +5103,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -5075,7 +5116,7 @@ public class CoordinatorRuntimeTest {
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
- // Create 2 records with a quarter of the max batch size each.
+ // Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
@@ -5099,7 +5140,7 @@ public class CoordinatorRuntimeTest {
// Write #2 with the large record. This record is too large to go into the previous batch
// uncompressed but fits in a new buffer, so we should flush the previous batch and allocate
- // a new one.
+ // a new one.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(largeRecord, "response2")
);
@@ -5149,6 +5190,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -5161,7 +5203,7 @@ public class CoordinatorRuntimeTest {
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
- // Create 2 records with a quarter of the max batch size each.
+ // Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
@@ -5235,6 +5277,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
@@ -5247,7 +5290,7 @@ public class CoordinatorRuntimeTest {
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
- // Create 2 records with a quarter of the max batch size each.
+ // Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
@@ -5302,7 +5345,7 @@ public class CoordinatorRuntimeTest {
assertTrue(write2.isDone());
assertEquals(2L, ctx.coordinator.lastCommittedOffset());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
- }
+ }
@Test
public void testRecordEventPurgatoryTime() throws Exception {
@@ -5326,6 +5369,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(0))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -5413,6 +5457,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(0))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -5480,6 +5525,7 @@ public class CoordinatorRuntimeTest {
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -5561,6 +5607,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(0))
.withExecutorService(executorService)
+ .withCachedBufferMaxBytesSupplier(() -> CACHED_BUFFER_MAX_BYTES)
.build();
// Loads the coordinator. Poll once to execute the load operation and once
@@ -5643,6 +5690,7 @@ public class CoordinatorRuntimeTest {
.withSerializer(new StringSerializer())
.withAppendLingerMs(OptionalInt.of(10))
.withExecutorService(mock(ExecutorService.class))
+ .withCachedBufferMaxBytesSupplier(() ->
CACHED_BUFFER_MAX_BYTES)
.build();
// Schedule the loading.
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 365fef6eb97..51d7dbe5416 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -37,6 +37,8 @@ import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
import org.apache.kafka.config
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.KafkaRaftClient
@@ -99,7 +101,9 @@ object DynamicBrokerConfig {
SocketServer.ReconfigurableConfigs ++
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs ++
- Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG)
+ Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
+ GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
+ ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 6d269c23f67..8504f3705ac 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -31,6 +31,8 @@ import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric, Metrics, MetricsReporter}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.DynamicThreadPool
@@ -1105,6 +1107,23 @@ class DynamicBrokerConfigTest {
updateReporter(classOf[MockMetricsReporter])
verifyNoMoreInteractions(telemetryPlugin)
}
+
+ @Test
+ def testCoordinatorCachedBufferMaxBytesUpdates(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, port = 8181)
+ origProps.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "2097152")
+ origProps.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "3145728")
+ val ctx = new DynamicLogConfigContext(origProps)
+ assertEquals(2 * 1024 * 1024, ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
+ assertEquals(3 * 1024 * 1024, ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+
+ val props = new Properties()
+ props.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "4194304")
+ props.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "5242880")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(4 * 1024 * 1024, ctx.config.groupCoordinatorConfig.cachedBufferMaxBytes())
+ assertEquals(5 * 1024 * 1024, ctx.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+ }
}
class TestDynamicThreadPool extends BrokerReconfigurable {
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 519a7d951a3..2f13f0d2dac 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -31,7 +31,8 @@ import org.apache.kafka.common.quota.ClientQuotaEntity.{CLIENT_ID, IP, USER}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.coordinator.group.GroupConfig
+import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -459,6 +460,54 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
+ @Test
+ def testDynamicGroupCoordinatorConfigChange(): Unit = {
+ val newCachedBufferMaxBytes = 2 * 1024 * 1024
+ val brokerId: String = this.brokers.head.config.brokerId.toString
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+ val op = new AlterConfigOp(
+ new ConfigEntry(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, newCachedBufferMaxBytes.toString),
+ OpType.SET
+ )
+ admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
+ } finally {
+ admin.close()
+ }
+
+ for (b <- this.brokers) {
+ val value = if (b.config.brokerId.toString == brokerId) newCachedBufferMaxBytes else GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_DEFAULT
+ TestUtils.retry(10000) {
+ assertEquals(value, b.config.groupCoordinatorConfig.cachedBufferMaxBytes())
+ }
+ }
+ }
+
+ @Test
+ def testDynamicShareCoordinatorConfigChange(): Unit = {
+ val newCachedBufferMaxBytes = 2 * 1024 * 1024
+ val brokerId: String = this.brokers.head.config.brokerId.toString
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+ val op = new AlterConfigOp(
+ new ConfigEntry(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, newCachedBufferMaxBytes.toString),
+ OpType.SET
+ )
+ admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
+ } finally {
+ admin.close()
+ }
+
+ for (b <- this.brokers) {
+ val value = if (b.config.brokerId.toString == brokerId) newCachedBufferMaxBytes else ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_DEFAULT
+ TestUtils.retry(10000) {
+ assertEquals(value, b.config.shareCoordinatorConfig.shareCoordinatorCachedBufferMaxBytes())
+ }
+ }
+ }
+
private def createAdminClient(): Admin = {
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index dd80b50f9c2..4ff67c6692d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1032,6 +1032,7 @@ class KafkaConfigTest {
/** New group coordinator configs */
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
+ case GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, 512 * 1024 - 1)
/** Consumer groups configs */
case GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git a/docs/ops.html b/docs/ops.html
index 10eba0aca9f..df0b124c17a 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1937,6 +1937,16 @@ The following set of metrics are available for monitoring the group coordinator:
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-rate</td>
<td>The number of batches flushed per second</td>
</tr>
+ <tr>
+ <td>Batch Buffer Cache Size</td>
+ <td>kafka.server:type=group-coordinator-metrics,name=batch-buffer-cache-size-bytes</td>
+ <td>The total size in bytes of append buffers currently held in the coordinator's cache</td>
+ </tr>
+ <tr>
+ <td>Batch Buffer Cache Discard Count</td>
+ <td>kafka.server:type=group-coordinator-metrics,name=batch-buffer-cache-discard-count</td>
+ <td>The total number of oversized append buffers that were discarded instead of being cached upon release</td>
+ </tr>
<tr>
<td>Group Count, per group type</td>
<td>kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic}</td>
@@ -4180,6 +4190,16 @@ customized state stores; for built-in state stores, currently we have:
<td>kafka.server:type=share-coordinator-metrics,name=last-pruned-offset,topic=([-.\w]+),partition=([0-9]+)</td>
<td>The offset at which the share-group state topic was last pruned.</td>
</tr>
+ <tr>
+ <td>batch-buffer-cache-size-bytes</td>
+ <td>kafka.server:type=share-coordinator-metrics,name=batch-buffer-cache-size-bytes</td>
+ <td>The total size in bytes of append buffers currently held in the share coordinator's cache</td>
+ </tr>
+ <tr>
+ <td>batch-buffer-cache-discard-count</td>
+ <td>kafka.server:type=share-coordinator-metrics,name=batch-buffer-cache-discard-count</td>
+ <td>The total number of oversized append buffers that were discarded instead of being cached upon release</td>
+ </tr>
</tbody>
</table>
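The new MBeans above can be read like any other Kafka metric. A sketch over JMX, assuming the broker exposes JMX on localhost:9999 and that the metric is reachable under the object name shown in the table with a "Value" attribute (both the endpoint and the attribute name are assumptions that depend on the configured metrics reporter):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class BufferCacheMetricsSketch {
        public static void main(String[] args) throws Exception {
            // JMX endpoint is an assumption; typically enabled via JMX_PORT on the broker.
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                ObjectName size = new ObjectName(
                    "kafka.server:type=group-coordinator-metrics,name=batch-buffer-cache-size-bytes");
                ObjectName discards = new ObjectName(
                    "kafka.server:type=group-coordinator-metrics,name=batch-buffer-cache-discard-count");
                // "Value" is an assumed attribute name, not confirmed by the patch.
                System.out.println("cache size bytes: " + conn.getAttribute(size, "Value"));
                System.out.println("discard count: " + conn.getAttribute(discards, "Value"));
            }
        }
    }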
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4c807382f0f..1c0aea9e53a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,19 @@
<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_4_3_0" href="#upgrade_4_3_0">Upgrading to 4.3.0</a></h4>
+
+<h5><a id="upgrade_4_3_0_from" href="#upgrade_4_3_0_from">Upgrading Servers to
4.3.0 from any version 3.3.x through 4.2.0</a></h5>
+
+<h5><a id="upgrade_430_notable" href="#upgrade_430_notable">Notable changes in
4.3.0</a></h5>
+<ul>
+ <li>
+ Two new configs have been introduced: <code>group.coordinator.cached.buffer.max.bytes</code> and <code>share.coordinator.cached.buffer.max.bytes</code>.
+ They cap the buffer size that the respective coordinator retains for reuse.
+ For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/hA5JFg">KIP-1196</a>.
+ </li>
+</ul>
+
<h4><a id="upgrade_4_2_0" href="#upgrade_4_2_0">Upgrading to 4.2.0</a></h4>
<h5><a id="upgrade_4_2_0_from" href="#upgrade_4_2_0_from">Upgrading Servers to
4.2.0 from any version 3.3.x through 4.1.x</a></h5>
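Because both configs are dynamically reconfigurable (see the DynamicBrokerConfig change above), they can be changed at runtime with the Admin client. A sketch mirroring DynamicConfigChangeTest; the bootstrap address, broker id, and value are illustrative assumptions:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class AlterCachedBufferConfigSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption
            try (Admin admin = Admin.create(props)) {
                // Target broker 0; an empty broker id would set the cluster-wide default.
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("group.coordinator.cached.buffer.max.bytes", "2097152"),
                    AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(broker, List.of(op))).all().get();
            }
        }
    }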
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 052f89023b3..a6c070795c5 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
@@ -37,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -109,6 +111,12 @@ public class GroupCoordinatorConfig {
public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits.";
+ public static final String CACHED_BUFFER_MAX_BYTES_CONFIG = "group.coordinator.cached.buffer.max.bytes";
+ public static final int CACHED_BUFFER_MAX_BYTES_DEFAULT = 1024 * 1024 + Records.LOG_OVERHEAD;
+ public static final String CACHED_BUFFER_MAX_BYTES_DOC = "The maximum buffer size that the GroupCoordinator will retain for reuse. " +
+ "Note: Setting this larger than the maximum message size is not recommended, since every write buffer would then be eligible " +
+ "for recycling, rendering this configuration ineffective as a size limit.";
+
///
/// Offset configs
///
@@ -300,6 +308,10 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 30_000;
public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC = "Time elapsed before retrying initialize share group state request. If below offsets.commit.timeout.ms, then value of offsets.commit.timeout.ms is used.";
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+ CACHED_BUFFER_MAX_BYTES_CONFIG
+ );
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
// Group coordinator configs
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
@@ -312,6 +324,9 @@ public class GroupCoordinatorConfig {
.define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC)
.define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
+ // The minimum size is set equal to `INITIAL_BUFFER_SIZE` to prevent CACHED_BUFFER_MAX_BYTES from being configured too small,
+ // which could otherwise negatively impact performance.
+ .define(CACHED_BUFFER_MAX_BYTES_CONFIG, INT, CACHED_BUFFER_MAX_BYTES_DEFAULT, atLeast(512 * 1024), MEDIUM, CACHED_BUFFER_MAX_BYTES_DOC)
// Offset configs
.define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
@@ -413,6 +428,8 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxStandbyReplicas;
private final int streamsGroupInitialRebalanceDelayMs;
+ private final AbstractConfig config;
+
@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
@@ -465,6 +482,7 @@ public class GroupCoordinatorConfig {
this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
this.streamsGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+ this.config = config;
// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
@@ -708,6 +726,15 @@ public class GroupCoordinatorConfig {
return offsetMetadataMaxSize;
}
+ /**
+ * The maximum buffer size that the coordinator can cache.
+ *
+ * Note: On hot paths, frequent calls to this method may cause performance bottlenecks due to synchronization overhead.
+ */
+ public int cachedBufferMaxBytes() {
+ return config.getInt(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG);
+ }
+
/**
* The classic group maximum size.
*/
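The javadoc above warns against calling cachedBufferMaxBytes() on hot paths. The intended shape is for the runtime to read the supplier once per buffer release rather than once per record. A sketch of that pattern, using a hypothetical single-slot recycler (BufferRecycler and its fields are illustrative, not the actual CoordinatorRuntime code):

    import java.nio.ByteBuffer;
    import java.util.function.Supplier;

    final class BufferRecycler {
        private final Supplier<Integer> cachedBufferMaxBytesSupplier;
        private ByteBuffer cachedBuffer; // single-slot cache, for illustration only

        BufferRecycler(Supplier<Integer> cachedBufferMaxBytesSupplier) {
            this.cachedBufferMaxBytesSupplier = cachedBufferMaxBytesSupplier;
        }

        // Called once when a whole batch is released, not once per record,
        // so the config lookup stays off the per-record hot path.
        void release(ByteBuffer buffer) {
            int maxBytes = cachedBufferMaxBytesSupplier.get();
            if (buffer.capacity() <= maxBytes) {
                cachedBuffer = buffer; // retain for reuse
            }
            // else: drop the oversized buffer; this is the case the
            // batch-buffer-cache-discard-count metric counts.
        }
    }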
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9c29183ca38..83f1b0ca5e0 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -275,6 +275,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withCompression(Compression.of(config.offsetTopicCompressionType()).build())
.withAppendLingerMs(config.appendLingerMs())
.withExecutorService(Executors.newSingleThreadExecutor())
+ .withCachedBufferMaxBytesSupplier(config::cachedBufferMaxBytes)
.build();
return new GroupCoordinatorService(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index be4db2483cb..1e6dcce6a85 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -200,6 +200,7 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, 15 * 60 * 1000);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 5000);
+ configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 2 * 1024 * 1024);
GroupCoordinatorConfig config = createConfig(configs);
@@ -230,6 +231,7 @@ public class GroupCoordinatorConfigTest {
assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
assertEquals(15 * 60 * 1000, config.consumerGroupRegexRefreshIntervalMs());
assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
+ assertEquals(2 * 1024 * 1024, config.cachedBufferMaxBytes());
}
@Test
@@ -375,6 +377,7 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 1000);
+ configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 1024 * 1024);
return createConfig(configs);
}
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index 98e0a2a5f25..7b6bbb1dc5c 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -19,10 +19,12 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Utils;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -82,6 +84,16 @@ public class ShareCoordinatorConfig {
public static final int COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
public static final String COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC = "The duration in milliseconds that the share coordinator will wait between force snapshotting share partitions which are not being updated.";
+ public static final String CACHED_BUFFER_MAX_BYTES_CONFIG = "share.coordinator.cached.buffer.max.bytes";
+ public static final int CACHED_BUFFER_MAX_BYTES_DEFAULT = 1024 * 1024 + Records.LOG_OVERHEAD;
+ public static final String CACHED_BUFFER_MAX_BYTES_DOC = "The maximum buffer size that the ShareCoordinator will retain for reuse. " +
+ "Note: Setting this larger than the maximum message size is not recommended, since every write buffer would then be eligible " +
+ "for recycling, rendering this configuration ineffective as a size limit.";
+
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+ CACHED_BUFFER_MAX_BYTES_CONFIG
+ );
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(STATE_TOPIC_NUM_PARTITIONS_CONFIG, INT, STATE_TOPIC_NUM_PARTITIONS_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_NUM_PARTITIONS_DOC)
.define(STATE_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, STATE_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, STATE_TOPIC_REPLICATION_FACTOR_DOC)
@@ -94,7 +106,10 @@ public class ShareCoordinatorConfig {
.define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
.define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
.defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
- .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
+ .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC)
+ // The minimum size is set equal to `INITIAL_BUFFER_SIZE` to prevent CACHED_BUFFER_MAX_BYTES from being configured too small,
+ // which could otherwise negatively impact performance.
+ .define(CACHED_BUFFER_MAX_BYTES_CONFIG, INT, CACHED_BUFFER_MAX_BYTES_DEFAULT, atLeast(512 * 1024), MEDIUM, CACHED_BUFFER_MAX_BYTES_DOC);
private final int stateTopicNumPartitions;
private final short stateTopicReplicationFactor;
@@ -109,6 +124,8 @@ public class ShareCoordinatorConfig {
private final int pruneIntervalMs;
private final int coldPartitionSnapshotIntervalMs;
+ private final AbstractConfig config;
+
public ShareCoordinatorConfig(AbstractConfig config) {
stateTopicNumPartitions = config.getInt(STATE_TOPIC_NUM_PARTITIONS_CONFIG);
stateTopicReplicationFactor = config.getShort(STATE_TOPIC_REPLICATION_FACTOR_CONFIG);
@@ -124,6 +141,7 @@ public class ShareCoordinatorConfig {
appendLingerMs = config.getInt(APPEND_LINGER_MS_CONFIG);
pruneIntervalMs = config.getInt(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG);
coldPartitionSnapshotIntervalMs = config.getInt(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG);
+ this.config = config;
validate();
}
@@ -182,6 +200,15 @@ public class ShareCoordinatorConfig {
public int shareCoordinatorColdPartitionSnapshotIntervalMs() {
return coldPartitionSnapshotIntervalMs;
}
+
+ /**
+ * The maximum buffer size that the share coordinator can cache.
+ *
+ * Note: On hot paths, frequent calls to this method may cause performance bottlenecks due to synchronization overhead.
+ */
+ public int shareCoordinatorCachedBufferMaxBytes() {
+ return config.getInt(CACHED_BUFFER_MAX_BYTES_CONFIG);
+ }
private void validate() {
Utils.require(snapshotUpdateRecordsPerSnapshot >= 0 && snapshotUpdateRecordsPerSnapshot <= 500,
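The atLeast(512 * 1024) validator above means values below 512 KiB are rejected when the config is parsed, matching the KafkaConfigTest case earlier in this patch. A sketch, assuming direct use of the public CONFIG_DEF (the surrounding class and the test value are illustrative):

    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;

    public class CachedBufferFloorSketch {
        public static void main(String[] args) {
            try {
                // 524287 is one byte below the 512 * 1024 floor.
                new AbstractConfig(
                    ShareCoordinatorConfig.CONFIG_DEF,
                    Map.of(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "524287"));
            } catch (ConfigException e) {
                // Validation fails before the broker ever uses the value.
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }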
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 19d198b8de4..8a90cf9742f 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -208,6 +208,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
.withCompression(Compression.of(config.shareCoordinatorStateTopicCompressionType()).build())
.withAppendLingerMs(config.shareCoordinatorAppendLingerMs())
.withExecutorService(Executors.newSingleThreadExecutor())
+ .withCachedBufferMaxBytesSupplier(config::shareCoordinatorCachedBufferMaxBytes)
.build();
return new ShareCoordinatorService(
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 853bc119432..e3f885c8fb6 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -51,6 +51,7 @@ public class ShareCoordinatorTestConfig {
configs.put(ShareCoordinatorConfig.STATE_TOPIC_COMPRESSION_CODEC_CONFIG, String.valueOf(CompressionType.NONE.id));
configs.put(ShareCoordinatorConfig.STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, "30000"); // 30 seconds
configs.put(ShareCoordinatorConfig.COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, "10000"); // 10 seconds
+ configs.put(ShareCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "1048576"); // 1024 * 1024
return configs;
}