This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch full-auto-same-state-model
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 4ea45ce08f423a9d9428ac1c752ae45ede921e05
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Wed Mar 6 12:57:45 2024 -0800

    Leverage ONLINE-OFFLINE state model for realtime tables
---
 .../pinot/broker/routing/BrokerRoutingManager.java |  3 +-
 .../instanceselector/BaseInstanceSelector.java     | 25 ++-----
 .../SegmentPartitionMetadataManager.java           |  2 +-
 .../instanceselector/InstanceSelectorTest.java     | 13 ++--
 .../helix/core/PinotHelixResourceManager.java      | 25 ++++---
 .../helix/core/PinotTableIdealStateHelper.java     |  2 +-
 .../segment/RealtimeSegmentAssignment.java         |  5 +-
 .../assignment/segment/SegmentAssignmentUtils.java | 22 ++++--
 .../segment/StrictRealtimeSegmentAssignment.java   |  8 +--
 .../realtime/MissingConsumingSegmentFinder.java    | 23 ++++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 82 ++++++++++++----------
 ...altimeNonReplicaGroupSegmentAssignmentTest.java |  4 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |  6 +-
 .../RealtimeReplicaGroupSegmentAssignmentTest.java |  4 +-
 .../StrictRealtimeSegmentAssignmentTest.java       |  2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 23 +++---
 .../helix/core/rebalance/TableRebalancerTest.java  |  7 +-
 .../realtime/RealtimeSegmentDataManager.java       | 37 ++++++++--
 .../manager/realtime/RealtimeTableDataManager.java |  8 +--
 .../realtime/RealtimeSegmentDataManagerTest.java   | 15 ++--
 .../ControllerPeriodicTasksIntegrationTest.java    |  2 +-
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  4 +-
 ...PartialUpsertTableRebalanceIntegrationTest.java |  2 +-
 .../UpsertTableSegmentPreloadIntegrationTest.java  |  2 +-
 .../UpsertTableSegmentUploadIntegrationTest.java   |  2 +-
 .../local/utils/tablestate/TableStateUtils.java    | 25 +++++--
 .../pinot/server/api/resources/TablesResource.java | 62 ++++++++++------
 .../server/starter/helix/BaseServerStarter.java    | 17 ++++-
 ...flineSegmentOnlineOfflineStateModelFactory.java | 11 +--
 .../airlineStats_realtime_table_config.json        |  3 +
 30 files changed, 273 insertions(+), 173 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index cc3a5354ef..01e1296a26 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -543,8 +543,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     Set<String> onlineSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
       Map<String, String> instanceStateMap = entry.getValue();
-      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || 
instanceStateMap.containsValue(
-          SegmentStateModel.CONSUMING)) {
+      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
         onlineSegments.add(entry.getKey());
       }
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index de0a5f9600..492c920669 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
@@ -122,7 +121,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
    * Returns whether the instance state is online for routing purpose 
(ONLINE/CONSUMING).
    */
   static boolean isOnlineForRouting(@Nullable String state) {
-    return SegmentStateModel.ONLINE.equals(state) || 
SegmentStateModel.CONSUMING.equals(state);
+    return SegmentStateModel.ONLINE.equals(state);
   }
 
   /**
@@ -200,7 +199,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     for (Map.Entry<String, String> entry : 
idealStateInstanceStateMap.entrySet()) {
       String instance = entry.getKey();
       // NOTE: DO NOT check if EV matches IS because it is a valid state when 
EV is CONSUMING while IS is ONLINE
-      if (isOnlineForRouting(externalViewInstanceStateMap.get(instance))) {
+      if (isOnlineForRouting(entry.getValue()) && 
isOnlineForRouting(externalViewInstanceStateMap.get(instance))) {
         onlineInstances.add(instance);
       }
     }
@@ -218,14 +217,6 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     }
   }
 
-  static SortedSet<String> convertToSortedSet(Set<String> set) {
-    if (set instanceof SortedSet) {
-      return (SortedSet<String>) set;
-    } else {
-      return new TreeSet<>(set);
-    }
-  }
-
   /**
    * Updates the segment maps based on the given ideal state, external view, 
online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and pre-selected by the 
{@link SegmentPreSelector}) and new segments.
@@ -239,21 +230,19 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     _newSegmentStateMap = new 
HashMap<>(HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size()));
 
     Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
-    Set<String> idealStateSegmentSet = idealState.getPartitionSet();
     Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
     for (String segment : onlineSegments) {
       Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-
       Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
       Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
       if (externalViewInstanceStateMap == null) {
         if (newSegmentCreationTimeMs != null) {
           // New segment
-          List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(Integer.parseInt(idealState.getReplicas()));
-          for (String segmentName : convertToSortedSet(idealStateSegmentSet)) {
-//            if (isOnlineForRouting(entry.getValue())) {
-            candidates.add(new SegmentInstanceCandidate(segmentName, false));
-//            }
+          List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
+          for (Map.Entry<String, String> entry : 
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
+            if (isOnlineForRouting(entry.getValue())) {
+              candidates.add(new SegmentInstanceCandidate(entry.getKey(), 
false));
+            }
           }
           _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMs, candidates));
         } else {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
index 3623955591..b3ba0ac28a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
@@ -127,7 +127,7 @@ public class SegmentPartitionMetadataManager implements 
SegmentZkMetadataFetchLi
     List<String> onlineServers = new ArrayList<>(instanceStateMap.size());
     for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
       String instanceState = entry.getValue();
-      if (instanceState.equals(SegmentStateModel.ONLINE) || 
instanceState.equals(SegmentStateModel.CONSUMING)) {
+      if (instanceState.equals(SegmentStateModel.ONLINE)) {
         onlineServers.add(entry.getKey());
       }
     }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index c748be4885..c44614c88b 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -61,7 +61,6 @@ import org.testng.annotations.Test;
 import static 
org.apache.pinot.broker.routing.instanceselector.InstanceSelector.NEW_SEGMENT_EXPIRATION_MILLIS;
 import static 
org.apache.pinot.spi.config.table.RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE;
 import static 
org.apache.pinot.spi.config.table.RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE;
-import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
@@ -1105,15 +1104,15 @@ public class InstanceSelectorTest {
     String segment0 = "segment0";
     String segment1 = "segment1";
     Map<String, String> idealStateInstanceStateMap = new TreeMap<>();
-    idealStateInstanceStateMap.put(instance, CONSUMING);
+    idealStateInstanceStateMap.put(instance, ONLINE);
     idealStateInstanceStateMap.put(errorInstance, ONLINE);
     idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap);
     idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap);
     Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
-    externalViewInstanceStateMap0.put(instance, CONSUMING);
+    externalViewInstanceStateMap0.put(instance, ONLINE);
     externalViewInstanceStateMap0.put(errorInstance, ERROR);
     Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
-    externalViewInstanceStateMap1.put(instance, CONSUMING);
+    externalViewInstanceStateMap1.put(instance, ONLINE);
     externalViewInstanceStateMap1.put(errorInstance, ERROR);
     externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);
     externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap1);
@@ -1359,10 +1358,10 @@ public class InstanceSelectorTest {
       //     (enabled)  errorInstance: ERROR
       //   }
       // }
-      idealStateInstanceStateMap.put(instance, CONSUMING);
-      externalViewInstanceStateMap0.put(instance, CONSUMING);
+      idealStateInstanceStateMap.put(instance, ONLINE);
+      externalViewInstanceStateMap0.put(instance, ONLINE);
       externalViewInstanceStateMap0.put(errorInstance, ERROR);
-      externalViewInstanceStateMap1.put(instance, CONSUMING);
+      externalViewInstanceStateMap1.put(instance, ONLINE);
       balancedInstanceSelector.onAssignmentChange(idealState, externalView, 
onlineSegments);
       strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, 
externalView, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments, 0);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8a5584eab7..57c75d7618 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2814,13 +2814,20 @@ public class PinotHelixResourceManager {
     if (idealState == null) {
       throw new IllegalStateException("Ideal state does not exist for table: " 
+ tableNameWithType);
     }
-    Set<String> consumingSegments = new HashSet<>();
-    for (String segment : idealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segment);
-      if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
-        consumingSegments.add(segment);
+    return getConsumingSegments(idealState);
+  }
+
+  public Set<String> getConsumingSegments(IdealState idealState) {
+    Set<String> consumingSegments = new TreeSet<>();
+    idealState.getRecord().getMapFields().forEach((segmentName, 
instanceToStateMap) -> {
+      if (instanceToStateMap.containsValue(SegmentStateModel.ONLINE)) {
+        SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(idealState.getResourceName(), segmentName);
+        if (segmentZKMetadata != null
+            && segmentZKMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+          consumingSegments.add(segmentName);
+        }
       }
-    }
+    });
     return consumingSegments;
   }
 
@@ -3913,8 +3920,7 @@ public class PinotHelixResourceManager {
     Set<String> matchingSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
       Map<String, String> instanceStateMap = entry.getValue();
-      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || 
(includeConsuming
-          && instanceStateMap.containsValue(SegmentStateModel.CONSUMING))) {
+      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
         matchingSegments.add(entry.getKey());
       }
     }
@@ -3931,8 +3937,7 @@ public class PinotHelixResourceManager {
     Set<String> onlineSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
       Map<String, String> instanceStateMap = entry.getValue();
-      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || 
instanceStateMap.containsValue(
-          SegmentStateModel.CONSUMING)) {
+      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
         onlineSegments.add(entry.getKey());
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
index 1135d05f5d..37c4b7555b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java
@@ -68,7 +68,7 @@ public class PinotTableIdealStateHelper {
           
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
     } else {
       stateModel =
-          
PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+          
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
     }
 
     // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, 
crushed auto-rebalance by default
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 434dbec4e3..8d69262d0f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -194,7 +194,8 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
         consumingInstancePartitions, includeConsuming, bootstrap);
 
     SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
-        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment);
+        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(_helixManager,
 _tableNameWithType,
+            nonTierAssignment);
     Map<String, Map<String, String>> newAssignment;
 
     // Reassign COMPLETED segments first
@@ -237,7 +238,7 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
       for (String segmentName : consumingSegmentAssignment.keySet()) {
         List<String> instancesAssigned = assignConsumingSegment(segmentName, 
consumingInstancePartitions);
         Map<String, String> instanceStateMap =
-            SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING);
+            SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE);
         newAssignment.put(segmentName, instanceStateMap);
       }
     } else {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 46e4cc4eca..0aaee3b0ac 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -39,6 +39,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.Pairs;
 
@@ -324,14 +325,27 @@ public class SegmentAssignmentUtils {
     //       1. At least one instance ONLINE -> COMPLETED segment
     //       2. At least one instance CONSUMING -> CONSUMING segment
     //       3. All instances OFFLINE (all instances encountered error while 
consuming) -> OFFLINE segment
-    CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, 
String>> segmentAssignment) {
+    CompletedConsumingOfflineSegmentAssignment(HelixManager helixManager, 
String tableName,
+        Map<String, Map<String, String>> segmentAssignment) {
       for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
         String segmentName = entry.getKey();
         Map<String, String> instanceStateMap = entry.getValue();
         if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
-          _completedSegmentAssignment.put(segmentName, instanceStateMap);
-        } else if 
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
-          _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+          SegmentZKMetadata segmentZKMetadata =
+              
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
tableName, segmentName);
+          if (segmentZKMetadata == null) {
+            _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+          } else {
+            CommonConstants.Segment.Realtime.Status status = 
segmentZKMetadata.getStatus();
+            if (status == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) 
{
+              _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+            } else {
+              _completedSegmentAssignment.put(segmentName, instanceStateMap);
+            }
+          }
+//          _completedSegmentAssignment.put(segmentName, instanceStateMap);
+//        } else if 
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+//          _consumingSegmentAssignment.put(segmentName, instanceStateMap);
         } else {
           _offlineSegmentAssignment.put(segmentName, instanceStateMap);
         }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index 2bc0ada5bd..ceeb111203 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -132,8 +132,7 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
    * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor 
CONSUMING), {@code false} otherwise.
    */
   private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
-    return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && 
!instanceStateMap.containsValue(
-        SegmentStateModel.CONSUMING);
+    return !instanceStateMap.containsValue(SegmentStateModel.ONLINE);
   }
 
   /**
@@ -180,8 +179,9 @@ public class StrictRealtimeSegmentAssignment extends 
RealtimeSegmentAssignment {
         // Reassign CONSUMING and COMPLETED segments
         List<String> instancesAssigned =
             assignConsumingSegment(getPartitionIdUsingCache(segmentName), 
instancePartitions);
-        String state = 
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? 
SegmentStateModel.CONSUMING
-            : SegmentStateModel.ONLINE;
+        String state = SegmentStateModel.ONLINE;
+//        String state = 
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? 
SegmentStateModel.CONSUMING
+//            : SegmentStateModel.ONLINE;
         newAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index d248f52d4c..c9850856cd 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -42,7 +42,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
-import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +62,7 @@ public class MissingConsumingSegmentFinder {
   private final SegmentMetadataFetcher _segmentMetadataFetcher;
   private final Map<Integer, StreamPartitionMsgOffset> 
_partitionGroupIdToLargestStreamOffsetMap;
   private final StreamPartitionMsgOffsetFactory 
_streamPartitionMsgOffsetFactory;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
 
   private ControllerMetrics _controllerMetrics;
 
@@ -69,6 +70,7 @@ public class MissingConsumingSegmentFinder {
       ControllerMetrics controllerMetrics, StreamConfig streamConfig) {
     _realtimeTableName = realtimeTableName;
     _controllerMetrics = controllerMetrics;
+    _propertyStore = propertyStore;
     _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, 
controllerMetrics);
     _streamPartitionMsgOffsetFactory =
         
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
@@ -94,6 +96,7 @@ public class MissingConsumingSegmentFinder {
       StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) {
     _realtimeTableName = realtimeTableName;
     _segmentMetadataFetcher = segmentMetadataFetcher;
+    _propertyStore = segmentMetadataFetcher._propertyStore;
     _partitionGroupIdToLargestStreamOffsetMap = 
partitionGroupIdToLargestStreamOffsetMap;
     _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory;
   }
@@ -118,11 +121,21 @@ public class MissingConsumingSegmentFinder {
     idealStateMap.forEach((segmentName, instanceToStatusMap) -> {
       LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
       if (llcSegmentName != null) { // Skip the uploaded realtime segments 
that don't conform to llc naming
-        if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) {
-          updateMap(partitionGroupIdToLatestConsumingSegmentMap, 
llcSegmentName);
-        } else if 
(instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) {
-          updateMap(partitionGroupIdToLatestCompletedSegmentMap, 
llcSegmentName);
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
_realtimeTableName, segmentName);
+        if (segmentZKMetadata != null) {
+          CommonConstants.Segment.Realtime.Status status = 
segmentZKMetadata.getStatus();
+          if (status == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+            updateMap(partitionGroupIdToLatestConsumingSegmentMap, 
llcSegmentName);
+          } else if (status == CommonConstants.Segment.Realtime.Status.DONE) {
+            updateMap(partitionGroupIdToLatestCompletedSegmentMap, 
llcSegmentName);
+          }
         }
+//        if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) {
+//          updateMap(partitionGroupIdToLatestConsumingSegmentMap, 
llcSegmentName);
+//        } else if 
(instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) {
+//          updateMap(partitionGroupIdToLatestCompletedSegmentMap, 
llcSegmentName);
+//        }
       }
     });
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 30f10f0021..40215c43a4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -533,17 +532,17 @@ public class PinotLLCRealtimeSegmentManager {
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     IdealState idealState = getIdealState(realtimeTableName);
-    ExternalView externalView = getExternalView(realtimeTableName);
+//    ExternalView externalView = getExternalView(realtimeTableName);
     // Check whether there is at least 1 replica in ONLINE state for full-auto 
mode.
-    if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
-      Preconditions.checkState(
-          
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
-          "Failed to find instance in ONLINE state in IdealState for segment: 
%s", committingSegmentName);
-    } else {
-      Preconditions.checkState(
-          
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-          "Failed to find instance in CONSUMING state in IdealState for 
segment: %s", committingSegmentName);
-    }
+//    if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) 
{
+//      Preconditions.checkState(
+//          
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+//          "Failed to find instance in ONLINE state in IdealState for 
segment: %s", committingSegmentName);
+//    } else {
+//      Preconditions.checkState(
+//          
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE),
+//          "Failed to find instance in CONSUMING state in IdealState for 
segment: %s", committingSegmentName);
+//    }
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -837,19 +836,26 @@ public class PinotLLCRealtimeSegmentManager {
     LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", 
segmentName, instanceName);
 
     try {
-      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
-        assert idealState != null;
-        // TODO: how to handle such state updates for FULL-AUTO mode? So far 
we don't enable FULL-AUTO for REALTIME
-        Map<String, String> stateMap = 
idealState.getInstanceStateMap(segmentName);
-        String state = stateMap.get(instanceName);
-        if (SegmentStateModel.CONSUMING.equals(state)) {
-          stateMap.put(instanceName, SegmentStateModel.OFFLINE);
-        } else {
-          LOGGER.info("Segment {} in state {} when trying to register 
consumption stop from {}", segmentName, state,
-              instanceName);
-        }
-        return idealState;
-      }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
+      // disable the instance that is assigned to the target realtime segment.
+      _helixAdmin.enablePartition(false, _clusterName, instanceName, 
realtimeTableName,
+          Collections.singletonList(segmentName));
+//      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
+//        assert idealState != null;
+//        // TODO: how to handle such state updates for FULL-AUTO mode? So far 
we don't enable FULL-AUTO for REALTIME
+//        Map<String, String> stateMap = 
idealState.getInstanceStateMap(segmentName);
+//        String state = stateMap.get(instanceName);
+//        if (SegmentStateModel.ONLINE.equals(state)) {
+//          SegmentZKMetadata segmentZKMetadata =
+//              ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
realtimeTableName, segmentName);
+//          if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == 
Status.IN_PROGRESS) {
+//            stateMap.put(instanceName, SegmentStateModel.OFFLINE);
+//          }
+//        } else {
+//          LOGGER.info("Segment {} in state {} when trying to register 
consumption stop from {}", segmentName, state,
+//              instanceName);
+//        }
+//        return idealState;
+//      }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
       throw e;
@@ -1195,7 +1201,7 @@ public class PinotLLCRealtimeSegmentManager {
       Map<String, String> instanceStateMap = 
instanceStatesMap.get(latestSegmentName);
       if (instanceStateMap != null) {
         // Latest segment of metadata is in idealstate.
-        if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+        if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
           if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
 
             // step-1 of commmitSegmentMetadata is done (i.e. marking old 
segment as DONE)
@@ -1294,13 +1300,20 @@ public class PinotLLCRealtimeSegmentManager {
         if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
           // Find the previous CONSUMING segment
           String previousConsumingSegment = null;
-          for (Map.Entry<String, Map<String, String>> segmentEntry : 
instanceStatesMap.entrySet()) {
-            if 
(segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING)
-                && new 
LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == 
partitionGroupId) {
-              previousConsumingSegment = segmentEntry.getKey();
+          Set<String> consumingSegments = findConsumingSegments(idealState);
+          for (String consumingSegment : consumingSegments) {
+            if (new LLCSegmentName(consumingSegment).getPartitionGroupId() == 
partitionGroupId) {
+              previousConsumingSegment = consumingSegment;
               break;
             }
           }
+//          for (Map.Entry<String, Map<String, String>> segmentEntry : 
instanceStatesMap.entrySet()) {
+//            if 
(segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING)
+//                && new 
LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == 
partitionGroupId) {
+//              previousConsumingSegment = segmentEntry.getKey();
+//              break;
+//            }
+//          }
           if (previousConsumingSegment == null) {
             LOGGER.error(
                 "Failed to find previous CONSUMING segment for partition: {} 
of table: {}, potential data loss",
@@ -1755,16 +1768,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private Set<String> findConsumingSegments(IdealState idealState) {
-    Set<String> consumingSegments = new TreeSet<>();
-    idealState.getRecord().getMapFields().forEach((segmentName, 
instanceToStateMap) -> {
-      for (String state : instanceToStateMap.values()) {
-        if (state.equals(SegmentStateModel.CONSUMING)) {
-          consumingSegments.add(segmentName);
-          break;
-        }
-      }
-    });
-    return consumingSegments;
+    return _helixResourceManager.getConsumingSegments(idealState);
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 9d921d3af8..280214854d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -244,7 +244,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
         // Check the CONSUMING segments
         newAssignment.get(_segments.get(segmentId)).forEach((instance, state) 
-> {
           assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
-          assertEquals(state, SegmentStateModel.CONSUMING);
+          assertEquals(state, SegmentStateModel.ONLINE);
         });
       }
     }
@@ -345,7 +345,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
 
     // Add the new segment into the assignment as CONSUMING
     currentAssignment.put(_segments.get(segmentId),
-        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
+        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
   }
 
   private HelixManager createHelixManager() {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index e03a8d4a46..e7b5895a47 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -196,7 +196,7 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
         Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
-          assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
+          assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
         }
       }
     }
@@ -227,7 +227,7 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
         Map<String, String> instanceStateMap = 
newAssignment.get(_segments.get(segmentId));
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           
assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
-          assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
+          assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
         }
       }
     }
@@ -345,7 +345,7 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
 
     // Add the new segment into the assignment as CONSUMING
     currentAssignment.put(_segments.get(segmentId),
-        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
+        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 2c590ef25e..cbac3133cf 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -231,7 +231,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
         // check CONSUMING segments
         newAssignment.get(_segments.get(segmentId)).forEach((instance, state) 
-> {
           assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX));
-          assertEquals(state, SegmentStateModel.CONSUMING);
+          assertEquals(state, SegmentStateModel.ONLINE);
         });
       }
     }
@@ -446,7 +446,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
 
     // Add the new segment into the assignment as CONSUMING
     currentAssignment.put(_segments.get(segmentId),
-        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
+        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
   }
 
   private HelixManager createHelixManager() {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
index 13520bb4f8..524a9b9606 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java
@@ -274,7 +274,7 @@ public class StrictRealtimeSegmentAssignmentTest {
 
     // Add the new segment into the assignment as CONSUMING
     currentAssignment.put(_segments.get(segmentId),
-        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.CONSUMING));
+        SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE));
   }
 
   private HelixManager createHelixManager() {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index a882a47ec7..26641d515c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -179,7 +179,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       assertNotNull(instanceStateMap);
       assertEquals(instanceStateMap.size(), numReplicas);
       for (String state : instanceStateMap.values()) {
-        assertEquals(state, SegmentStateModel.CONSUMING);
+        assertEquals(state, SegmentStateModel.ONLINE);
       }
 
       SegmentZKMetadata segmentZKMetadata = 
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentName, null);
@@ -223,7 +223,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.get(consumingSegment);
     assertNotNull(consumingSegmentInstanceStateMap);
     assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
-        Collections.singleton(SegmentStateModel.CONSUMING));
+        Collections.singleton(SegmentStateModel.ONLINE));
 
     // Verify segment ZK metadata for committed segment and new consuming 
segment
     SegmentZKMetadata committedSegmentZKMetadata = 
segmentManager._segmentZKMetadataMap.get(committingSegment);
@@ -260,7 +260,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
     assertNotNull(consumingSegmentInstanceStateMap);
     assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
-        Collections.singleton(SegmentStateModel.CONSUMING));
+        Collections.singleton(SegmentStateModel.ONLINE));
 
     // Illegal segment commit - commit the segment again
     try {
@@ -421,7 +421,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       Map<String, String> instanceStateMap = 
instanceStatesMap.get(segmentName);
       assertEquals(instanceStateMap.size(), segmentManager._numReplicas);
       for (String state : instanceStateMap.values()) {
-        assertEquals(state, SegmentStateModel.CONSUMING);
+        assertEquals(state, SegmentStateModel.ONLINE);
       }
       // NOTE: Old segment ZK metadata might exist when previous round failed 
due to not enough instances
       assertTrue(segmentZKMetadataMap.containsKey(segmentName));
@@ -622,7 +622,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.remove(consumingSegment);
     assertNotNull(consumingSegmentInstanceStateMap);
     assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
-        Collections.singleton(SegmentStateModel.CONSUMING));
+        Collections.singleton(SegmentStateModel.ONLINE));
 
     if (latestCommittedSegment != null) {
       Map<String, String> latestCommittedSegmentInstanceStateMap = 
instanceStatesMap.get(latestCommittedSegment);
@@ -630,7 +630,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       for (Map.Entry<String, String> entry : 
latestCommittedSegmentInstanceStateMap.entrySet()) {
         // Latest committed segment should have all instances in ONLINE state
         assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
-        entry.setValue(SegmentStateModel.CONSUMING);
+        entry.setValue(SegmentStateModel.ONLINE);
       }
     }
   }
@@ -644,7 +644,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertNotNull(consumingSegmentInstanceStateMap);
     for (Map.Entry<String, String> entry : 
consumingSegmentInstanceStateMap.entrySet()) {
       // Consuming segment should have all instances in CONSUMING state
-      assertEquals(entry.getValue(), SegmentStateModel.CONSUMING);
+      assertEquals(entry.getValue(), SegmentStateModel.ONLINE);
       entry.setValue(SegmentStateModel.OFFLINE);
     }
   }
@@ -657,7 +657,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.get(consumingSegment);
     assertNotNull(consumingSegmentInstanceStateMap);
     for (Map.Entry<String, String> entry : 
consumingSegmentInstanceStateMap.entrySet()) {
-      entry.setValue(SegmentStateModel.CONSUMING);
+      entry.setValue(SegmentStateModel.ONLINE);
     }
   }
 
@@ -702,8 +702,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       Map<String, String> instanceStateMap = entry.getValue();
 
       // Skip segments with all instances OFFLINE
-      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || 
instanceStateMap
-          .containsValue(SegmentStateModel.CONSUMING)) {
+      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
         LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
         int partitionsId = llcSegmentName.getPartitionGroupId();
         Map<Integer, String> sequenceNumberToSegmentMap = 
partitionGroupIdToSegmentsMap.get(partitionsId);
@@ -723,8 +722,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
       Map<String, String> instanceStateMap = 
instanceStatesMap.get(latestSegment);
       if (!shardsEnded.contains(partitionGroupId)) {
         // Latest segment should have CONSUMING instance but no ONLINE 
instance in ideal state
-        
assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
-        assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
+        assertTrue(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
+//        
assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
 
         // Latest segment ZK metadata should be IN_PROGRESS
         
assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(),
 Status.IN_PROGRESS);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index ecf1e0feda..cf0476b3b0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
@@ -1293,7 +1292,7 @@ public class TableRebalancerTest {
             true, null));
 
     // Should fail when a segment has CONSUMING instance in IdealState but 
does not exist in ExternalView
-    instanceStateMap.put("instance2", CONSUMING);
+    instanceStateMap.put("instance2", ONLINE);
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
             false, null));
@@ -1321,7 +1320,7 @@ public class TableRebalancerTest {
             true, null));
 
     // Should pass when instance state matches
-    instanceStateMap.put("instance2", CONSUMING);
+    instanceStateMap.put("instance2", ONLINE);
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
             false, null));
@@ -1330,7 +1329,7 @@ public class TableRebalancerTest {
             true, null));
 
     // Should pass when there are extra instances in ExternalView
-    instanceStateMap.put("instance3", CONSUMING);
+    instanceStateMap.put("instance3", ONLINE);
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
             false, null));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 5cdb3dda3a..95345cbd85 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -37,7 +37,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -95,6 +98,7 @@ import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -223,6 +227,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private final SegmentZKMetadata _segmentZKMetadata;
   private final TableConfig _tableConfig;
   private final RealtimeTableDataManager _realtimeTableDataManager;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final StreamDataDecoder _streamDataDecoder;
   private final int _segmentMaxRowCount;
   private final String _resourceDataDir;
@@ -733,7 +738,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               hold();
               break;
             case DISCARD:
-              // Keep this in memory, but wait for the online transition, and 
download when it comes in.
+              // When exiting the while loop, attempts to download the copy.
               _state = State.DISCARDED;
               break;
             case KEEP:
@@ -784,6 +789,28 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               break;
           }
         }
+        // After leaving the while loop, if this server still doesn't have a copy of the completed segment,
+        // make one last attempt to bring the segment online.
+        if (!_shouldStop) {
+          if (_state != State.COMMITTED && _state != State.RETAINED) {
+            _segmentLogger.info("Trying to bring the segment online. Current 
state: " + _state.toString());
+            _segmentLogger.info("Should stop: " + _shouldStop);
+            long maxEndTime =
+                now() + 
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, 
TimeUnit.SECONDS);
+            while (now() < maxEndTime) {
+              SegmentZKMetadata zkMetadata =
+                  ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
_tableNameWithType, _segmentNameStr);
+              if (zkMetadata != null) {
+                if (zkMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+                  hold();
+                } else {
+                  goOnlineFromConsuming(zkMetadata);
+                  break;
+                }
+              }
+            }
+          }
+        }
       } catch (Exception e) {
         String errorMessage = "Exception while in work";
         _segmentLogger.error(errorMessage, e);
@@ -1349,15 +1376,17 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   // Assume that this is called only on OFFLINE to CONSUMING transition.
   // If the transition is OFFLINE to ONLINE, the caller should have downloaded 
the segment and we don't reach here.
   public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, 
TableConfig tableConfig,
-      RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
-      Schema schema, LLCSegmentName llcSegmentName, Semaphore 
partitionGroupConsumerSemaphore,
-      ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
+      RealtimeTableDataManager realtimeTableDataManager, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String resourceDataDir, IndexLoadingConfig indexLoadingConfig, Schema 
schema, LLCSegmentName llcSegmentName,
+      Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics,
+      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, 
BooleanSupplier isReadyToConsumeData) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = segmentZKMetadata;
     _tableConfig = tableConfig;
     _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
+    _propertyStore = propertyStore;
     _resourceDataDir = resourceDataDir;
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2ddc0f7eb5..c488101f75 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -311,7 +311,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   public Set<Integer> getHostedPartitionsGroupIds() {
     Set<Integer> partitionsHostedByThisServer = new HashSet<>();
     List<String> segments = 
TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, 
_tableNameWithType,
-        CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+        Status.IN_PROGRESS);
     for (String segmentNameStr : segments) {
       LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
       partitionsHostedByThisServer.add(segmentName.getPartitionGroupId());
@@ -440,9 +440,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         _tableDedupMetadataManager != null ? 
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
             : null;
     RealtimeSegmentDataManager realtimeSegmentDataManager =
-        new RealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, 
_indexDir.getAbsolutePath(),
-            indexLoadingConfig, schema, llcSegmentName, semaphore, 
_serverMetrics, partitionUpsertMetadataManager,
-            partitionDedupMetadataManager, _isTableReadyToConsumeData);
+        new RealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, 
_propertyStore,
+            _indexDir.getAbsolutePath(), indexLoadingConfig, schema, 
llcSegmentName, semaphore, _serverMetrics,
+            partitionUpsertMetadataManager, partitionDedupMetadataManager, 
_isTableReadyToConsumeData);
     realtimeSegmentDataManager.startConsumption();
     segmentDataManager = realtimeSegmentDataManager;
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index cc93fcfe77..70ec6241d3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -36,6 +36,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -145,7 +146,11 @@ public class RealtimeSegmentDataManagerTest {
     _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new 
Semaphore(1));
     Schema schema = Fixtures.createSchema();
     ServerMetrics serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
tableDataManager,
+    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    when(propertyStore.get(anyString(), any(), 
anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+    HelixManager helixManager = mock(HelixManager.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
tableDataManager, propertyStore,
         new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema, 
llcSegmentName,
         _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
   }
@@ -946,11 +951,11 @@ public class RealtimeSegmentDataManagerTest {
     }
 
     public FakeRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, 
TableConfig tableConfig,
-        RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, Schema schema,
-        LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, 
ServerMetrics serverMetrics,
-        TimeSupplier timeSupplier)
+        RealtimeTableDataManager realtimeTableDataManager, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
+        String resourceDataDir, Schema schema, LLCSegmentName llcSegmentName, 
Map<Integer, Semaphore> semaphoreMap,
+        ServerMetrics serverMetrics, TimeSupplier timeSupplier)
         throws Exception {
-      super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir,
+      super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
propertyStore, resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), 
tableConfig), schema, llcSegmentName,
           semaphoreMap.get(llcSegmentName.getPartitionGroupId()), 
serverMetrics, null, null, () -> true);
       _state = RealtimeSegmentDataManager.class.getDeclaredField("_state");
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 39e4855f63..69fb078c94 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -302,7 +302,7 @@ public class ControllerPeriodicTasksIntegrationTest extends 
BaseClusterIntegrati
       for (Map<String, String> instanceStateMap : 
idealState.getRecord().getMapFields().values()) {
         for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
           String state = entry.getValue();
-          if (state.equals(SegmentStateModel.CONSUMING)) {
+          if (state.equals("CONSUMING")) {
             consumingServers.add(entry.getKey());
           } else if (state.equals(SegmentStateModel.ONLINE)) {
             completedServers.add(entry.getKey());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 50693aaf73..c7c30ebca1 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -415,7 +415,7 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
     Set<String> matchingSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
       Map<String, String> instanceStateMap = entry.getValue();
-      if 
(instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING))
 {
+      if 
(instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE))
 {
         matchingSegments.add(entry.getKey());
       }
     }
@@ -492,7 +492,7 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
       is.getPartitionSet().forEach(segmentNameStr -> {
         if (LLCSegmentName.isLLCSegment(segmentNameStr)) {
           if (is.getInstanceStateMap(segmentNameStr).values().contains(
-              CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+              CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) {
             LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
             if (segmentName.getPartitionGroupId() == partition) {
               seqNum.set(segmentName.getSequenceNumber());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 8afcbe9177..8f8602d87f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -247,7 +247,7 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
         if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) {
           assertEquals(state, 
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
         } else {
-          assertEquals(state, 
CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+          assertEquals(state, 
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
         }
       } else {
         assertEquals(state, 
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index 9123c9660c..c6a1a7f992 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -260,7 +260,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
         if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) {
           assertEquals(state, SegmentStateModel.ONLINE);
         } else {
-          assertEquals(state, SegmentStateModel.CONSUMING);
+          assertEquals(state, SegmentStateModel.ONLINE);
         }
       } else {
         assertEquals(state, SegmentStateModel.ONLINE);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 2e9f472d98..4f2cd5d977 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -192,7 +192,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
       Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next();
       String state = instanceIdAndState.getValue();
       if (LLCSegmentName.isLLCSegment(segmentName)) {
-        assertEquals(state, SegmentStateModel.CONSUMING);
+        assertEquals(state, SegmentStateModel.ONLINE);
       } else {
         assertEquals(state, SegmentStateModel.ONLINE);
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 6de4536de1..4f35c7f9c6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -27,6 +27,11 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,12 +49,13 @@ public class TableStateUtils {
    *
    * @param helixManager instance of Helix manager
    * @param tableNameWithType table for which we are obtaining segments in a given state
-   * @param state state of the segments to be returned
+   * @param status valid status of the segments to be returned
    *
    * @return List of segment names in a given state.
    */
   public static List<String> getSegmentsInGivenStateForThisInstance(HelixManager helixManager, String tableNameWithType,
-      String state) {
+      CommonConstants.Segment.Realtime.Status status) {
+    ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();
     HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
     IdealState idealState = dataAccessor.getProperty(keyBuilder.idealStates(tableNameWithType));
@@ -66,11 +72,15 @@ public class TableStateUtils {
       String segmentName = entry.getKey();
       Map<String, String> instanceStateMap = entry.getValue();
       String expectedState = instanceStateMap.get(instanceName);
-      // Only track state segments assigned to the current instance
-      if (!state.equals(expectedState)) {
-        continue;
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(expectedState)) {
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(propertyStore, tableNameWithType, segmentName);
+        // Only track state segments assigned to the current instance
+        if (segmentZKMetadata != null && segmentZKMetadata.getStatus() != status) {
+          continue;
+        }
+        segmentsInGivenState.add(segmentName);
       }
-      segmentsInGivenState.add(segmentName);
     }
     return segmentsInGivenState;
   }
@@ -85,7 +95,8 @@ public class TableStateUtils {
    */
   public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
     List<String> onlineSegments =
-        getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType, SegmentStateModel.ONLINE);
+        getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType,
+            CommonConstants.Segment.Realtime.Status.DONE);
     if (onlineSegments.isEmpty()) {
       LOGGER.info("No ONLINE segment found for table: {}", tableNameWithType);
       return true;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b08833b966..fd067f353f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -95,7 +95,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -737,30 +736,47 @@ public class TablesResource {
         // Segment hosted by this server. Validate segment state
         String segmentName = entry.getKey();
         SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+        if (segmentDataManager == null) {
+          return new TableSegmentValidationInfo(false, -1);
+        }
         try {
-          switch (segmentState) {
-            case SegmentStateModel.CONSUMING:
-              // Only validate presence of segment
-              if (segmentDataManager == null) {
-                return new TableSegmentValidationInfo(false, -1);
-              }
-              break;
-            case SegmentStateModel.ONLINE:
-              // Validate segment CRC
-              SegmentZKMetadata zkMetadata =
-                  ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
-                      tableNameWithType, segmentName);
-              Preconditions.checkState(zkMetadata != null,
-                  "Segment zk metadata not found for segment : " + segmentName);
-              if (segmentDataManager == null || !segmentDataManager.getSegment().getSegmentMetadata().getCrc()
-                  .equals(String.valueOf(zkMetadata.getCrc()))) {
-                return new TableSegmentValidationInfo(false, -1);
-              }
-              maxEndTimeMs = Math.max(maxEndTimeMs, zkMetadata.getEndTimeMs());
-              break;
-            default:
-              break;
+          IndexSegment indexSegment = segmentDataManager.getSegment();
+          if (indexSegment instanceof ImmutableSegment) {
+            // Validate segment CRC
+            SegmentZKMetadata zkMetadata =
+                ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
+                    tableNameWithType, segmentName);
+            Preconditions.checkState(zkMetadata != null,
+                "Segment zk metadata not found for segment : " + segmentName);
+            if (!segmentDataManager.getSegment().getSegmentMetadata().getCrc()
+                .equals(String.valueOf(zkMetadata.getCrc()))) {
+              return new TableSegmentValidationInfo(false, -1);
+            }
+            maxEndTimeMs = Math.max(maxEndTimeMs, zkMetadata.getEndTimeMs());
           }
+//          switch (segmentState) {
+//            case SegmentStateModel.CONSUMING:
+//              // Only validate presence of segment
+//              if (segmentDataManager == null) {
+//                return new TableSegmentValidationInfo(false, -1);
+//              }
+//              break;
+//            case SegmentStateModel.ONLINE:
+//              // Validate segment CRC
+//              SegmentZKMetadata zkMetadata =
+//                  ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
+//                      tableNameWithType, segmentName);
+//              Preconditions.checkState(zkMetadata != null,
+//                  "Segment zk metadata not found for segment : " + segmentName);
+//              if (segmentDataManager == null || !segmentDataManager.getSegment().getSegmentMetadata().getCrc()
+//                  .equals(String.valueOf(zkMetadata.getCrc()))) {
+//                return new TableSegmentValidationInfo(false, -1);
+//              }
+//              maxEndTimeMs = Math.max(maxEndTimeMs, zkMetadata.getEndTimeMs());
+//              break;
+//            default:
+//              break;
+//          }
         } finally {
           if (segmentDataManager != null) {
             tableDataManager.releaseSegment(segmentDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 9862ffb7dd..e0faea64c2 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -51,6 +51,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
@@ -298,10 +299,20 @@ public abstract class BaseServerStarter implements ServiceStartable {
         }
         if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) {
           for (String partitionName : idealState.getPartitionSet()) {
-            if (StateModel.SegmentStateModel.CONSUMING.equals(
+            if (StateModel.SegmentStateModel.ONLINE.equals(
                 idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
-              consumingSegments.add(partitionName);
+              SegmentZKMetadata segmentZKMetadata =
+                  ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), resourceName,
+                      partitionName);
+              if (segmentZKMetadata != null
+                  && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+                consumingSegments.add(partitionName);
+              }
             }
+//            if (StateModel.SegmentStateModel.CONSUMING.equals(
+//                idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
+//              consumingSegments.add(partitionName);
+//            }
           }
         }
       }
@@ -871,7 +882,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
     for (String partition : externalView.getPartitionSet()) {
       Map<String, String> instanceStateMap = externalView.getStateMap(partition);
       String state = instanceStateMap.get(_instanceId);
-      if (StateModel.SegmentStateModel.ONLINE.equals(state) || StateModel.SegmentStateModel.CONSUMING.equals(state)) {
+      if (StateModel.SegmentStateModel.ONLINE.equals(state)) {
         return false;
       }
     }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
index 0a0608b44d..ddb5dca96e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.server.starter.helix;
 
-import com.google.common.base.Preconditions;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
@@ -76,10 +75,14 @@ public class OfflineSegmentOnlineOfflineStateModelFactory extends StateModelFact
       String tableNameWithType = message.getResourceName();
       String segmentName = message.getPartitionName();
       TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
-          "TableType is null or is a REALTIME table, offline state model should not be called fo RT");
+//      Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME),
+//          "TableType is null or is a REALTIME table, offline state model should not be called fo RT");
       try {
-        _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+        if (tableType == TableType.OFFLINE) {
+          _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName);
+        } else if (tableType == TableType.REALTIME) {
+          _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName);
+        }
       } catch (Exception e) {
         String errorMessage =
             String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s",
diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
index 018ee3b6e8..e2a26bf1a3 100644
--- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json
@@ -3,6 +3,9 @@
   "tableType": "REALTIME",
   "tenants": {},
   "segmentsConfig": {
+    "completionConfig": {
+      "completionMode": "DOWNLOAD"
+    },
     "timeColumnName": "DaysSinceEpoch",
     "retentionTimeUnit": "DAYS",
     "retentionTimeValue": "5",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to