This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f9074f9a91 Enhance SegmentStatusChecker to honor CONSUMING segment 
(#13562)
f9074f9a91 is described below

commit f9074f9a914b61621ddba497dbe259da40bdb07d
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jul 10 11:56:26 2024 -0700

    Enhance SegmentStatusChecker to honor CONSUMING segment (#13562)
---
 .../controller/helix/SegmentStatusChecker.java     | 285 ++++---
 .../controller/helix/SegmentStatusCheckerTest.java | 929 +++++++--------------
 2 files changed, 472 insertions(+), 742 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 6488ecb022..ceb33402e8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.controller.helix;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -51,6 +52,8 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
+import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -63,22 +66,18 @@ import org.slf4j.LoggerFactory;
  */
 public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusChecker.Context> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentStatusChecker.class);
-  private static final int MAX_OFFLINE_SEGMENTS_TO_LOG = 5;
-  public static final String ONLINE = "ONLINE";
-  public static final String ERROR = "ERROR";
-  public static final String CONSUMING = "CONSUMING";
-
-  // log messages about disabled tables atmost once a day
-  private static final long DISABLED_TABLE_LOG_INTERVAL_MS = 
TimeUnit.DAYS.toMillis(1);
   private static final ZNRecordSerializer RECORD_SERIALIZER = new 
ZNRecordSerializer();
-
   private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000;
 
+  // log messages about disabled tables at most once a day
+  private static final long DISABLED_TABLE_LOG_INTERVAL_MS = 
TimeUnit.DAYS.toMillis(1);
+  private static final int MAX_SEGMENTS_TO_LOG = 10;
+
   private final int _waitForPushTimeSeconds;
-  private long _lastDisabledTableLogTimestamp = 0;
+  private final TableSizeReader _tableSizeReader;
+  private final Set<String> _tierBackendGauges = new HashSet<>();
 
-  private TableSizeReader _tableSizeReader;
-  private Set<String> _tierBackendGauges = new HashSet<>();
+  private long _lastDisabledTableLogTimestamp = 0;
 
   /**
    * Constructs the segment status checker.
@@ -190,7 +189,7 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
         }
       }
       tierBackendSet.forEach(tierBackend -> 
context._tierBackendTableCountMap.put(tierBackend,
-              context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 
1));
+          context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 1));
       context._tierBackendConfiguredTableCount += tierBackendSet.isEmpty() ? 0 
: 1;
     }
     int replication = tableConfig.getReplication();
@@ -226,145 +225,173 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
       return;
     }
 
-    //check if table consumption is paused
-    boolean isTablePaused =
-        
Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED));
-
-    if (isTablePaused) {
+    if 
(Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED)))
 {
       context._pausedTables.add(tableNameWithType);
     }
 
-    if (idealState.getPartitionSet().isEmpty()) {
-      int nReplicasFromIdealState = 1;
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+        idealState.toString().length());
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
+        idealState.serialize(RECORD_SERIALIZER).length);
+
+    Set<String> segmentsIncludingReplaced = idealState.getPartitionSet();
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
+        segmentsIncludingReplaced.size());
+    // Get the segments excluding the replaced segments which are specified in 
the segment lineage entries and cannot
+    // be queried from the table.
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_pinotHelixResourceManager.getPropertyStore();
+    Set<String> segments;
+    if (segmentsIncludingReplaced.isEmpty()) {
+      segments = Set.of();
+    } else {
+      segments = new HashSet<>(segmentsIncludingReplaced);
+      SegmentLineage segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
+      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segments, 
segmentLineage);
+    }
+    int numSegments = segments.size();
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT, numSegments);
+    if (numSegments == 0) {
+      int numReplicasFromIS;
       try {
-        nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
+        numReplicasFromIS = 
Math.max(Integer.parseInt(idealState.getReplicas()), 1);
       } catch (NumberFormatException e) {
-        // Ignore
+        numReplicasFromIS = 1;
       }
-      _controllerMetrics
-          .setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, numReplicasFromIS);
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS, 100);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, 0);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.TABLE_COMPRESSED_SIZE, 0);
       return;
     }
 
-    // Get the segments excluding the replaced segments which are specified in 
the segment lineage entries and cannot
-    // be queried from the table.
-    Set<String> segmentsExcludeReplaced = new 
HashSet<>(idealState.getPartitionSet());
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_pinotHelixResourceManager.getPropertyStore();
-    SegmentLineage segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
-    
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced,
 segmentLineage);
-    _controllerMetrics
-        .setValueOfTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
-    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
-        idealState.serialize(RECORD_SERIALIZER).length);
-    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT,
-        (long) segmentsExcludeReplaced.size());
-    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
-        (long) (idealState.getPartitionSet().size()));
     ExternalView externalView = 
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
 
-    int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in 
ideal state
-    int nReplicasExternal = -1; // Keeps track of minimum number of replicas 
in external view
-    int nErrors = 0; // Keeps track of number of segments in error state
-    int nOffline = 0; // Keeps track of number segments with no online replicas
-    int nNumOfReplicasLessThanIdeal = 0; // Keeps track of number of segments 
running with less than expected replicas
-    int nSegments = 0; // Counts number of segments
-    long tableCompressedSize = 0; // Tracks the total compressed segment size 
in deep store per table
-    for (String partitionName : segmentsExcludeReplaced) {
-      int nReplicas = 0;
-      int nIdeal = 0;
-      nSegments++;
-      // Skip segments not online in ideal state
-      for (Map.Entry<String, String> serverAndState : 
idealState.getInstanceStateMap(partitionName).entrySet()) {
-        if (serverAndState == null) {
-          break;
-        }
-        if (serverAndState.getValue().equals(ONLINE)) {
-          nIdeal++;
-          break;
+    // Maximum number of replicas in ideal state
+    int maxISReplicas = Integer.MIN_VALUE;
+    // Minimum number of replicas in external view
+    int minEVReplicas = Integer.MAX_VALUE;
+    // Total compressed segment size in deep store
+    long tableCompressedSize = 0;
+    // Segments without ZK metadata
+    List<String> segmentsWithoutZKMetadata = new ArrayList<>();
+    // Pairs of segment-instance in ERROR state
+    List<Pair<String, String>> errorSegments = new ArrayList<>();
+    // Offline segments
+    List<String> offlineSegments = new ArrayList<>();
+    // Segments with fewer replicas online (ONLINE/CONSUMING) in external view 
than in ideal state
+    List<String> partialOnlineSegments = new ArrayList<>();
+    for (String segment : segments) {
+      int numISReplicas = 0;
+      for (Map.Entry<String, String> entry : 
idealState.getInstanceStateMap(segment).entrySet()) {
+        String state = entry.getValue();
+        if (state.equals(SegmentStateModel.ONLINE) || 
state.equals(SegmentStateModel.CONSUMING)) {
+          numISReplicas++;
         }
       }
-      if (nIdeal == 0) {
-        // No online segments in ideal state
+      // Skip segments not ONLINE/CONSUMING in ideal state
+      if (numISReplicas == 0) {
         continue;
       }
-      SegmentZKMetadata segmentZKMetadata =
-          _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, 
partitionName);
-      if (segmentZKMetadata != null
-          && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - 
_waitForPushTimeSeconds * 1000) {
-        // Push is not finished yet, skip the segment
+      maxISReplicas = Math.max(maxISReplicas, numISReplicas);
+
+      SegmentZKMetadata segmentZKMetadata = 
_pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
+      // Skip the segment when it doesn't have ZK metadata. Most likely the 
segment is just deleted.
+      if (segmentZKMetadata == null) {
+        segmentsWithoutZKMetadata.add(segment);
         continue;
       }
-      if (segmentZKMetadata != null) {
-        long sizeInBytes = segmentZKMetadata.getSizeInBytes();
-        if (sizeInBytes > 0) {
-          tableCompressedSize += sizeInBytes;
-        }
+      long sizeInBytes = segmentZKMetadata.getSizeInBytes();
+      if (sizeInBytes > 0) {
+        tableCompressedSize += sizeInBytes;
       }
-      nReplicasIdealMax = 
(idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? 
idealState
-          .getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
-      if ((externalView == null) || (externalView.getStateMap(partitionName) 
== null)) {
-        // No replicas for this segment
-        nOffline++;
-        if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) {
-          LOGGER.warn("Segment {} of table {} has no replicas", partitionName, 
tableNameWithType);
-        }
-        nReplicasExternal = 0;
+
+      // NOTE: We want to skip segments that are just created/pushed to avoid 
false alerts because it is expected for
+      //       servers to take some time to load them. For consuming 
(IN_PROGRESS) segments, we use creation time from
+      //       the ZK metadata; for pushed segments, we use push time from the 
ZK metadata. Both of them are the time
+      //       when segment is newly created. For committed segments from 
real-time table, push time doesn't exist, and
+      //       creationTimeMs will be Long.MIN_VALUE, which is fine because we 
want to include them in the check.
+      long creationTimeMs = segmentZKMetadata.getStatus() == 
Status.IN_PROGRESS ? segmentZKMetadata.getCreationTime()
+          : segmentZKMetadata.getPushTime();
+      if (creationTimeMs > System.currentTimeMillis() - 
_waitForPushTimeSeconds * 1000L) {
         continue;
       }
-      for (Map.Entry<String, String> serverAndState : 
externalView.getStateMap(partitionName).entrySet()) {
-        // Count number of online replicas. Ignore if state is CONSUMING.
-        // It is possible for a segment to be ONLINE in idealstate, and 
CONSUMING in EV for a short period of time.
-        // So, ignore this combination. If a segment exists in this 
combination for a long time, we will get
-        // low level-partition-not-consuming alert anyway.
-        if (serverAndState.getValue().equals(ONLINE) || 
serverAndState.getValue().equals(CONSUMING)) {
-          nReplicas++;
-        }
-        if (serverAndState.getValue().equals(ERROR)) {
-          nErrors++;
+
+      int numEVReplicas = 0;
+      if (externalView != null) {
+        Map<String, String> stateMap = externalView.getStateMap(segment);
+        if (stateMap != null) {
+          for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+            String state = entry.getValue();
+            if (state.equals(SegmentStateModel.ONLINE) || 
state.equals(SegmentStateModel.CONSUMING)) {
+              numEVReplicas++;
+            }
+            if (state.equals(SegmentStateModel.ERROR)) {
+              errorSegments.add(Pair.of(segment, entry.getKey()));
+            }
+          }
         }
       }
-      if (nReplicas == 0) {
-        if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) {
-          LOGGER.warn("Segment {} of table {} has no online replicas", 
partitionName, tableNameWithType);
-        }
-        nOffline++;
-      } else if (nReplicas < nReplicasIdealMax) {
-        LOGGER.debug("Segment {} of table {} is running with {} replicas which 
is less than the expected values {}",
-            partitionName, tableNameWithType, nReplicas, nReplicasIdealMax);
-        nNumOfReplicasLessThanIdeal++;
+      if (numEVReplicas == 0) {
+        offlineSegments.add(segment);
+      } else if (numEVReplicas < numISReplicas) {
+        partialOnlineSegments.add(segment);
+      } else {
+        // Do not allow nReplicasEV to be larger than nReplicasIS
+        numEVReplicas = numISReplicas;
+      }
+      minEVReplicas = Math.min(minEVReplicas, numEVReplicas);
+    }
+
+    if (maxISReplicas == Integer.MIN_VALUE) {
+      try {
+        maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 
1);
+      } catch (NumberFormatException e) {
+        maxISReplicas = 1;
       }
-      nReplicasExternal =
-          ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? 
nReplicas : nReplicasExternal;
     }
-    if (nReplicasExternal == -1) {
-      nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+    // Do not allow minEVReplicas to be larger than maxISReplicas
+    minEVReplicas = Math.min(minEVReplicas, maxISReplicas);
+
+    if (minEVReplicas < maxISReplicas) {
+      LOGGER.warn("Table {} has at least one segment running with only {} 
replicas, below replication threshold :{}",
+          tableNameWithType, minEVReplicas, maxISReplicas);
+    }
+    int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
+    if (numSegmentsWithoutZKMetadata > 0) {
+      LOGGER.warn("Table {} has {} segments without ZK metadata: {}", 
tableNameWithType, numSegmentsWithoutZKMetadata,
+          logSegments(segmentsWithoutZKMetadata));
+    }
+    int numErrorSegments = errorSegments.size();
+    if (numErrorSegments > 0) {
+      LOGGER.warn("Table {} has {} segments in ERROR state: {}", 
tableNameWithType, numErrorSegments,
+          logSegments(errorSegments));
+    }
+    int numOfflineSegments = offlineSegments.size();
+    if (numOfflineSegments > 0) {
+      LOGGER.warn("Table {} has {} segments without ONLINE/CONSUMING replica: 
{}", tableNameWithType,
+          numOfflineSegments, logSegments(offlineSegments));
+    }
+    int numPartialOnlineSegments = partialOnlineSegments.size();
+    if (numPartialOnlineSegments > 0) {
+      LOGGER.warn("Table {} has {} segments with fewer replicas than the 
replication factor: {}", tableNameWithType,
+          numPartialOnlineSegments, logSegments(partialOnlineSegments));
     }
+
     // Synchronization provided by Controller Gauge to make sure that only one 
thread updates the gauge
-    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS,
-        (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / 
nReplicasIdealMax) : 100);
-    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
-    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
-        nNumOfReplicasLessThanIdeal);
+        minEVReplicas * 100L / maxISReplicas);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE,
+        numErrorSegments);
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
-        (nSegments > 0) ? (nSegments - nOffline) * 100 / nSegments : 100);
+        numOfflineSegments > 0 ? (numSegments - numOfflineSegments) * 100L / 
numSegments : 100);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
+        numPartialOnlineSegments);
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.TABLE_COMPRESSED_SIZE,
         tableCompressedSize);
 
-    if (nOffline > 0) {
-      LOGGER.warn("Table {} has {} segments with no online replicas", 
tableNameWithType, nOffline);
-    }
-    if (nNumOfReplicasLessThanIdeal > 0) {
-      LOGGER.warn("Table {} has {} segments with number of replicas less than 
the replication factor",
-          tableNameWithType, nNumOfReplicasLessThanIdeal);
-    }
-    if (nReplicasExternal < nReplicasIdealMax) {
-      LOGGER.warn("Table {} has at least one segment running with only {} 
replicas, below replication threshold :{}",
-          tableNameWithType, nReplicasExternal, nReplicasIdealMax);
-    }
-
     if (tableType == TableType.REALTIME && tableConfig != null) {
       StreamConfig streamConfig =
           new StreamConfig(tableConfig.getTableName(), 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
@@ -373,6 +400,13 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
     }
   }
 
+  private static String logSegments(List<?> segments) {
+    if (segments.size() <= MAX_SEGMENTS_TO_LOG) {
+      return segments.toString();
+    }
+    return segments.subList(0, MAX_SEGMENTS_TO_LOG) + "...";
+  }
+
   @Override
   protected void nonLeaderCleanup(List<String> tableNamesWithType) {
     tableNamesWithType.forEach(this::removeMetricsForTable);
@@ -403,20 +437,15 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
   public void cleanUpTask() {
   }
 
-  @VisibleForTesting
-  void setTableSizeReader(TableSizeReader tableSizeReader) {
-    _tableSizeReader = tableSizeReader;
-  }
-
   public static final class Context {
     private boolean _logDisabledTables;
     private int _realTimeTableCount;
     private int _offlineTableCount;
     private int _upsertTableCount;
     private int _tierBackendConfiguredTableCount;
-    private Map<String, Integer> _tierBackendTableCountMap = new HashMap<>();
-    private Set<String> _processedTables = new HashSet<>();
-    private Set<String> _disabledTables = new HashSet<>();
-    private Set<String> _pausedTables = new HashSet<>();
+    private final Map<String, Integer> _tierBackendTableCountMap = new 
HashMap<>();
+    private final Set<String> _processedTables = new HashSet<>();
+    private final Set<String> _disabledTables = new HashSet<>();
+    private final Set<String> _pausedTables = new HashSet<>();
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 429391e8e3..81a0f345c1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -42,8 +41,8 @@ import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.Test;
@@ -58,25 +57,23 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 
 
+@SuppressWarnings("unchecked")
 public class SegmentStatusCheckerTest {
+  private static final String RAW_TABLE_NAME = "myTable";
+  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
-  private SegmentStatusChecker _segmentStatusChecker;
-  private PinotHelixResourceManager _helixResourceManager;
-  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
-  private LeadControllerManager _leadControllerManager;
-  private PinotMetricsRegistry _metricsRegistry;
-  private ControllerMetrics _controllerMetrics;
-  private ControllerConf _config;
-  private TableSizeReader _tableSizeReader;
+  // Intentionally not reset the metrics to test all metrics being refreshed.
+  private final ControllerMetrics _controllerMetrics =
+      new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
 
   @Test
-  public void offlineBasicTest()
-      throws Exception {
-    String offlineTableName = "myTable_OFFLINE";
+  public void offlineBasicTest() {
+    // Intentionally set the replication number to 2 to test the metrics.
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(2).build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(2).build();
 
-    IdealState idealState = new IdealState(offlineTableName);
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
@@ -90,10 +87,10 @@ public class SegmentStatusCheckerTest {
     idealState.setPartitionState("myTable_4", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_4", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_4", "pinot3", "ONLINE");
-    idealState.setReplicas("2");
+    idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(offlineTableName);
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
     externalView.setState("myTable_0", "pinot1", "ONLINE");
     externalView.setState("myTable_0", "pinot2", "ONLINE");
     externalView.setState("myTable_1", "pinot1", "ERROR");
@@ -104,170 +101,161 @@ public class SegmentStatusCheckerTest {
     externalView.setState("myTable_3", "pinot3", "ONLINE");
     externalView.setState("myTable_4", "pinot1", "ONLINE");
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-      
when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig);
-      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
-    }
-    {
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED}, 
{myTable_3 -> myTable_4, IN_PROGRESS},
-      // myTable_1 and myTable_4 will be skipped for the metrics.
-      SegmentLineage segmentLineage = new SegmentLineage(offlineTableName);
-      
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
-          new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), 
LineageEntryState.COMPLETED, 11111L));
-      
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
-          new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), 
LineageEntryState.IN_PROGRESS, 11111L));
-      when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), 
any(),
-          
eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord());
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
offlineTableName,
-        ControllerGauge.REPLICATION_FROM_CONFIG), 2);
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+    
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+    SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 
11111L);
+    when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), 
anyString())).thenReturn(segmentZKMetadata);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+    // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED}, 
{myTable_3 -> myTable_4, IN_PROGRESS},
+    // myTable_1 and myTable_4 will be skipped for the metrics.
+    SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
+    
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
+        new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), 
LineageEntryState.COMPLETED, 11111L));
+    
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
+        new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), 
LineageEntryState.IN_PROGRESS, 11111L));
+    when(
+        propertyStore.get(eq("/SEGMENT_LINEAGE/" + OFFLINE_TABLE_NAME), any(), 
eq(AccessOption.PERSISTENT))).thenReturn(
+        segmentLineage.toZNRecord());
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 2, 5, 3, 2, 66, 1, 100, 2, 
2468);
+  }
+
+  private SegmentZKMetadata mockPushedSegmentZKMetadata(long sizeInBytes, long 
pushTimeMs) {
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata.getStatus()).thenReturn(Status.UPLOADED);
+    when(segmentZKMetadata.getSizeInBytes()).thenReturn(sizeInBytes);
+    when(segmentZKMetadata.getPushTime()).thenReturn(pushTimeMs);
+    return segmentZKMetadata;
+  }
+
+  private void runSegmentStatusChecker(PinotHelixResourceManager 
resourceManager, int waitForPushTimeInSeconds) {
+    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    ControllerConf controllerConf = mock(ControllerConf.class);
+    
when(controllerConf.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(waitForPushTimeInSeconds);
+    TableSizeReader tableSizeReader = mock(TableSizeReader.class);
+    SegmentStatusChecker segmentStatusChecker =
+        new SegmentStatusChecker(resourceManager, leadControllerManager, 
controllerConf, _controllerMetrics,
+            tableSizeReader);
+    segmentStatusChecker.start();
+    segmentStatusChecker.run();
+  }
+
+  private void verifyControllerMetrics(String tableNameWithType, int 
expectedReplicationFromConfig,
+      int expectedNumSegmentsIncludingReplaced, int expectedNumSegment, int 
expectedNumReplicas,
+      int expectedPercentOfReplicas, int expectedSegmentsInErrorState, int 
expectedPercentSegmentsAvailable,
+      int expectedSegmentsWithLessReplicas, int expectedTableCompressedSize) {
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType,
+        ControllerGauge.REPLICATION_FROM_CONFIG), 
expectedReplicationFromConfig);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType,
+        ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 
expectedNumSegmentsIncludingReplaced);
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType, ControllerGauge.SEGMENT_COUNT),
+        expectedNumSegment);
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS),
+        expectedNumReplicas);
     assertEquals(
-        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(), ControllerGauge.SEGMENT_COUNT),
-        3);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.NUMBER_OF_REPLICAS), 2);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_OF_REPLICAS), 66);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS),
+        expectedPercentOfReplicas);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType,
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 
expectedSegmentsInErrorState);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType,
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 
expectedPercentSegmentsAvailable);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType,
+        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 
expectedSegmentsWithLessReplicas);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableNameWithType,
+        ControllerGauge.TABLE_COMPRESSED_SIZE), expectedTableCompressedSize);
   }
 
   @Test
-  public void realtimeBasicTest()
-      throws Exception {
-    String rawTableName = "myTable";
-    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+  public void realtimeBasicTest() {
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn")
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn")
             .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build();
 
-    LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, 
System.currentTimeMillis());
-    LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, 
System.currentTimeMillis());
-    LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, 
System.currentTimeMillis());
-    IdealState idealState = new IdealState(realtimeTableName);
-    idealState.setPartitionState(seg1.getSegmentName(), "pinot1", "ONLINE");
-    idealState.setPartitionState(seg1.getSegmentName(), "pinot2", "ONLINE");
-    idealState.setPartitionState(seg1.getSegmentName(), "pinot3", "ONLINE");
-    idealState.setPartitionState(seg2.getSegmentName(), "pinot1", "ONLINE");
-    idealState.setPartitionState(seg2.getSegmentName(), "pinot2", "ONLINE");
-    idealState.setPartitionState(seg2.getSegmentName(), "pinot3", "ONLINE");
-    idealState.setPartitionState(seg3.getSegmentName(), "pinot1", "CONSUMING");
-    idealState.setPartitionState(seg3.getSegmentName(), "pinot2", "CONSUMING");
-    idealState.setPartitionState(seg3.getSegmentName(), "pinot3", "OFFLINE");
+    String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, 
System.currentTimeMillis()).getSegmentName();
+    String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, 
System.currentTimeMillis()).getSegmentName();
+    String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, 
System.currentTimeMillis()).getSegmentName();
+    IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+    idealState.setPartitionState(seg1, "pinot1", "ONLINE");
+    idealState.setPartitionState(seg1, "pinot2", "ONLINE");
+    idealState.setPartitionState(seg1, "pinot3", "ONLINE");
+    idealState.setPartitionState(seg2, "pinot1", "ONLINE");
+    idealState.setPartitionState(seg2, "pinot2", "ONLINE");
+    idealState.setPartitionState(seg2, "pinot3", "ONLINE");
+    idealState.setPartitionState(seg3, "pinot1", "CONSUMING");
+    idealState.setPartitionState(seg3, "pinot2", "CONSUMING");
+    idealState.setPartitionState(seg3, "pinot3", "OFFLINE");
     idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(realtimeTableName);
-    externalView.setState(seg1.getSegmentName(), "pinot1", "ONLINE");
-    externalView.setState(seg1.getSegmentName(), "pinot2", "ONLINE");
-    externalView.setState(seg1.getSegmentName(), "pinot3", "ONLINE");
-    externalView.setState(seg2.getSegmentName(), "pinot1", "CONSUMING");
-    externalView.setState(seg2.getSegmentName(), "pinot2", "ONLINE");
-    externalView.setState(seg2.getSegmentName(), "pinot3", "CONSUMING");
-    externalView.setState(seg3.getSegmentName(), "pinot1", "CONSUMING");
-    externalView.setState(seg3.getSegmentName(), "pinot2", "CONSUMING");
-    externalView.setState(seg3.getSegmentName(), "pinot3", "OFFLINE");
-
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView);
-      ZNRecord znRecord = new ZNRecord("0");
-      znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, 
"10000");
-      when(_helixPropertyStore.get(anyString(), any(), 
anyInt())).thenReturn(znRecord);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.REPLICATION_FROM_CONFIG), 3);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.NUMBER_OF_REPLICAS), 3);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+    ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+    externalView.setState(seg1, "pinot1", "ONLINE");
+    externalView.setState(seg1, "pinot2", "ONLINE");
+    externalView.setState(seg1, "pinot3", "ONLINE");
+    externalView.setState(seg2, "pinot1", "CONSUMING");
+    externalView.setState(seg2, "pinot2", "ONLINE");
+    externalView.setState(seg2, "pinot3", "CONSUMING");
+    externalView.setState(seg3, "pinot1", "CONSUMING");
+    externalView.setState(seg3, "pinot2", "CONSUMING");
+    externalView.setState(seg3, "pinot3", "OFFLINE");
+
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
+    
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView);
+    SegmentZKMetadata committedSegmentZKMetadata = 
mockCommittedSegmentZKMetadata();
+    when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
seg1)).thenReturn(committedSegmentZKMetadata);
+    when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
seg2)).thenReturn(committedSegmentZKMetadata);
+    SegmentZKMetadata consumingSegmentZKMetadata = 
mockConsumingSegmentZKMetadata(11111L);
+    when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
seg3)).thenReturn(consumingSegmentZKMetadata);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+    ZNRecord znRecord = new ZNRecord("0");
+    znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, 
"10000");
+    when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 0, 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
REALTIME_TABLE_NAME,
         ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
   }
 
-  Map<String, String> getStreamConfigMap() {
+  private Map<String, String> getStreamConfigMap() {
     return Map.of("streamType", "kafka", "stream.kafka.consumer.type", 
"simple", "stream.kafka.topic.name", "test",
         "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
         "stream.kafka.consumer.factory.class.name",
         
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
   }
 
-  @Test
-  public void missingEVPartitionTest()
-      throws Exception {
-    String offlineTableName = "myTable_OFFLINE";
+  private SegmentZKMetadata mockCommittedSegmentZKMetadata() {
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata.getStatus()).thenReturn(Status.DONE);
+    when(segmentZKMetadata.getSizeInBytes()).thenReturn(-1L);
+    when(segmentZKMetadata.getPushTime()).thenReturn(Long.MIN_VALUE);
+    return segmentZKMetadata;
+  }
+
+  private SegmentZKMetadata mockConsumingSegmentZKMetadata(long 
creationTimeMs) {
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata.getStatus()).thenReturn(Status.IN_PROGRESS);
+    when(segmentZKMetadata.getSizeInBytes()).thenReturn(-1L);
+    when(segmentZKMetadata.getCreationTime()).thenReturn(creationTimeMs);
+    return segmentZKMetadata;
+  }
 
-    IdealState idealState = new IdealState(offlineTableName);
+  @Test
+  public void missingEVPartitionTest() {
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
@@ -276,191 +264,89 @@ public class SegmentStatusCheckerTest {
     idealState.setPartitionState("myTable_1", "pinot3", "ONLINE");
     idealState.setPartitionState("myTable_2", "pinot3", "OFFLINE");
     idealState.setPartitionState("myTable_3", "pinot3", "ONLINE");
-    idealState.setReplicas("2");
+    idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(offlineTableName);
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
     externalView.setState("myTable_0", "pinot1", "ONLINE");
     externalView.setState("myTable_0", "pinot2", "ONLINE");
     externalView.setState("myTable_1", "pinot1", "ERROR");
     externalView.setState("myTable_1", "pinot2", "ONLINE");
 
-    ZNRecord znrecord = new ZNRecord("myTable_0");
-    znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
-    znrecord.setLongField(CommonConstants.Segment.START_TIME, 1000);
-    znrecord.setLongField(CommonConstants.Segment.END_TIME, 2000);
-    znrecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, 
TimeUnit.HOURS.toString());
-    znrecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
-    znrecord.setLongField(CommonConstants.Segment.CRC, 1234);
-    znrecord.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
-    znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, 
"http://localhost:8000/myTable_0";);
-    znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, 
System.currentTimeMillis());
-    znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, 
System.currentTimeMillis());
-    znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
-
-    ZkHelixPropertyStore<ZNRecord> propertyStore;
-    {
-      propertyStore = (ZkHelixPropertyStore<ZNRecord>) 
mock(ZkHelixPropertyStore.class);
-      when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, 
AccessOption.PERSISTENT)).thenReturn(
-          znrecord);
-    }
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+    
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+    SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 
11111L);
+    when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), 
anyString())).thenReturn(segmentZKMetadata);
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
-      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_3")).thenReturn(
-          new SegmentZKMetadata(znrecord));
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.NUMBER_OF_REPLICAS), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.TABLE_COMPRESSED_SIZE), 1111);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 4, 4, 0, 0, 1, 75, 2, 3702);
   }
 
   @Test
-  public void missingEVTest()
-      throws Exception {
-    String realtimeTableName = "myTable_REALTIME";
-
-    IdealState idealState = new IdealState(realtimeTableName);
+  public void missingEVTest() {
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
     idealState.setPartitionState("myTable_1", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_1", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_1", "pinot3", "ONLINE");
-    idealState.setPartitionState("myTable_2", "pinot3", "OFFLINE");
-    idealState.setReplicas("2");
+    idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    assertEquals(
-        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
-        0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+    SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 
11111L);
+    when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), 
anyString())).thenReturn(segmentZKMetadata);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 2, 2, 0, 0, 0, 0, 0, 2468);
   }
 
   @Test
-  public void missingIdealTest()
-      throws Exception {
-    String realtimeTableName = "myTable_REALTIME";
-
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(null);
-      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE));
-    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS));
+  public void missingIdealTest() {
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetricsNotExist();
+  }
+
+  private void verifyControllerMetricsNotExist() {
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
OFFLINE_TABLE_NAME,
+        ControllerGauge.REPLICATION_FROM_CONFIG), 0);
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME,
+        ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED));
+    assertFalse(
+        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME, ControllerGauge.SEGMENT_COUNT));
     assertFalse(
-        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS));
+        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME, ControllerGauge.NUMBER_OF_REPLICAS));
     assertFalse(
-        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS));
-    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
+        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME, ControllerGauge.PERCENT_OF_REPLICAS));
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME,
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE));
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME,
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE));
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME,
+        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS));
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
OFFLINE_TABLE_NAME,
         ControllerGauge.TABLE_COMPRESSED_SIZE));
   }
 
   @Test
-  public void missingEVPartitionPushTest()
-      throws Exception {
-    String offlineTableName = "myTable_OFFLINE";
-
-    IdealState idealState = new IdealState(offlineTableName);
+  public void missingEVPartitionPushTest() {
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
+    idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_1", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_1", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_2", "pinot1", "ONLINE");
@@ -468,246 +354,144 @@ public class SegmentStatusCheckerTest {
     idealState.setReplicas("2");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(offlineTableName);
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    externalView.setState("myTable_0", "pinot1", "ONLINE");
+    externalView.setState("myTable_0", "pinot2", "ONLINE");
     externalView.setState("myTable_1", "pinot1", "ONLINE");
     externalView.setState("myTable_1", "pinot2", "ONLINE");
     // myTable_2 is push in-progress and only one replica has been downloaded 
by servers. It will be skipped for
     // the segment status check.
     externalView.setState("myTable_2", "pinot1", "ONLINE");
 
-    ZNRecord znrecord = new ZNRecord("myTable_0");
-    znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
-    znrecord.setLongField(CommonConstants.Segment.START_TIME, 1000);
-    znrecord.setLongField(CommonConstants.Segment.END_TIME, 2000);
-    znrecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, 
TimeUnit.HOURS.toString());
-    znrecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
-    znrecord.setLongField(CommonConstants.Segment.CRC, 1234);
-    znrecord.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
-    znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, 
"http://localhost:8000/myTable_0";);
-    znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, 
System.currentTimeMillis());
-    znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, 
System.currentTimeMillis());
-    znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
-
-    ZNRecord znrecord2 = new ZNRecord("myTable_2");
-    znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1");
-    znrecord2.setLongField(CommonConstants.Segment.START_TIME, 1000);
-    znrecord2.setLongField(CommonConstants.Segment.END_TIME, 2000);
-    znrecord2.setSimpleField(CommonConstants.Segment.TIME_UNIT, 
TimeUnit.HOURS.toString());
-    znrecord2.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000);
-    znrecord2.setLongField(CommonConstants.Segment.CRC, 1235);
-    znrecord2.setLongField(CommonConstants.Segment.CREATION_TIME, 3000);
-    znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, 
"http://localhost:8000/myTable_2";);
-    znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME, 
System.currentTimeMillis());
-    znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME, 
System.currentTimeMillis());
-    znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111);
-
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
-      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_0")).thenReturn(
-          new SegmentZKMetadata(znrecord));
-      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_2")).thenReturn(
-          new SegmentZKMetadata(znrecord2));
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.NUMBER_OF_REPLICAS), 2);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+    
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+    SegmentZKMetadata segmentZKMetadata01 = mockPushedSegmentZKMetadata(1234, 
11111L);
+    when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, 
"myTable_0")).thenReturn(segmentZKMetadata01);
+    when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, 
"myTable_1")).thenReturn(segmentZKMetadata01);
+    SegmentZKMetadata segmentZKMetadata2 = mockPushedSegmentZKMetadata(1234, 
System.currentTimeMillis());
+    when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, 
"myTable_2")).thenReturn(segmentZKMetadata2);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    runSegmentStatusChecker(resourceManager, 600);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 3, 3, 2, 100, 0, 100, 0, 
3702);
   }
 
   @Test
-  public void noReplicas()
-      throws Exception {
-    String realtimeTableName = "myTable_REALTIME";
+  public void missingEVUploadedConsumingTest() {
+    IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+    idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
+    idealState.setPartitionState("myTable_1", "pinot2", "CONSUMING");
+    idealState.setReplicas("1");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
+    SegmentZKMetadata updatedSegmentZKMetadata = 
mockPushedSegmentZKMetadata(1234, System.currentTimeMillis());
+    when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
"myTable_0")).thenReturn(updatedSegmentZKMetadata);
+    SegmentZKMetadata consumingSegmentZKMetadata = 
mockConsumingSegmentZKMetadata(System.currentTimeMillis());
+    when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
"myTable_1")).thenReturn(consumingSegmentZKMetadata);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
 
-    IdealState idealState = new IdealState(realtimeTableName);
+    runSegmentStatusChecker(resourceManager, 600);
+    verifyControllerMetrics(REALTIME_TABLE_NAME, 0, 2, 2, 1, 100, 0, 100, 0, 
1234);
+  }
+
+  @Test
+  public void noReplicaTest() {
+    IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
     idealState.setPartitionState("myTable_0", "pinot1", "OFFLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "OFFLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "OFFLINE");
     idealState.setReplicas("0");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    assertEquals(
-        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
-        1);
-    assertEquals(
-        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS),
-        100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState);
+    
when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(null);
+    SegmentZKMetadata segmentZKMetadata = 
mockConsumingSegmentZKMetadata(11111L);
+    when(resourceManager.getSegmentZKMetadata(eq(REALTIME_TABLE_NAME), 
anyString())).thenReturn(segmentZKMetadata);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(REALTIME_TABLE_NAME, 0, 1, 1, 1, 100, 0, 100, 0, 
0);
   }
 
   @Test
-  public void disabledTableTest() {
-    String offlineTableName = "myTable_OFFLINE";
-
-    IdealState idealState = new IdealState(offlineTableName);
-    // disable table in idealstate
-    idealState.enable(false);
-    idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
-    idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE");
-    idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE");
+  public void noSegmentZKMetadataTest() {
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+    idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setReplicas("1");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            null);
-
-    // verify state before test
-    assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 0);
-
-    // update metrics
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-    assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 1);
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 1, 1, 1, 100, 0, 100, 0, 0);
   }
 
   @Test
-  public void disabledEmptyTableTest() {
-    String offlineTableName = "myTable_OFFLINE";
-
-    IdealState idealState = new IdealState(offlineTableName);
+  public void disabledTableTest() {
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
     // disable table in idealstate
     idealState.enable(false);
-    idealState.setReplicas("1");
+    idealState.setPartitionState("myTable_OFFLINE", "pinot1", "ONLINE");
+    idealState.setPartitionState("myTable_OFFLINE", "pinot2", "ONLINE");
+    idealState.setPartitionState("myTable_OFFLINE", "pinot3", "ONLINE");
+    idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            null);
-
-    // verify state before test
-    assertFalse(MetricValueUtils.globalGaugeExists(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT));
-
-    // update metrics
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
+    runSegmentStatusChecker(resourceManager, 0);
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 1);
+    verifyControllerMetricsNotExist();
   }
 
   @Test
-  public void noSegments()
-      throws Exception {
-    noSegmentsInternal(0);
-    noSegmentsInternal(5);
-    noSegmentsInternal(-1);
+  public void noSegmentTest() {
+    noSegmentTest(0);
+    noSegmentTest(5);
+    noSegmentTest(-1);
+  }
+
+  public void noSegmentTest(int numReplicas) {
+    String numReplicasStr = numReplicas >= 0 ? Integer.toString(numReplicas) : 
"abc";
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+    idealState.setReplicas(numReplicasStr);
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    int expectedNumReplicas = Math.max(numReplicas, 1);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 0, 0, expectedNumReplicas, 
100, 0, 100, 0, 0);
   }
 
   @Test
-  public void lessThanOnePercentSegmentsUnavailableTest()
-      throws Exception {
-    String offlineTableName = "myTable_OFFLINE";
+  public void lessThanOnePercentSegmentsUnavailableTest() {
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(1).build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build();
 
-    IdealState idealState = new IdealState(offlineTableName);
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
     int numSegments = 200;
     for (int i = 0; i < numSegments; i++) {
       idealState.setPartitionState("myTable_" + i, "pinot1", "ONLINE");
@@ -715,107 +499,24 @@ public class SegmentStatusCheckerTest {
     idealState.setReplicas("1");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(offlineTableName);
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
     externalView.setState("myTable_0", "pinot1", "OFFLINE");
     for (int i = 1; i < numSegments; i++) {
       externalView.setState("myTable_" + i, "pinot1", "ONLINE");
     }
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
-      
when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig);
-      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
-    }
-    {
-      _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      SegmentLineage segmentLineage = new SegmentLineage(offlineTableName);
-      when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), 
any(),
-          
eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord());
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99);
-  }
-
-  public void noSegmentsInternal(final int nReplicas)
-      throws Exception {
-    String realtimeTableName = "myTable_REALTIME";
+    PinotHelixResourceManager resourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
+    
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+    
when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+    SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 
11111L);
+    when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), 
anyString())).thenReturn(segmentZKMetadata);
 
-    String nReplicasStr = Integer.toString(nReplicas);
-    int nReplicasExpectedValue = nReplicas;
-    if (nReplicas < 0) {
-      nReplicasStr = "abc";
-      nReplicasExpectedValue = 1;
-    }
-    IdealState idealState = new IdealState(realtimeTableName);
-    idealState.setReplicas(nReplicasStr);
-    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
 
-    {
-      _helixResourceManager = mock(PinotHelixResourceManager.class);
-      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
-      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
-    }
-    {
-      _config = mock(ControllerConf.class);
-      when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    {
-      _leadControllerManager = mock(LeadControllerManager.class);
-      
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    }
-    {
-      _tableSizeReader = mock(TableSizeReader.class);
-      when(_tableSizeReader.getTableSizeDetails(anyString(), 
anyInt())).thenReturn(null);
-    }
-    PinotMetricUtils.cleanUp();
-    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _segmentStatusChecker =
-        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-            _tableSizeReader);
-    _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
-    _segmentStatusChecker.start();
-    _segmentStatusChecker.run();
-
-    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE));
-    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE));
-    assertEquals(
-        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
-        nReplicasExpectedValue);
-    assertEquals(
-        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS),
-        100);
-    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    runSegmentStatusChecker(resourceManager, 0);
+    verifyControllerMetrics(OFFLINE_TABLE_NAME, 1, numSegments, numSegments, 
0, 0, 0, 99, 0, 246800);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to