This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 14f01c8359 Order index types so servers can process them in a more deterministic order (#14391)
14f01c8359 is described below

commit 14f01c8359fca4045dba1af352b0ef7c8ace9c20
Author: Xiaobing <[email protected]>
AuthorDate: Wed Nov 6 11:38:15 2024 -0800

    Order index types so servers can process them in a more deterministic order (#14391)
    
    * Sort index types by ID so servers iterate over and process them in a deterministic order
    
    * Refine ForwardIndexHandler log and error messages to include the segment name, for easier debugging
---
 .../converter/SegmentV1V2ToV3FormatConverter.java  | 10 +-----
 .../segment/index/loader/ForwardIndexHandler.java  | 37 +++++++++++++---------
 .../index/loader/ForwardIndexHandlerTest.java      | 18 +++++------
 .../pinot/segment/spi/index/IndexService.java      | 14 +++++---
 4 files changed, 42 insertions(+), 37 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
index 2b1df5e99e..bc5fb21ef0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
@@ -27,9 +27,7 @@ import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
@@ -155,7 +153,7 @@ public class SegmentV1V2ToV3FormatConverter implements 
SegmentFormatConverter {
       try (SegmentDirectory.Reader v2DataReader = v2Segment.createReader();
           SegmentDirectory.Writer v3DataWriter = v3Segment.createWriter()) {
         for (String column : v2Metadata.getAllColumns()) {
-          for (IndexType<?, ?, ?> indexType : sortedIndexTypes()) {
+          for (IndexType<?, ?, ?> indexType : 
IndexService.getInstance().getAllIndexes()) {
             // NOTE: Text index is copied separately
             if (indexType != StandardIndexes.text() && indexType != 
StandardIndexes.vector()) {
               copyIndexIfExists(v2DataReader, v3DataWriter, column, indexType);
@@ -171,12 +169,6 @@ public class SegmentV1V2ToV3FormatConverter implements 
SegmentFormatConverter {
     copyNativeTextIndexIfExists(v2Directory, v3Directory);
   }
 
-  private List<IndexType<?, ?, ?>> sortedIndexTypes() {
-    return IndexService.getInstance().getAllIndexes().stream()
-        .sorted((i1, i2) -> i1.getId().compareTo(i2.getId()))
-        .collect(Collectors.toList());
-  }
-
   private void copyIndexIfExists(SegmentDirectory.Reader reader, 
SegmentDirectory.Writer writer, String column,
       IndexType indexType)
       throws IOException {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index d5e4785df0..84d5df6dbc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -200,11 +200,11 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
     Set<String> existingAllColumns = 
_segmentDirectory.getSegmentMetadata().getAllColumns();
     Set<String> existingDictColumns = 
_segmentDirectory.getColumnsWithIndex(StandardIndexes.dictionary());
     Set<String> existingForwardIndexColumns = 
_segmentDirectory.getColumnsWithIndex(StandardIndexes.forward());
-
+    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
     for (String column : existingAllColumns) {
       if (_schema != null && !_schema.hasColumn(column)) {
         // _schema will be null only in tests
-        LOGGER.info("Column {} is not in schema, skipping updating forward 
index", column);
+        LOGGER.info("Column: {} of segment: {} is not in schema, skipping 
updating forward index", column, segmentName);
         continue;
       }
       boolean existingHasDict = existingDictColumns.contains(column);
@@ -221,7 +221,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         if (columnMetadata.isSorted()) {
           // Check if the column is sorted. If sorted, disabling forward index 
should be a no-op. Do not return an
           // operation for this column related to disabling forward index.
-          LOGGER.warn("Trying to disable the forward index for a sorted column 
{}, ignoring", column);
+          LOGGER.warn("Trying to disable the forward index for a sorted 
column: {} of segment: {}, ignoring", column,
+              segmentName);
           continue;
         }
 
@@ -230,8 +231,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
             // Dictionary was also disabled. Just disable the dictionary and 
remove it along with the forward index
             // If range index exists, don't try to regenerate it on toggling 
the dictionary, throw an error instead
             Preconditions.checkState(!newIsRange, String.format(
-                "Must disable range (enabled) index to disable the dictionary 
and forward index for column: %s or "
-                    + "refresh / back-fill the forward index", column));
+                "Must disable range index (enabled) to disable the dictionary 
and forward index for column: %s of "
+                    + "segment: %s or refresh / back-fill the forward index", 
column, segmentName));
             columnOperationsMap.put(column,
                 Arrays.asList(Operation.DISABLE_FORWARD_INDEX, 
Operation.DISABLE_DICTIONARY));
           } else {
@@ -255,7 +256,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         if (columnMetadata != null && columnMetadata.isSorted()) {
           // Check if the column is sorted. If sorted, disabling forward index 
should be a no-op and forward index
           // should already exist. Do not return an operation for this column 
related to enabling forward index.
-          LOGGER.warn("Trying to enable the forward index for a sorted column 
{}, ignoring", column);
+          LOGGER.warn("Trying to enable the forward index for a sorted column: 
{} of segment: {}, ignoring", column,
+              segmentName);
           continue;
         }
 
@@ -265,9 +267,10 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         if (!existingHasDict || 
!existingInvertedIndexColumns.contains(column)) {
           // If either dictionary or inverted index is missing on the column 
there is no way to re-generate the forward
           // index. Treat this as a no-op and log a warning.
-          LOGGER.warn("Trying to enable the forward index for a column {} 
missing either the dictionary ({}) and / or "
-                  + "the inverted index ({}) is not possible. Either a refresh 
or back-fill is required to get the "
-                  + "forward index, ignoring", column, existingHasDict ? 
"enabled" : "disabled",
+          LOGGER.warn(
+              "Trying to enable the forward index for a column: {} of segment: 
{} missing either the dictionary ({}) "
+                  + "and / or the inverted index ({}) is not possible. Either 
a refresh or back-fill is required to "
+                  + "get the forward index, ignoring", column, segmentName, 
existingHasDict ? "enabled" : "disabled",
               existingInvertedIndexColumns.contains(column) ? "enabled" : 
"disabled");
           continue;
         }
@@ -280,23 +283,24 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
         // If the dictionary is not enabled on the existing column it must be 
on the new noDictionary column list.
         // Cannot enable the dictionary for a column with forward index 
disabled.
         Preconditions.checkState(existingHasDict || !newIsDict, String.format(
-            "Cannot regenerate the dictionary for column %s with forward index 
disabled. Please refresh or back-fill "
-                + "the data to add back the forward index", column));
+            "Cannot regenerate the dictionary for column: %s of segment: %s 
with forward index disabled. Please "
+                + "refresh or back-fill the data to add back the forward 
index", column, segmentName));
 
         if (existingHasDict && !newIsDict) {
           // Dictionary is currently enabled on this column but is supposed to 
be disabled. Remove the dictionary
           // and update the segment metadata If the range index exists then 
throw an error since we are not
           // regenerating the range index on toggling the dictionary
           Preconditions.checkState(!newIsRange, String.format(
-              "Must disable range (enabled) index to disable the dictionary 
for a forwardIndexDisabled column: %s or "
-                  + "refresh / back-fill the forward index", column));
+              "Must disable range index (enabled) to disable the dictionary 
for a forwardIndexDisabled column: %s of "
+                  + "segment: %s or refresh / back-fill the forward index", 
column, segmentName));
           columnOperationsMap.put(column, 
Collections.singletonList(Operation.DISABLE_DICTIONARY));
         }
       } else if (!existingHasDict && newIsDict) {
         // Existing column is RAW. New column is dictionary enabled.
         if (_schema == null || _tableConfig == null) {
           // This can only happen in tests.
-          LOGGER.warn("Cannot enable dictionary for column={} as schema or 
tableConfig is null.", column);
+          LOGGER.warn("Cannot enable dictionary for column: {} of segment: {} 
as schema or tableConfig is null.",
+              column, segmentName);
           continue;
         }
         ColumnMetadata existingColumnMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
@@ -327,7 +331,10 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         }
       }
     }
-
+    if (!columnOperationsMap.isEmpty()) {
+      LOGGER.info("Need to apply columnOperations: {} for forward index for 
segment: {}", columnOperationsMap,
+          segmentName);
+    }
     return columnOperationsMap;
   }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 76285f649d..7d52919e91 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -844,9 +844,9 @@ public class ForwardIndexHandlerTest {
         computeOperations();
         fail("Enabling dictionary on forward index disabled column is not 
possible");
       } catch (IllegalStateException e) {
-        assertEquals(e.getMessage(), "Cannot regenerate the dictionary for 
column "
-            + "DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER with forward index 
disabled. Please refresh or back-fill "
-            + "the data to add back the forward index");
+        assertEquals(e.getMessage(), "Cannot regenerate the dictionary for 
column: "
+            + "DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER of segment: 
testSegment with forward index disabled. Please "
+            + "refresh or back-fill the data to add back the forward index");
       }
 
       // TEST12: Disable dictionary on a column that already has forward index 
disabled without an inverted index but
@@ -862,9 +862,9 @@ public class ForwardIndexHandlerTest {
         fail("Disabling dictionary on forward index disabled column without 
inverted index but which has a "
             + "range index is not possible");
       } catch (IllegalStateException e) {
-        assertEquals(e.getMessage(), "Must disable range (enabled) index to 
disable the dictionary for a "
-            + "forwardIndexDisabled column: 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX or refresh / "
-            + "back-fill the forward index");
+        assertEquals(e.getMessage(), "Must disable range index (enabled) to 
disable the dictionary for a "
+            + "forwardIndexDisabled column: 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX of segment: "
+            + "testSegment or refresh / back-fill the forward index");
       }
 
       // TEST13: Disable dictionary on a column that already has forward index 
disabled and inverted index enabled with
@@ -881,9 +881,9 @@ public class ForwardIndexHandlerTest {
         fail("Disabling dictionary on forward index disabled column with 
inverted index and a range index "
             + "is not possible");
       } catch (IllegalStateException e) {
-        assertEquals(e.getMessage(), "Must disable range (enabled) index to 
disable the dictionary for a "
-            + "forwardIndexDisabled column: 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER or refresh / back-fill the "
-            + "forward index");
+        assertEquals(e.getMessage(), "Must disable range index (enabled) to 
disable the dictionary for a "
+            + "forwardIndexDisabled column: 
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER of segment: testSegment or refresh "
+            + "/ back-fill the forward index");
       }
     }
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexService.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexService.java
index 3910ac43c7..08368f02b6 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexService.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexService.java
@@ -20,8 +20,10 @@
 package org.apache.pinot.segment.spi.index;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -54,7 +56,7 @@ public class IndexService {
 
   private static volatile IndexService _instance = fromServiceLoader();
 
-  private final Set<IndexType<?, ?, ?>> _allIndexes;
+  private final List<IndexType<?, ?, ?>> _allIndexes;
   private final Map<String, IndexType<?, ?, ?>> _allIndexesById;
 
   private IndexService(Set<IndexPlugin<?>> allPlugins) {
@@ -65,7 +67,11 @@ public class IndexService {
       builder.put(indexType.getId().toLowerCase(Locale.US), indexType);
     }
     _allIndexesById = builder.build();
-    _allIndexes = ImmutableSet.copyOf(_allIndexesById.values());
+    // Sort index types so that servers can loop over and process them in a 
more deterministic order.
+    List<String> allIndexIds = new ArrayList<>(_allIndexesById.keySet());
+    Collections.sort(allIndexIds);
+    _allIndexes = new ArrayList<>();
+    allIndexIds.forEach(id -> _allIndexes.add(_allIndexesById.get(id)));
   }
 
   /**
@@ -103,7 +109,7 @@ public class IndexService {
    *
    * @return an immutable list with all index types known by this instance.
    */
-  public Set<IndexType<?, ?, ?>> getAllIndexes() {
+  public List<IndexType<?, ?, ?>> getAllIndexes() {
     return _allIndexes;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to