This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bcdb053af3 Calculate size based flush threshold per topic (#14765)
bcdb053af3 is described below
commit bcdb053af3319b191348caa0fb095561344eb0e6
Author: Christopher Peck <[email protected]>
AuthorDate: Fri Feb 7 13:56:11 2025 -0800
Calculate size based flush threshold per topic (#14765)
* calculate size based flush threshold per topic
* emit duplicate metrics for backwards compatibility
---
.../pinot/common/metrics/ControllerGauge.java | 4 ++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 3 +--
.../segment/FlushThresholdUpdateManager.java | 28 +++++++++++++++-------
.../SegmentSizeBasedFlushThresholdUpdater.java | 11 ++++++++-
.../segment/FlushThresholdUpdaterTest.java | 20 +++++++++-------
5 files changed, 45 insertions(+), 21 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 99bd892066..475f128208 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -174,11 +174,15 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// any partition in the realtime table. This metric is emitted from the
segment size based threshold
// computer.
NUM_ROWS_THRESHOLD("numRowsThreshold", false),
+ // Added to preserve backwards compatibility of the above metric
+ NUM_ROWS_THRESHOLD_WITH_TOPIC("numRowsThresholdWithTopic", false),
// The actual segment size for committing segments. These may be shorter
than expected when the administrator
// issues a force-commit, or zero when new partitions are detected in the
stream (since there is no completing
// segment when the partition is first detected).
COMMITTING_SEGMENT_SIZE("committingSegmentSize", false),
+ // Added to preserve backwards compatibility of the above metric
+ COMMITTING_SEGMENT_SIZE_WITH_TOPIC("committingSegmentSizeWithTopic", false),
TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index ce11aae1c0..e296136975 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -326,11 +326,10 @@ public class PinotLLCRealtimeSegmentManager {
String realtimeTableName = tableConfig.getTableName();
LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
- _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigMaps(tableConfig).stream().map(
streamConfig -> new StreamConfig(tableConfig.getTableName(),
streamConfig)
).collect(Collectors.toList());
+
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index e72971c5e0..b4476e3bac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pinot.spi.stream.StreamConfig;
/**
- * Manager which maintains the flush threshold update objects for each table
+ * Manager which maintains the flush threshold update objects for each (table,
topic) pair
*/
public class FlushThresholdUpdateManager {
private final ConcurrentMap<String, FlushThresholdUpdater>
_flushThresholdUpdaterMap = new ConcurrentHashMap<>();
@@ -45,30 +46,39 @@ public class FlushThresholdUpdateManager {
* partitions consumed by a server; FixedFlushThresholdUpdater sets the
actual segment flush threshold as is.
*/
public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig
streamConfig) {
+ String tableTopicKey = getKey(streamConfig);
String realtimeTableName = streamConfig.getTableNameWithType();
-
int flushThresholdRows = streamConfig.getFlushThresholdRows();
if (flushThresholdRows > 0) {
- _flushThresholdUpdaterMap.remove(realtimeTableName);
+ _flushThresholdUpdaterMap.remove(tableTopicKey);
return new DefaultFlushThresholdUpdater(flushThresholdRows);
}
int flushThresholdSegmentRows =
streamConfig.getFlushThresholdSegmentRows();
if (flushThresholdSegmentRows > 0) {
- _flushThresholdUpdaterMap.remove(realtimeTableName);
+ _flushThresholdUpdaterMap.remove(tableTopicKey);
return new FixedFlushThresholdUpdater(flushThresholdSegmentRows);
}
// Legacy behavior: when flush threshold rows is explicitly set to 0, use
segment size based flush threshold
long flushThresholdSegmentSizeBytes =
streamConfig.getFlushThresholdSegmentSizeBytes();
if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
- return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName,
- k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName));
+ return _flushThresholdUpdaterMap.computeIfAbsent(tableTopicKey,
+ k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName,
streamConfig.getTopicName()));
} else {
- _flushThresholdUpdaterMap.remove(realtimeTableName);
+ _flushThresholdUpdaterMap.remove(tableTopicKey);
return new
DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
}
}
- public void clearFlushThresholdUpdater(String realtimeTableName) {
- _flushThresholdUpdaterMap.remove(realtimeTableName);
+ public void clearFlushThresholdUpdater(StreamConfig streamConfig) {
+ _flushThresholdUpdaterMap.remove(getKey(streamConfig));
+ }
+
+ private String getKey(StreamConfig streamConfig) {
+ return streamConfig.getTableNameWithType() + "," +
streamConfig.getTopicName();
+ }
+
+ @VisibleForTesting
+ public int getFlushThresholdUpdaterMapSize() {
+ return _flushThresholdUpdaterMap.size();
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 717a95bde3..1d9d3bd90d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -39,12 +39,14 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
public static final Logger LOGGER =
LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
private final SegmentFlushThresholdComputer _flushThresholdComputer;
private final String _realtimeTableName;
+ private final String _topicName;
private final ControllerMetrics _controllerMetrics = ControllerMetrics.get();
- public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) {
+ public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName,
String topicName) {
_flushThresholdComputer = new SegmentFlushThresholdComputer();
_realtimeTableName = realtimeTableName;
+ _topicName = topicName;
}
// synchronized since this method could be called for multiple partitions of
the same table in different threads
@@ -57,8 +59,15 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
newSegmentZKMetadata.getSegmentName());
newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);
+ // metrics tagged with table only
_controllerMetrics.setOrUpdateTableGauge(_realtimeTableName,
ControllerGauge.NUM_ROWS_THRESHOLD, threshold);
_controllerMetrics.setOrUpdateTableGauge(_realtimeTableName,
ControllerGauge.COMMITTING_SEGMENT_SIZE,
committingSegmentDescriptor.getSegmentSizeBytes());
+
+ // metrics tagged with topic and table
+ _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName,
+ ControllerGauge.NUM_ROWS_THRESHOLD_WITH_TOPIC, threshold);
+ _controllerMetrics.setOrUpdateTableGauge(_realtimeTableName, _topicName,
+ ControllerGauge.COMMITTING_SEGMENT_SIZE_WITH_TOPIC,
committingSegmentDescriptor.getSegmentSizeBytes());
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 712f80078b..2d45753015 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -90,7 +90,9 @@ public class FlushThresholdUpdaterTest {
segmentBasedflushThresholdUpdater = flushThresholdUpdater;
// Clear the updater
-
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
+
assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 1);
+ flushThresholdUpdateManager.clearFlushThresholdUpdater(mockStreamConfig(0,
-1, -1));
+
assertEquals(flushThresholdUpdateManager.getFlushThresholdUpdaterMapSize(), 0);
// Call again with flush threshold rows set to 0 - a different Object
should be returned
flushThresholdUpdater =
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1,
-1));
@@ -140,7 +142,7 @@ public class FlushThresholdUpdaterTest {
for (long[] segmentSizesMB :
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
+ new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME,
streamConfig.getTopicName());
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -178,7 +180,7 @@ public class FlushThresholdUpdaterTest {
for (long[] segmentSizesMB :
Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
+ new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME,
streamConfig.getTopicName());
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
@@ -238,9 +240,9 @@ public class FlushThresholdUpdaterTest {
@Test
public void testTimeThreshold() {
- SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
+ SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+ new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME,
streamConfig.getTopicName());
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -272,9 +274,9 @@ public class FlushThresholdUpdaterTest {
@Test
public void testMinThreshold() {
- SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
- new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
+ SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
+ new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME,
streamConfig.getTopicName());
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
@@ -305,8 +307,6 @@ public class FlushThresholdUpdaterTest {
@Test
public void testSegmentSizeBasedUpdaterWithModifications() {
- SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
- = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
// Use customized stream config
long flushSegmentDesiredSizeBytes =
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;
@@ -314,6 +314,8 @@ public class FlushThresholdUpdaterTest {
int flushAutotuneInitialRows =
StreamConfig.DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS / 2;
StreamConfig streamConfig =
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes,
flushThresholdTimeMillis, flushAutotuneInitialRows);
+ SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
+ = new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME,
streamConfig.getTopicName());
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]