Copilot commented on code in PR #18904:
URL: https://github.com/apache/pinot/pull/18904#discussion_r3509414371


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -198,11 +196,13 @@ public StreamConfig(String tableNameWithType, Map<String, 
String> streamConfigMa
     }
     _flushAutotuneInitialRows = autotuneInitialRows > 0 ? autotuneInitialRows 
: DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
 
-    String groupIdKey = StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.GROUP_ID);
-    _groupId = streamConfigMap.get(groupIdKey);
+    String partitionRate = 
streamConfigMap.get(StreamConfigProperties.PARTITION_CONSUMPTION_RATE_LIMIT);
+    _partitionConsumptionRateLimit =
+        partitionRate != null ? Double.parseDouble(partitionRate) : 
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;

Review Comment:
   The new partition/topic rate limit parsing uses Double.parseDouble() without 
adding config-key context and will also accept non-finite values like 
NaN/Infinity. A malformed value would fail with an unhelpful 
NumberFormatException (or worse, silently disable throttling for NaN partition 
limits). Consider validating the value and throwing an IllegalArgumentException 
that includes the config key and value.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java:
##########
@@ -45,8 +45,7 @@ private StreamConfigProperties() {
   public static final String STREAM_IDLE_TIMEOUT_MILLIS = 
"idle.timeout.millis";
   public static final String STREAM_DECODER_CLASS = "decoder.class.name";
   public static final String DECODER_PROPS_PREFIX = "decoder.prop";
-  public static final String GROUP_ID = "hlc.group.id";
-  public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = 
"partition.offset.factory.class.name";
+  public static final String PARTITION_CONSUMPTION_RATE_LIMIT = 
"partition.consumption.rate.limit";
   public static final String TOPIC_CONSUMPTION_RATE_LIMIT = 
"topic.consumption.rate.limit";

Review Comment:
   Removing public constants from pinot-spi is a binary-incompatible change for 
third-party plugins that might reference these keys directly. Consider keeping 
the legacy keys as `@Deprecated` constants (even if Pinot no longer uses them) 
to preserve compatibility while still steering users to newer configs.



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java:
##########
@@ -91,22 +96,30 @@ public class RealtimeConsumptionRateManagerTest {
   @Test
   public void testCreateRateLimiter() {
     // topic A
-    ConsumptionRateLimiter rateLimiter = 
_consumptionRateManager.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
-    assertEquals(5.0, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+    ConsumptionRateLimiter rateLimiter = 
CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 5.0, DELTA);
 
     // topic B
-    rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_B, 
TABLE_NAME);
-    assertEquals(2.5, ((PartitionRateLimiter) rateLimiter).getRate(), DELTA);
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_B, 
TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 2.5, DELTA);
 
     // topic C
-    rateLimiter = _consumptionRateManager.createRateLimiter(STREAM_CONFIG_C, 
TABLE_NAME);
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_C, 
TABLE_NAME);
     assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+
+    // topic D: partition level rate limit is used directly, without fetching 
the partition count
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_D, 
TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);
+
+    // topic E: partition level rate limit takes precedence over topic level 
rate limit
+    rateLimiter = CONSUMPTION_RATE_MANAGER.createRateLimiter(STREAM_CONFIG_E, 
TABLE_NAME);
+    assertEquals(((PartitionRateLimiter) rateLimiter).getRate(), 4.0, DELTA);

Review Comment:
   The new partition-level rate limit tests describe that the 
cache/partition-count fetch is skipped, but they don't currently assert that 
behavior. Adding a verify(times(0)) on the cache helps prevent regressions 
where partition-level throttling accidentally triggers a partition-count lookup.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -413,13 +413,14 @@ public int getFlushAutotuneInitialRows() {
     return _flushAutotuneInitialRows;
   }
 
-  public String getGroupId() {
-    return _groupId;
+  /// Returns the partition level consumption rate limit. Non-positive value 
means consumption is not throttled.
+  public double getPartitionConsumptionRateLimit() {
+    return _partitionConsumptionRateLimit;
   }
 
-  public Optional<Double> getTopicConsumptionRateLimit() {
-    return _topicConsumptionRateLimit == CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED 
? Optional.empty()
-        : Optional.of(_topicConsumptionRateLimit);
+  /// Returns the topic level consumption rate limit. Non-positive value means 
consumption is not throttled.
+  public double getTopicConsumptionRateLimit() {
+    return _topicConsumptionRateLimit;

Review Comment:
   Changing getTopicConsumptionRateLimit() from Optional<Double> to double (and 
removing getGroupId()) in pinot-spi is a binary-incompatible API change for any 
external plugins compiled against the old signature. If the intent is to avoid 
a breaking change, consider keeping the old methods (deprecated) and 
introducing new methods with different names (e.g. 
getTopicConsumptionRateLimitValue()/getPartitionConsumptionRateLimitValue()), 
then migrate internal call sites first and remove the deprecated methods in a 
later major release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to