This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3724ba27cf Adding a consumer lag as metric via a periodic task in
controller (#9800)
3724ba27cf is described below
commit 3724ba27cf5ceeebdd9805327b7f3d29abdea47d
Author: Navina Ramesh <[email protected]>
AuthorDate: Fri Nov 18 14:14:01 2022 -0800
Adding a consumer lag as metric via a periodic task in controller (#9800)
* emit lag as a part of segment status checker
* continue
* verified realtime consumer monitor emits metrics
* added unit test; method in abstract metrics to get partition level gauge
value
* cleanup
* fix unit test
* Disable monitor by default; addressed feedback
---
.../pinot/common/metrics/AbstractMetrics.java | 19 +++
.../pinot/common/metrics/ControllerGauge.java | 7 +-
.../pinot/controller/BaseControllerStarter.java | 5 +
.../apache/pinot/controller/ControllerConf.java | 19 +++
.../controller/helix/RealtimeConsumerMonitor.java | 121 ++++++++++++++
.../util/ConsumingSegmentInfoReader.java | 2 +
...ControllerPeriodicTaskStarterStatelessTest.java | 2 +-
.../helix/RealtimeConsumerMonitorTest.java | 179 +++++++++++++++++++++
.../apache/pinot/spi/stream/PartitionLagState.java | 2 +-
9 files changed, 353 insertions(+), 3 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 24656a3b70..740ae7c4b9 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -411,6 +411,25 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
}
}
+ /**
+ * Gets the value of a table partition gauge.
+ *
+ * @param tableName The table name
+ * @param partitionId The partition name
+ * @param gauge The gauge to use
+ */
+ public long getValueOfPartitionGauge(final String tableName, final int
partitionId, final G gauge) {
+ final String fullGaugeName;
+ String gaugeName = gauge.getGaugeName();
+ fullGaugeName = gaugeName + "." + getTableName(tableName) + "." +
partitionId;
+
+ if (!_gaugeValues.containsKey(fullGaugeName)) {
+ return -1;
+ } else {
+ return _gaugeValues.get(fullGaugeName).get();
+ }
+ }
+
/**
* Initializes all global meters (such as exceptions count) to zero.
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 4a0e2dfe26..fdf7c918ac 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -117,8 +117,13 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
SEGMENT_DOWNLOADS_IN_PROGRESS("segmentDownloadsInProgress", true),
// Number of in progress segment uploads
- SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true);
+ SEGMENT_UPLOADS_IN_PROGRESS("segmentUploadsInProgress", true),
+ // Records lag at a partition level
+ MAX_RECORDS_LAG("maxRecordsLag", false),
+
+ // Consumption availability lag in ms at a partition level
+ MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false);
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 27afdefcc6..b3334617c0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -73,6 +73,7 @@ import
org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
import
org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
+import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.MinionInstancesCleanupTask;
@@ -153,6 +154,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected SegmentRelocator _segmentRelocator;
protected RetentionManager _retentionManager;
protected SegmentStatusChecker _segmentStatusChecker;
+ protected RealtimeConsumerMonitor _realtimeConsumerMonitor;
protected PinotTaskManager _taskManager;
protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo>
_taskManagerStatusCache;
protected PeriodicTaskScheduler _periodicTaskScheduler;
@@ -678,6 +680,9 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentStatusChecker);
+ _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config,
_helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executorService);
+ periodicTasks.add(_realtimeConsumerMonitor);
_segmentRelocator = new SegmentRelocator(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
_executorService, _connectionManager);
periodicTasks.add(_segmentRelocator);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index f0a4584478..656c4dedde 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -217,6 +217,14 @@ public class ControllerConf extends PinotConfiguration {
private static final int
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final int DEFAULT_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS =
60 * 60;
private static final int
DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS = -1; // Disabled
+
+ // Realtime Consumer Monitor
+ private static final String RT_CONSUMER_MONITOR_FREQUENCY_PERIOD =
+ "controller.realtimeConsumerMonitor.frequencyPeriod";
+ private static final String RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS =
+ "controller.realtimeConsumerMonitor.initialDelayInSeconds";
+
+ private static final int DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_IN_SECONDS
= -1; // Disabled by default
}
private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS =
"server.request.timeoutSeconds";
@@ -588,6 +596,17 @@ public class ControllerConf extends PinotConfiguration {
Integer.toString(statusCheckerFrequencyInSeconds));
}
+ public int getRealtimeConsumerMonitorRunFrequency() {
+ return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD))
+ .map(period -> (int) convertPeriodToSeconds(period)).orElse(
+
ControllerPeriodicTasksConf.DEFAULT_RT_CONSUMER_MONITOR_FREQUENCY_IN_SECONDS);
+ }
+
+ public long getRealtimeConsumerMonitorInitialDelayInSeconds() {
+ return
getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_INITIAL_DELAY_IN_SECONDS,
+ ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+ }
+
public int getTaskMetricsEmitterFrequencyInSeconds() {
return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
new file mode 100644
index 0000000000..eae591f69c
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeConsumerMonitor extends
ControllerPeriodicTask<RealtimeConsumerMonitor.Context> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
+ private static final int DEFAULT_TIMEOUT_MS = 10000;
+ private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;
+
+ @VisibleForTesting
+ public RealtimeConsumerMonitor(ControllerConf controllerConf,
PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerMetrics
controllerMetrics,
+ ConsumingSegmentInfoReader consumingSegmentInfoReader) {
+ super("RealtimeConsumerMonitor",
controllerConf.getRealtimeConsumerMonitorRunFrequency(),
+ controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(),
pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
+ _consumingSegmentInfoReader = consumingSegmentInfoReader;
+ }
+
+ public RealtimeConsumerMonitor(ControllerConf controllerConf,
PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerMetrics
controllerMetrics,
+ ExecutorService executorService) {
+ this(controllerConf, pinotHelixResourceManager, leadControllerManager,
controllerMetrics,
+ new ConsumingSegmentInfoReader(executorService, new
SimpleHttpConnectionManager(), pinotHelixResourceManager));
+ }
+
+ @Override
+ protected void setUpTask() {
+ LOGGER.info("Setting up RealtimeConsumerMonitor task");
+ }
+
+ @Override
+ protected void processTable(String tableNameWithType) {
+ if
(!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType)))
{
+ return;
+ }
+ try {
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+
_consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType,
DEFAULT_TIMEOUT_MS);
+ Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+ Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+ for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+ : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+ info.forEach(segment -> {
+ segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+ if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+ try {
+ long recordsLag = Long.parseLong(v);
+ partitionToLagSet.computeIfAbsent(k, k1 -> new
ArrayList<>()).add(recordsLag);
+ } catch (NumberFormatException nfe) {
+ // skip this as we are unable to parse the lag string
+ }
+ }
+ });
+ segment._partitionOffsetInfo._availabilityLagMap.forEach((k, v) -> {
+ if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+ try {
+ long availabilityLagMs = Long.parseLong(v);
+ partitionToAvailabilityLagSet.computeIfAbsent(k, k1 -> new
ArrayList<>()).add(availabilityLagMs);
+ } catch (NumberFormatException nfe) {
+ // skip this as we are unable to parse the lag string
+ }
+ }
+ });
+ });
+ }
+ partitionToLagSet.forEach((partition, lagSet) -> {
+ _controllerMetrics.setValueOfPartitionGauge(tableNameWithType,
Integer.parseInt(partition),
+ ControllerGauge.MAX_RECORDS_LAG, Collections.max(lagSet));
+ });
+
+ partitionToAvailabilityLagSet.forEach((partition, lagSet) -> {
+ _controllerMetrics.setValueOfPartitionGauge(tableNameWithType,
Integer.parseInt(partition),
+ ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS,
Collections.max(lagSet));
+ });
+ } catch (Exception e) {
+ LOGGER.error("Failed to fetch consuming segments info. Unable to update
table consumption status metrics");
+ }
+ }
+
+ public static final class Context { }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 6caaea3087..2108eed344 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -227,6 +227,8 @@ public class ConsumingSegmentInfoReader {
}
}
+ // TODO: Invert response to be a map of partition to a vector of
[currentOffset, recordsLag, latestUpstreamOffset,
+ // availabilityLagMs]
@JsonIgnoreProperties(ignoreUnknown = true)
static public class PartitionOffsetInfo {
@JsonProperty("currentOffsetsMap")
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index b82ed7f48e..6172c67def 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest
extends ControllerTest {
}
private class MockControllerStarter extends ControllerStarter {
- private static final int NUM_PERIODIC_TASKS = 9;
+ private static final int NUM_PERIODIC_TASKS = 10;
public MockControllerStarter() {
super();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
new file mode 100644
index 0000000000..1450c02a35
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class RealtimeConsumerMonitorTest {
+
+ @Test
+ public void realtimeBasicTest()
+ throws Exception {
+ final String tableName = "myTable_REALTIME";
+ final String rawTableName =
TableNameBuilder.extractRawTableName(tableName);
+ List<String> allTableNames = new ArrayList<String>();
+ allTableNames.add(tableName);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn").setLLC(true)
+ .setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build();
+ LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1,
0, System.currentTimeMillis());
+ LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1,
1, System.currentTimeMillis());
+ LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2,
0, System.currentTimeMillis());
+ IdealState idealState = new IdealState(tableName);
+ idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(),
"pinot1", "ONLINE");
+ idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(),
"pinot2", "ONLINE");
+ idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(),
"pinot1", "CONSUMING");
+ idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(),
"pinot2", "CONSUMING");
+ idealState.setPartitionState(segmentPartition2Seq0.getSegmentName(),
"pinot1", "CONSUMING");
+ idealState.setPartitionState(segmentPartition2Seq0.getSegmentName(),
"pinot2", "CONSUMING");
+ idealState.setReplicas("3");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+ ExternalView externalView = new ExternalView(tableName);
+ externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1",
"ONLINE");
+ externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2",
"ONLINE");
+ externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1",
"CONSUMING");
+ externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot2",
"CONSUMING");
+ externalView.setState(segmentPartition2Seq0.getSegmentName(), "pinot1",
"CONSUMING");
+ externalView.setState(segmentPartition2Seq0.getSegmentName(), "pinot2",
"CONSUMING");
+
+ PinotHelixResourceManager helixResourceManager;
+ {
+ helixResourceManager = mock(PinotHelixResourceManager.class);
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore =
mock(ZkHelixPropertyStore.class);
+
when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+
when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore);
+ when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
+
when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
+
when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+ ZNRecord znRecord = new ZNRecord("0");
+ znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET,
"10000");
+ when(helixPropertyStore.get(anyString(), any(),
anyInt())).thenReturn(znRecord);
+ }
+ ControllerConf config;
+ {
+ config = mock(ControllerConf.class);
+ when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
+ when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
+ }
+ LeadControllerManager leadControllerManager;
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
+ PinotMetricsRegistry metricsRegistry =
PinotMetricUtils.getPinotMetricsRegistry();
+ ControllerMetrics controllerMetrics = new
ControllerMetrics(metricsRegistry);
+
+ // server 1 caught up on partition-1 and partition-2
+ // server 2 lags for partition-2 and caught up on partition-1
+ // So, the consumer monitor should show: 1. partition-1 has 0 lag;
partition-2 has some non-zero lag.
+ // Segment 1 in replicas:
+ TreeMap<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>>
response = new TreeMap<>();
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>
part1ServerConsumingSegmentInfo = new ArrayList<>(2);
+ part1ServerConsumingSegmentInfo.add(
+ getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"));
+ part1ServerConsumingSegmentInfo.add(
+ getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0"));
+
+ response.put(segmentPartition1Seq1.getSegmentName(),
part1ServerConsumingSegmentInfo);
+
+ // Segment 2 in replicas
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>
part2ServerConsumingSegmentInfo = new ArrayList<>(2);
+ part2ServerConsumingSegmentInfo.add(
+ getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"));
+ part2ServerConsumingSegmentInfo.add(
+ getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000"));
+
+ response.put(segmentPartition2Seq0.getSegmentName(),
part2ServerConsumingSegmentInfo);
+
+ ConsumingSegmentInfoReader consumingSegmentReader =
mock(ConsumingSegmentInfoReader.class);
+ when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
+ .thenReturn(new
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response));
+ RealtimeConsumerMonitor realtimeConsumerMonitor =
+ new RealtimeConsumerMonitor(config, helixResourceManager,
leadControllerManager,
+ controllerMetrics, consumingSegmentReader);
+ realtimeConsumerMonitor.start();
+ realtimeConsumerMonitor.run();
+ Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName,
1,
+ ControllerGauge.MAX_RECORDS_LAG), 0);
+ Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName,
2,
+ ControllerGauge.MAX_RECORDS_LAG), 40);
+ Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName,
1,
+ ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0);
+ Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName,
2,
+ ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000);
+ }
+
+ ConsumingSegmentInfoReader.ConsumingSegmentInfo
getConsumingSegmentInfoForServer(String serverName,
+ String partitionId, String currentOffset, String upstreamLatestOffset,
String availabilityLagMs) {
+ Map<String, String> currentOffsetMap =
Collections.singletonMap(partitionId, currentOffset);
+ Map<String, String> latestUpstreamOffsetMap =
Collections.singletonMap(partitionId, upstreamLatestOffset);
+ Map<String, String> recordsLagMap = Collections.singletonMap(partitionId,
String.valueOf(
+ Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset)));
+ Map<String, String> availabilityLagMsMap =
Collections.singletonMap(partitionId, availabilityLagMs);
+
+ ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo =
+ new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap,
latestUpstreamOffsetMap, recordsLagMap,
+ availabilityLagMsMap);
+ return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName,
"CONSUMING", -1,
+ currentOffsetMap, partitionOffsetInfo);
+ }
+
+ Map<String, String> getStreamConfigMap() {
+ return ImmutableMap.of(
+ "streamType", "kafka",
+ "stream.kafka.consumer.type", "simple",
+ "stream.kafka.topic.name", "test",
+ "stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
+ "stream.kafka.consumer.factory.class.name",
+
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
index ead8246baf..224775fbc0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
@@ -24,7 +24,7 @@ package org.apache.pinot.spi.stream;
* record offset, ingestion time etc.
*/
public class PartitionLagState {
- protected final static String NOT_CALCULATED = "NOT_CALCULATED";
+ public final static String NOT_CALCULATED = "NOT_CALCULATED";
/**
* Defines how far behind the current record's offset / pointer is from
upstream latest record
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]