This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7ee1b425328 Add config to set consumption rate limit at partition
level (#18904)
7ee1b425328 is described below
commit 7ee1b425328956c058c6bf7a57eea9e2b9f12061
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jul 1 17:00:23 2026 -0700
Add config to set consumption rate limit at partition level (#18904)
---
.../realtime/RealtimeConsumptionRateManager.java | 36 +++++----
.../RealtimeConsumptionRateManagerTest.java | 51 +++++++-----
.../org/apache/pinot/spi/stream/StreamConfig.java | 90 +++++++++++++---------
.../pinot/spi/stream/StreamConfigProperties.java | 3 +-
4 files changed, 109 insertions(+), 71 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index b050122c03c..56d54ffdd3b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -48,17 +48,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * This class is responsible for creating realtime consumption rate limiters.
- * It contains one rate limiter for the entire server and multiple table
partition level rate limiters.
- * Server rate limiter is used to throttle the overall consumption rate of the
server and configured via
- * cluster or server config.
- * For table partition level rate limiter, the rate limit value specified in
StreamConfig of table config, is for the
- * entire topic. The effective rate limit for each partition is simply the
specified rate limit divided by the
- * partition count.
- * This class leverages a cache for storing partition count for different
topics as retrieving partition count from
- * stream is a bit expensive and also the same count will be used of all
partition consumers of the same topic.
- */
+/// This class is responsible for creating realtime consumption rate limiters.
+/// It contains one rate limiter for the entire server and multiple table
partition level rate limiters.
+/// Server rate limiter is used to throttle the overall consumption rate of
the server and configured via cluster or
+/// server config.
+/// For table partition level rate limiter, the rate limit can be specified in
[StreamConfig] of table config in two
+/// ways:
+/// - Partition level rate limit: directly used as the rate limit for each
partition. It doesn't need to be adjusted
+/// when the partition count of the topic changes.
+/// - Topic level rate limit: the rate limit for the entire topic. The
effective rate limit for each partition is the
+/// specified rate limit divided by the partition count. When both are
specified, partition level rate limit takes
+/// precedence.
+/// This class leverages a cache for storing partition count for different
topics as retrieving partition count from
+/// stream is a bit expensive and also the same count will be used by all
partition consumers of the same topic.
public class RealtimeConsumptionRateManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeConsumptionRateManager.class);
private static final int CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES = 10;
@@ -171,7 +173,14 @@ public class RealtimeConsumptionRateManager {
public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig,
String tableName,
ServerMetrics serverMetrics, String metricKeyName) {
- if (streamConfig.getTopicConsumptionRateLimit().isEmpty()) {
+ double partitionRateLimit =
streamConfig.getPartitionConsumptionRateLimit();
+ if (partitionRateLimit > 0) {
+ LOGGER.info("A consumption rate limiter is set up for topic {} in table
{} with partition rate limit: {}",
+ streamConfig.getTopicName(), tableName, partitionRateLimit);
+ return new PartitionRateLimiter(partitionRateLimit, serverMetrics,
metricKeyName);
+ }
+ double topicRateLimit = streamConfig.getTopicConsumptionRateLimit();
+ if (topicRateLimit <= 0) {
return NOOP_RATE_LIMITER;
}
int partitionCount;
@@ -181,8 +190,7 @@ public class RealtimeConsumptionRateManager {
// Exception here means for some reason, partition count cannot be
fetched from stream!
throw new RuntimeException(e);
}
- double topicRateLimit = streamConfig.getTopicConsumptionRateLimit().get();
- double partitionRateLimit = topicRateLimit / partitionCount;
+ partitionRateLimit = topicRateLimit / partitionCount;
LOGGER.info("A consumption rate limiter is set up for topic {} in table {}
with rate limit: {} "
+ "(topic rate limit: {}, partition count: {})",
streamConfig.getTopicName(), tableName, partitionRateLimit,
topicRateLimit, partitionCount);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
index 3c56c717fbe..bcc40a7d9ca 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
@@ -23,7 +23,6 @@ import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Arrays;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -45,17 +44,20 @@ import static org.testng.Assert.assertEquals;
public class RealtimeConsumptionRateManagerTest {
private static final int NUM_PARTITIONS_TOPIC_A = 10;
private static final int NUM_PARTITIONS_TOPIC_B = 20;
- private static final Double RATE_LIMIT_FOR_ENTIRE_TOPIC = 50.0;
+ private static final double RATE_LIMIT_FOR_PARTITION = 4.0;
+ private static final double RATE_LIMIT_FOR_ENTIRE_TOPIC = 50.0;
private static final String TABLE_NAME = "table-XYZ";
private static final double DELTA = 0.0001;
private static final StreamConfig STREAM_CONFIG_A = mock(StreamConfig.class);
private static final StreamConfig STREAM_CONFIG_B = mock(StreamConfig.class);
private static final StreamConfig STREAM_CONFIG_C = mock(StreamConfig.class);
+ private static final StreamConfig STREAM_CONFIG_D = mock(StreamConfig.class);
+ private static final StreamConfig STREAM_CONFIG_E = mock(StreamConfig.class);
private static final PinotConfiguration SERVER_CONFIG_1 =
mock(PinotConfiguration.class);
private static final PinotConfiguration SERVER_CONFIG_2 =
mock(PinotConfiguration.class);
private static final PinotConfiguration SERVER_CONFIG_3 =
mock(PinotConfiguration.class);
private static final PinotConfiguration SERVER_CONFIG_4 =
mock(PinotConfiguration.class);
- private static RealtimeConsumptionRateManager _consumptionRateManager;
+ private static final RealtimeConsumptionRateManager CONSUMPTION_RATE_MANAGER;
static {
LoadingCache<StreamConfig, Integer> cache = mock(LoadingCache.class);
@@ -63,12 +65,15 @@ public class RealtimeConsumptionRateManagerTest {
when(cache.get(STREAM_CONFIG_A)).thenReturn(NUM_PARTITIONS_TOPIC_A);
when(cache.get(STREAM_CONFIG_B)).thenReturn(NUM_PARTITIONS_TOPIC_B);
} catch (ExecutionException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
-
when(STREAM_CONFIG_A.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
-
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
-
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty());
- _consumptionRateManager = new RealtimeConsumptionRateManager(cache);
+
when(STREAM_CONFIG_A.getTopicConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_ENTIRE_TOPIC);
+
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_ENTIRE_TOPIC);
+
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(StreamConfig.CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED);
+
when(STREAM_CONFIG_D.getPartitionConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_PARTITION);
+
when(STREAM_CONFIG_E.getPartitionConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_PARTITION);
+
when(STREAM_CONFIG_E.getTopicConsumptionRateLimit()).thenReturn(RATE_LIMIT_FOR_ENTIRE_TOPIC);
+ CONSUMPTION_RATE_MANAGER = new RealtimeConsumptionRateManager(cache);
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
@@ -91,22 +96,30 @@ public class RealtimeConsumptionRateManagerTest {
@Test
public void testCreateRateLimiter() {
// topic A
- ConsumptionRateLimiter rateLimiter =
_consumptionRateManager.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
- assertEquals(5.0, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+ ConsumptionRateLimiter rateLimiter =
CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 5.0, DELTA);
// topic B
- rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_B,
TABLE_NAME);
- assertEquals(2.5, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_B,
TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 2.5, DELTA);
// topic C
- rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C,
TABLE_NAME);
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_C,
TABLE_NAME);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+
+ // topic D: partition level rate limit is used directly, without fetching
the partition count
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_D,
TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
+
+ // topic E: partition level rate limit takes precedence over topic level
rate limit
+ rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_E,
TABLE_NAME);
+ assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
}
@Test
public void testCreateServerRateLimiter() {
// Server config 1
- ConsumptionRateLimiter rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_1, null);
+ ConsumptionRateLimiter rateLimiter =
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_1, null);
ServerRateLimiter serverRateLimiter = (ServerRateLimiter) rateLimiter;
try {
assertEquals(serverRateLimiter.getRate(), 5.0, DELTA);
@@ -116,7 +129,7 @@ public class RealtimeConsumptionRateManagerTest {
}
// Server config 2
- serverRateLimiter = (ServerRateLimiter)
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_2, null);
+ serverRateLimiter = (ServerRateLimiter)
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_2, null);
try {
assertEquals(((ServerRateLimiter) rateLimiter).getRate(), 2.5, DELTA);
assertEquals(serverRateLimiter.getRate(), 2.5, DELTA);
@@ -125,16 +138,16 @@ public class RealtimeConsumptionRateManagerTest {
}
// Server config 3
- rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_3, null);
+ rateLimiter =
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_3, null);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);
// Server config 4
- rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_4, null);
+ rateLimiter =
CONSUMPTION_RATE_MANAGER.createServerRateLimiter(SERVER_CONFIG_4, null);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);
ServerRateLimitConfig serverRateLimitConfig = new ServerRateLimitConfig(1,
MessageCountThrottlingStrategy.INSTANCE);
- _consumptionRateManager.updateServerRateLimiter(serverRateLimitConfig,
null);
- serverRateLimiter = (ServerRateLimiter)
_consumptionRateManager.getServerRateLimiter();
+ CONSUMPTION_RATE_MANAGER.updateServerRateLimiter(serverRateLimitConfig,
null);
+ serverRateLimiter = (ServerRateLimiter)
CONSUMPTION_RATE_MANAGER.getServerRateLimiter();
try {
assertEquals(serverRateLimiter.getRate(), 1);
assertEquals(serverRateLimiter.getMetricEmitter().getRate(), 1);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 7f502fd23f3..60e6b1d025f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.DataSizeUtils;
@@ -50,7 +49,7 @@ public class StreamConfig {
public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
public static final int DEFAULT_IDLE_TIMEOUT_MILLIS = 3 * 60 * 1000;
- private static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1;
+ public static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1;
private final String _type;
private final String _topicName;
@@ -71,8 +70,7 @@ public class StreamConfig {
private final double _flushThresholdVarianceFraction;
private final int _flushAutotuneInitialRows; // initial num rows to use for
SegmentSizeBasedFlushThresholdUpdater
- private final String _groupId;
-
+ private final double _partitionConsumptionRateLimit;
private final double _topicConsumptionRateLimit;
private final boolean _enableOffsetAutoReset;
@@ -198,11 +196,13 @@ public class StreamConfig {
}
_flushAutotuneInitialRows = autotuneInitialRows > 0 ? autotuneInitialRows
: DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
- String groupIdKey = StreamConfigProperties.constructStreamProperty(_type,
StreamConfigProperties.GROUP_ID);
- _groupId = streamConfigMap.get(groupIdKey);
+ String partitionRate =
streamConfigMap.get(StreamConfigProperties.PARTITION_CONSUMPTION_RATE_LIMIT);
+ _partitionConsumptionRateLimit =
+ partitionRate != null ? Double.parseDouble(partitionRate) :
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
- String rate =
streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
- _topicConsumptionRateLimit = rate != null ? Double.parseDouble(rate) :
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
+ String topicRate =
streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
+ _topicConsumptionRateLimit =
+ topicRate != null ? Double.parseDouble(topicRate) :
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
_enableOffsetAutoReset =
Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET));
_offsetAutoResetOffsetThreshold =
parseOffsetAutoResetOffsetThreshold(streamConfigMap);
@@ -413,13 +413,14 @@ public class StreamConfig {
return _flushAutotuneInitialRows;
}
- public String getGroupId() {
- return _groupId;
+ /// Returns the partition level consumption rate limit. Non-positive value
means it is not specified, in which case the topic level rate limit (if any) applies.
+ public double getPartitionConsumptionRateLimit() {
+ return _partitionConsumptionRateLimit;
}
- public Optional<Double> getTopicConsumptionRateLimit() {
- return _topicConsumptionRateLimit == CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED
? Optional.empty()
- : Optional.of(_topicConsumptionRateLimit);
+ /// Returns the topic level consumption rate limit. Non-positive value means
consumption is not throttled.
+ public double getTopicConsumptionRateLimit() {
+ return _topicConsumptionRateLimit;
}
public boolean isEnableOffsetAutoReset() {
@@ -448,22 +449,32 @@ public class StreamConfig {
@Override
public String toString() {
- return "StreamConfig{" + "_type='" + _type + '\'' + ", _topicName='" +
_topicName + '\'' + ", _tableNameWithType='"
- + _tableNameWithType + '\'' + ", _consumerFactoryClassName='" +
_consumerFactoryClassName + '\''
- + ", _decoderClass='" + _decoderClass + '\'' + ", _decoderProperties="
+ _decoderProperties
- + ", _connectionTimeoutMillis=" + _connectionTimeoutMillis + ",
_fetchTimeoutMillis=" + _fetchTimeoutMillis
- + ", _idleTimeoutMillis=" + _idleTimeoutMillis + ",
_flushThresholdRows=" + _flushThresholdRows
- + ", _flushThresholdSegmentRows=" + _flushThresholdSegmentRows + ",
_flushThresholdTimeMillis="
- + _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" +
_flushThresholdSegmentSizeBytes
+ return "StreamConfig{"
+ + "_type='" + _type + '\''
+ + ", _topicName='" + _topicName + '\''
+ + ", _tableNameWithType='" + _tableNameWithType + '\''
+ + ", _consumerFactoryClassName='" + _consumerFactoryClassName + '\''
+ + ", _decoderClass='" + _decoderClass + '\''
+ + ", _decoderProperties=" + _decoderProperties
+ + ", _connectionTimeoutMillis=" + _connectionTimeoutMillis
+ + ", _fetchTimeoutMillis=" + _fetchTimeoutMillis
+ + ", _idleTimeoutMillis=" + _idleTimeoutMillis
+ + ", _flushThresholdRows=" + _flushThresholdRows
+ + ", _flushThresholdSegmentRows=" + _flushThresholdSegmentRows
+ + ", _flushThresholdTimeMillis=" + _flushThresholdTimeMillis
+ + ", _flushThresholdSegmentSizeBytes=" +
_flushThresholdSegmentSizeBytes
+ ", _flushThresholdVarianceFraction=" +
_flushThresholdVarianceFraction
- + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ",
_groupId='" + _groupId + '\''
+ + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows
+ + ", _partitionConsumptionRateLimit=" + _partitionConsumptionRateLimit
+ ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit
+ ", _enableOffsetAutoReset=" + _enableOffsetAutoReset
- + ", _offsetAutoResetOffsetThreshold" + _offsetAutoResetOffsetThreshold
- + ", _offSetAutoResetTimeSecThreshold" +
_offsetAutoResetTimeSecThreshold
+ + ", _offsetAutoResetOffsetThreshold=" +
_offsetAutoResetOffsetThreshold
+ + ", _offsetAutoResetTimeSecThreshold=" +
_offsetAutoResetTimeSecThreshold
+ ", _backfillTopic=" + _backfillTopic
+ ", _streamConfigMap=" + _streamConfigMap
- + ", _offsetCriteria=" + _offsetCriteria + ",
_serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
+ + ", _offsetCriteria=" + _offsetCriteria
+ + ", _serverUploadToDeepStore=" + _serverUploadToDeepStore
+ + '}';
}
@Override
@@ -475,23 +486,30 @@ public class StreamConfig {
return false;
}
StreamConfig that = (StreamConfig) o;
- return _connectionTimeoutMillis == that._connectionTimeoutMillis &&
_fetchTimeoutMillis == that._fetchTimeoutMillis
- && _idleTimeoutMillis == that._idleTimeoutMillis &&
_flushThresholdRows == that._flushThresholdRows
+ return Objects.equals(_type, that._type)
+ && Objects.equals(_topicName, that._topicName)
+ && Objects.equals(_tableNameWithType, that._tableNameWithType)
+ && Objects.equals(_consumerFactoryClassName,
that._consumerFactoryClassName)
+ && Objects.equals(_decoderClass, that._decoderClass)
+ && Objects.equals(_decoderProperties, that._decoderProperties)
+ && _connectionTimeoutMillis == that._connectionTimeoutMillis
+ && _fetchTimeoutMillis == that._fetchTimeoutMillis
+ && _idleTimeoutMillis == that._idleTimeoutMillis
+ && _flushThresholdRows == that._flushThresholdRows
&& _flushThresholdSegmentRows == that._flushThresholdSegmentRows
&& _flushThresholdTimeMillis == that._flushThresholdTimeMillis
&& _flushThresholdSegmentSizeBytes ==
that._flushThresholdSegmentSizeBytes
+ && Double.compare(_flushThresholdVarianceFraction,
that._flushThresholdVarianceFraction) == 0
&& _flushAutotuneInitialRows == that._flushAutotuneInitialRows
+ && Double.compare(_partitionConsumptionRateLimit,
that._partitionConsumptionRateLimit) == 0
&& Double.compare(_topicConsumptionRateLimit,
that._topicConsumptionRateLimit) == 0
- && Objects.equals(_serverUploadToDeepStore,
that._serverUploadToDeepStore) && Objects.equals(_type, that._type)
- && Objects.equals(_topicName, that._topicName) &&
Objects.equals(_tableNameWithType, that._tableNameWithType)
- && Objects.equals(_consumerFactoryClassName,
that._consumerFactoryClassName) && Objects.equals(_decoderClass,
- that._decoderClass) && Objects.equals(_decoderProperties,
that._decoderProperties) && Objects.equals(_groupId,
- that._groupId) && Objects.equals(_streamConfigMap,
that._streamConfigMap) && Objects.equals(_offsetCriteria,
- that._offsetCriteria) &&
Objects.equals(_flushThresholdVarianceFraction,
that._flushThresholdVarianceFraction)
&& _enableOffsetAutoReset == that._enableOffsetAutoReset
&& _offsetAutoResetOffsetThreshold ==
that._offsetAutoResetOffsetThreshold
&& _offsetAutoResetTimeSecThreshold ==
that._offsetAutoResetTimeSecThreshold
- && Objects.equals(_backfillTopic, that._backfillTopic);
+ && Objects.equals(_backfillTopic, that._backfillTopic)
+ && Objects.equals(_streamConfigMap, that._streamConfigMap)
+ && Objects.equals(_offsetCriteria, that._offsetCriteria)
+ && Objects.equals(_serverUploadToDeepStore,
that._serverUploadToDeepStore);
}
@Override
@@ -499,8 +517,8 @@ public class StreamConfig {
return Objects.hash(_type, _topicName, _tableNameWithType,
_consumerFactoryClassName, _decoderClass,
_decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis,
_idleTimeoutMillis, _flushThresholdRows,
_flushThresholdSegmentRows, _flushThresholdTimeMillis,
_flushThresholdSegmentSizeBytes,
- _flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit,
_streamConfigMap, _offsetCriteria,
- _serverUploadToDeepStore, _flushThresholdVarianceFraction,
_offsetAutoResetOffsetThreshold,
- _enableOffsetAutoReset, _offsetAutoResetTimeSecThreshold,
_backfillTopic);
+ _flushThresholdVarianceFraction, _flushAutotuneInitialRows,
_partitionConsumptionRateLimit,
+ _topicConsumptionRateLimit, _enableOffsetAutoReset,
_offsetAutoResetOffsetThreshold,
+ _offsetAutoResetTimeSecThreshold, _backfillTopic, _streamConfigMap,
_offsetCriteria, _serverUploadToDeepStore);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index aa00ceb8870..aefa0e9093d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -45,8 +45,7 @@ public class StreamConfigProperties {
public static final String STREAM_IDLE_TIMEOUT_MILLIS =
"idle.timeout.millis";
public static final String STREAM_DECODER_CLASS = "decoder.class.name";
public static final String DECODER_PROPS_PREFIX = "decoder.prop";
- public static final String GROUP_ID = "hlc.group.id";
- public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS =
"partition.offset.factory.class.name";
+ public static final String PARTITION_CONSUMPTION_RATE_LIMIT =
"partition.consumption.rate.limit";
public static final String TOPIC_CONSUMPTION_RATE_LIMIT =
"topic.consumption.rate.limit";
public static final String METADATA_POPULATE = "metadata.populate";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]