yashmayya commented on code in PR #13037:
URL: https://github.com/apache/pinot/pull/13037#discussion_r1590974745


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java:
##########
@@ -757,57 +759,181 @@ private void createDerivedColumnV1Indices(String column, 
FunctionEvaluator funct
           } else {
             useVarLengthDictionary = 
_indexLoadingConfig.getVarLengthDictionaryColumns().contains(column);
           }
-          indexCreationInfo = new ColumnIndexCreationInfo(statsCollector, 
true, useVarLengthDictionary, true,
-              new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
+          indexCreationInfo = new ColumnIndexCreationInfo(statsCollector, 
createDictionary, useVarLengthDictionary,
+              true, new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
           break;
         }
         default:
           throw new IllegalStateException();
       }
 
-      // Create dictionary
-      try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(fieldSpec, _indexDir,
-          indexCreationInfo.isUseVarLengthDictionary())) {
-        
dictionaryCreator.build(indexCreationInfo.getSortedUniqueElementsArray());
-
-        // Create forward index
-        int cardinality = indexCreationInfo.getDistinctValueCount();
-        if (isSingleValue) {
-          try (ForwardIndexCreator forwardIndexCreator = 
indexCreationInfo.isSorted()
-              ? new SingleValueSortedForwardIndexCreator(_indexDir, column, 
cardinality)
-              : new SingleValueUnsortedForwardIndexCreator(_indexDir, column, 
cardinality, numDocs)) {
-            for (int i = 0; i < numDocs; i++) {
-              
forwardIndexCreator.putDictId(dictionaryCreator.indexOfSV(outputValues[i]));
-            }
+      if (createDictionary) {
+        createDerivedColumnForwardIndexWithDictionary(column, fieldSpec, 
outputValues, indexCreationInfo);
+      } else {
+        createDerivedColumnForwardIndexWithoutDictionary(column, fieldSpec, 
outputValues, indexCreationInfo);
+      }
+    } finally {
+      for (ValueReader valueReader : valueReaders) {
+        valueReader.close();
+      }
+    }
+  }
+
+  /**
+   * Helper method to create the dictionary and forward indices for a column 
with derived values.
+   */
+  private void createDerivedColumnForwardIndexWithDictionary(String column, 
FieldSpec fieldSpec, Object[] outputValues,
+      ColumnIndexCreationInfo indexCreationInfo) throws Exception {
+
+    // Create dictionary
+    try (SegmentDictionaryCreator dictionaryCreator = new 
SegmentDictionaryCreator(fieldSpec, _indexDir,
+        indexCreationInfo.isUseVarLengthDictionary())) {
+      
dictionaryCreator.build(indexCreationInfo.getSortedUniqueElementsArray());
+
+      // Create forward index
+      int cardinality = indexCreationInfo.getDistinctValueCount();
+      int numDocs = outputValues.length;
+      boolean isSingleValue = fieldSpec.isSingleValueField();
+      if (isSingleValue) {
+        try (ForwardIndexCreator forwardIndexCreator = 
indexCreationInfo.isSorted()
+            ? new SingleValueSortedForwardIndexCreator(_indexDir, column, 
cardinality)
+            : new SingleValueUnsortedForwardIndexCreator(_indexDir, column, 
cardinality, numDocs)) {
+          for (Object outputValue : outputValues) {
+            
forwardIndexCreator.putDictId(dictionaryCreator.indexOfSV(outputValue));
           }
-        } else {
-          DictIdCompressionType dictIdCompressionType = null;
-          FieldIndexConfigs fieldIndexConfig = 
_indexLoadingConfig.getFieldIndexConfig(column);
-          if (fieldIndexConfig != null) {
-            ForwardIndexConfig forwardIndexConfig = 
fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType());
-            if (forwardIndexConfig != null) {
-              dictIdCompressionType = 
forwardIndexConfig.getDictIdCompressionType();
-            }
+        }
+      } else {
+        DictIdCompressionType dictIdCompressionType = null;
+        FieldIndexConfigs fieldIndexConfig = 
_indexLoadingConfig.getFieldIndexConfig(column);
+        if (fieldIndexConfig != null) {
+          ForwardIndexConfig forwardIndexConfig = 
fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType());
+          if (forwardIndexConfig != null) {
+            dictIdCompressionType = 
forwardIndexConfig.getDictIdCompressionType();
           }
-          try (ForwardIndexCreator forwardIndexCreator = dictIdCompressionType 
== DictIdCompressionType.MV_ENTRY_DICT
-              ? new MultiValueEntryDictForwardIndexCreator(_indexDir, column, 
cardinality, numDocs)
-              : new MultiValueUnsortedForwardIndexCreator(_indexDir, column, 
cardinality, numDocs,
-                  indexCreationInfo.getTotalNumberOfEntries())) {
-            for (int i = 0; i < numDocs; i++) {
-              
forwardIndexCreator.putDictIdMV(dictionaryCreator.indexOfMV(outputValues[i]));
-            }
+        }
+        try (ForwardIndexCreator forwardIndexCreator = dictIdCompressionType 
== DictIdCompressionType.MV_ENTRY_DICT
+            ? new MultiValueEntryDictForwardIndexCreator(_indexDir, column, 
cardinality, numDocs)
+            : new MultiValueUnsortedForwardIndexCreator(_indexDir, column, 
cardinality, numDocs,
+                indexCreationInfo.getTotalNumberOfEntries())) {
+          for (Object outputValue : outputValues) {
+            
forwardIndexCreator.putDictIdMV(dictionaryCreator.indexOfMV(outputValue));
           }
         }
+      }
+
+      // Add the column metadata
+      SegmentColumnarIndexCreator.addColumnMetadataInfo(_segmentProperties, 
column, indexCreationInfo, numDocs,
+          fieldSpec, true, dictionaryCreator.getNumBytesPerEntry());
+    }
+  }
+
+  /**
+   * Helper method to create a forward index for a raw encoded column with 
derived values.
+   */
+  private void createDerivedColumnForwardIndexWithoutDictionary(String column, 
FieldSpec fieldSpec,
+      Object[] outputValues, ColumnIndexCreationInfo indexCreationInfo)
+      throws Exception {
 
-        // Add the column metadata
-        SegmentColumnarIndexCreator.addColumnMetadataInfo(_segmentProperties, 
column, indexCreationInfo, numDocs,
-            fieldSpec, true, dictionaryCreator.getNumBytesPerEntry());
+    // Create forward index
+    int numDocs = outputValues.length;
+    boolean isSingleValue = fieldSpec.isSingleValueField();
+    ChunkCompressionType chunkCompressionType = null;
+    int rawIndexWriterVersion = ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION;
+    boolean deriveNumDocsPerChunk = false;
+    int targetMaxChunkSizeBytes = 
ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES;
+    int targetDocsPerChunk = ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK;
+
+    FieldIndexConfigs fieldIndexConfig = 
_indexLoadingConfig.getFieldIndexConfig(column);
+    if (fieldIndexConfig != null) {
+      ForwardIndexConfig forwardIndexConfig = fieldIndexConfig.getConfig(new 
ForwardIndexPlugin().getIndexType());
+      if (forwardIndexConfig != null) {
+        chunkCompressionType = forwardIndexConfig.getChunkCompressionType();
+        rawIndexWriterVersion = forwardIndexConfig.getRawIndexWriterVersion();
+        deriveNumDocsPerChunk = forwardIndexConfig.isDeriveNumDocsPerChunk();
+        targetMaxChunkSizeBytes = 
forwardIndexConfig.getTargetMaxChunkSizeBytes();
+        targetDocsPerChunk = forwardIndexConfig.getTargetDocsPerChunk();
       }
-    } finally {
-      for (ValueReader valueReader : valueReaders) {
-        valueReader.close();
+    }
+    if (chunkCompressionType == null) {
+      chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
+    }
+
+    if (isSingleValue) {
+      try (ForwardIndexCreator forwardIndexCreator = 
fieldSpec.getDataType().getStoredType().isFixedWidth()
+          ? new SingleValueFixedByteRawIndexCreator(_indexDir, 
chunkCompressionType, column, numDocs,
+              fieldSpec.getDataType().getStoredType(), rawIndexWriterVersion, 
targetDocsPerChunk)
+          : new SingleValueVarByteRawIndexCreator(_indexDir, 
chunkCompressionType, column, numDocs,
+              fieldSpec.getDataType().getStoredType(), 
indexCreationInfo.getMaxRowLengthInBytes(),
+              deriveNumDocsPerChunk, rawIndexWriterVersion, 
targetMaxChunkSizeBytes, targetDocsPerChunk)) {
+        for (Object outputValue : outputValues) {
+          switch (fieldSpec.getDataType().getStoredType()) {
+            // Casts are safe here because we've already done the conversion 
in createDerivedColumnV1Indices
+            case INT:
+              forwardIndexCreator.putInt((int) outputValue);
+              break;
+            case LONG:
+              forwardIndexCreator.putLong((long) outputValue);
+              break;
+            case FLOAT:
+              forwardIndexCreator.putFloat((float) outputValue);
+              break;
+            case DOUBLE:
+              forwardIndexCreator.putDouble((double) outputValue);
+              break;
+            case BIG_DECIMAL:
+              forwardIndexCreator.putBigDecimal((BigDecimal) outputValue);
+              break;
+            case STRING:
+              forwardIndexCreator.putString((String) outputValue);
+              break;
+            case BYTES:
+              forwardIndexCreator.putBytes((byte[]) outputValue);
+              break;
+            default:
+              throw new IllegalStateException();
+          }
+        }
+      }
+    } else {
+      try (ForwardIndexCreator forwardIndexCreator = 
fieldSpec.getDataType().getStoredType().isFixedWidth()
+          ? new MultiValueFixedByteRawIndexCreator(_indexDir, 
chunkCompressionType, column, numDocs,
+              fieldSpec.getDataType().getStoredType(), 
indexCreationInfo.getMaxNumberOfMultiValueElements(),
+              deriveNumDocsPerChunk, rawIndexWriterVersion, 
targetMaxChunkSizeBytes, targetDocsPerChunk)
+          : new MultiValueVarByteRawIndexCreator(_indexDir, 
chunkCompressionType, column, numDocs,
+              fieldSpec.getDataType().getStoredType(), rawIndexWriterVersion,
+              indexCreationInfo.getMaxRowLengthInBytes(), 
indexCreationInfo.getMaxNumberOfMultiValueElements(),
+              targetMaxChunkSizeBytes, targetDocsPerChunk)) {

Review Comment:
   This is starting to look fairly messy; I'm wondering whether we should refactor it to use 
[ForwardIndexCreatorFactory](https://github.com/apache/pinot/blob/e07b576797e2dc834a60d8ee0da5d26a0b597e23/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java#L47)
 instead.
   
   Edit: It does look much cleaner with the factory, and the factory also abstracts away the 
default-value handling for all of the forward index properties. I've refactored both this (raw) case 
and the dictionary case to use `ForwardIndexCreatorFactory`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to