gortiz commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1633033128


##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java:
##########
@@ -187,200 +141,315 @@ public static RowDataBlock buildFromRows(List<Object[]> 
rows, DataSchema dataSch
                     dataSchema.getColumnName(colId)));
         }
       }
-      rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
     }
+
+    CompoundDataBuffer.Builder varBufferBuilder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true)
+        .addPagedOutputStream(varSize);
+
     // Write null bitmaps after writing data.
-    for (RoaringBitmap nullBitmap : nullBitmaps) {
-      rowBuilder.setNullRowIds(nullBitmap);
-    }
-    return buildRowBlock(rowBuilder);
+    setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+    return buildRowBlock(numRows, dataSchema, 
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
   }
 
   public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, 
DataSchema dataSchema)
       throws IOException {
     int numRows = columns.isEmpty() ? 0 : columns.get(0).length;
-    DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, 
DataBlock.Type.COLUMNAR, numRows);
+
+    int fixedBytesPerRow = calculateBytesPerRow(dataSchema);
+    int nullFixedBytes = dataSchema.size() * Integer.BYTES * 2;
+    int fixedBytesRequired = fixedBytesPerRow * numRows + nullFixedBytes;
+
+    Object2IntOpenHashMap<String> dictionary = new Object2IntOpenHashMap<>();
+
     // TODO: consolidate these null utils into data table utils.
     // Selection / Agg / Distinct all have similar code.
-    ColumnDataType[] storedTypes = dataSchema.getStoredColumnDataTypes();
-    int numColumns = storedTypes.length;
+    int numColumns = dataSchema.size();
+
     RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
-    Object[] nullPlaceholders = new Object[numColumns];
-    for (int colId = 0; colId < numColumns; colId++) {
-      nullBitmaps[colId] = new RoaringBitmap();
-      nullPlaceholders[colId] = storedTypes[colId].getNullPlaceholder();
+    ByteBuffer fixedSize = ByteBuffer.allocate(fixedBytesRequired);
+    CompoundDataBuffer.Builder varBufferBuilder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true);
+
+    try (PagedPinotOutputStream varSize = PagedPinotOutputStream.createHeap()) 
{
+      for (int colId = 0; colId < numColumns; colId++) {
+        RoaringBitmap nullBitmap = new RoaringBitmap();
+        nullBitmaps[colId] = nullBitmap;
+        serializeColumnData(columns, dataSchema, colId, fixedSize, varSize, 
nullBitmap, dictionary);
+      }
+      varBufferBuilder.addPagedOutputStream(varSize);
     }
-    for (int colId = 0; colId < numColumns; colId++) {
-      Object[] column = columns.get(colId);
-      ByteBuffer byteBuffer = ByteBuffer.allocate(numRows * 
columnarBuilder._columnSizeInBytes[colId]);
-      Object value;
-
-      // NOTE:
-      // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
-      // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
-      switch (storedTypes[colId]) {
-        // Single-value column
-        case INT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putInt((int) value);
+    // Write null bitmaps after writing data.
+    setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+    return buildColumnarBlock(numRows, dataSchema, 
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
+  }
+
+  private static void serializeColumnData(List<Object[]> columns, DataSchema 
dataSchema, int colId,
+      ByteBuffer fixedSize, PagedPinotOutputStream varSize, RoaringBitmap 
nullBitmap,
+      Object2IntOpenHashMap<String> dictionary)
+      throws IOException {
+    ColumnDataType storedType = 
dataSchema.getColumnDataType(colId).getStoredType();
+    int numRows = columns.get(colId).length;
+
+    Object[] column = columns.get(colId);
+
+    // NOTE:
+    // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
+    // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
+    switch (storedType) {
+      // Single-value column
+      case INT: {
+        int nullPlaceholder = (int) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putInt(nullPlaceholder);
+          } else {
+            fixedSize.putInt((int) value);
           }
-          break;
-        case LONG:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putLong((long) value);
+        }
+        break;
+      }
+      case LONG: {
+        long nullPlaceholder = (long) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putLong(nullPlaceholder);
+          } else {
+            fixedSize.putLong((long) value);
           }
-          break;
-        case FLOAT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putFloat((float) value);
+        }
+        break;
+      }
+      case FLOAT: {
+        float nullPlaceholder = (float) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putFloat(nullPlaceholder);
+          } else {
+            fixedSize.putFloat((float) value);
           }
-          break;
-        case DOUBLE:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putDouble((double) value);
+        }
+        break;
+      }
+      case DOUBLE: {
+        double nullPlaceholder = (double) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putDouble(nullPlaceholder);
+          } else {
+            fixedSize.putDouble((double) value);
           }
-          break;
-        case BIG_DECIMAL:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
+        }
+        break;
+      }
+      case BIG_DECIMAL: {
+        BigDecimal nullPlaceholder = (BigDecimal) 
storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (BigDecimal) value);
           }
-          break;
-        case STRING:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (String) value);
+        }
+        break;
+      }
+      case STRING: {
+        ToIntFunction<String> didSupplier = k -> dictionary.size();
+        int nullPlaceHolder = dictionary.computeIfAbsent((String) 
storedType.getNullPlaceholder(), didSupplier);
+
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putInt(nullPlaceHolder);
+          } else {
+            int dictId = dictionary.computeIfAbsent((String) value, 
didSupplier);
+            fixedSize.putInt(dictId);
           }
-          break;
-        case BYTES:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
+        }
+        break;
+      }
+      case BYTES: {
+        ByteArray nullPlaceholder = (ByteArray) 
storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (ByteArray) value);
           }
-          break;
+        }
+        break;
+      }
 
-        // Multi-value column
-        case INT_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (int[]) value);
+      // Multi-value column
+      case INT_ARRAY: {
+        int[] nullPlaceholder = (int[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (int[]) value);
           }
-          break;
-        case LONG_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (long[]) value);
+        }
+        break;
+      }
+      case LONG_ARRAY: {
+        long[] nullPlaceholder = (long[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (long[]) value);
           }
-          break;
-        case FLOAT_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (float[]) value);
+        }
+        break;
+      }
+      case FLOAT_ARRAY: {
+        float[] nullPlaceholder = (float[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (float[]) value);
           }
-          break;
-        case DOUBLE_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (double[]) value);
+        }
+        break;
+      }
+      case DOUBLE_ARRAY: {
+        double[] nullPlaceholder = (double[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (double[]) value);
           }
-          break;
-        case STRING_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (String[]) value);
+        }
+        break;
+      }
+      case STRING_ARRAY: {
+        String[] nullPlaceholder = (String[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
+          } else {
+            setColumn(fixedSize, varSize, (String[]) value, dictionary);
           }
-          break;
+        }
+        break;
+      }
 
-        // Special intermediate result for aggregation function
-        case OBJECT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            setColumn(columnarBuilder, byteBuffer, column[rowId]);
-          }
-          break;
+      // Special intermediate result for aggregation function
+      case OBJECT: {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          setColumn(fixedSize, varSize, column[rowId]);
+        }
+        break;
+      }
+      // Null
+      case UNKNOWN:
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          setColumn(fixedSize, varSize, (Object) null);
+        }
+        break;
 
-        // Null
-        case UNKNOWN:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            setColumn(columnarBuilder, byteBuffer, (Object) null);
-          }
-          break;
+      default:
+        throw new IllegalStateException(
+            String.format("Unsupported stored type: %s for column: %s", 
storedType,
+                dataSchema.getColumnName(colId)));
+    }
+  }
 
+  private static int calculateBytesPerRow(DataSchema dataSchema) {
+    int rowSizeInBytes = 0;
+    for (ColumnDataType columnDataType : dataSchema.getColumnDataTypes()) {
+      switch (columnDataType) {
+        case INT:
+          rowSizeInBytes += 4;
+          break;
+        case LONG:
+          rowSizeInBytes += 8;
+          break;
+        case FLOAT:
+          rowSizeInBytes += 4;
+          break;
+        case DOUBLE:
+          rowSizeInBytes += 8;
+          break;
+        case STRING:
+          rowSizeInBytes += 4;
+          break;
+        // Object and array. (POSITION|LENGTH)
         default:
-          throw new IllegalStateException(
-              String.format("Unsupported stored type: %s for column: %s", 
storedTypes[colId],
-                  dataSchema.getColumnName(colId)));
+          rowSizeInBytes += 8;
+          break;
       }
-      
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());

Review Comment:
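   Here again the old code was copying bytes: each column was first serialized
   into its own temporary `ByteBuffer` and then copied into
   `_fixedSizeDataByteArrayOutputStream`. The allocation is smaller in this case
   (one column at a time), but it is still problematic.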



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to