mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325807516
 
 

 ##########
 File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 ##########
 @@ -392,6 +434,218 @@ private void setAggregationResults(@Nonnull 
BrokerResponseNative brokerResponseN
     brokerResponseNative.setAggregationResults(reducedAggregationResults);
   }
 
+  /**
+   * Extract group by order by results and set them into a {@link ResultTable} on the broker response.
+   *
+   * <p>Column names are taken from the data schema in schema order. Rows are read from the iterator of
+   * the merged indexed table, at most {@code groupBy.getTopN()} of them. Each row holds the group-by key
+   * columns followed by one final result per aggregation value column.
+   *
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTableMap map from server to data table
+   * @param preserveType when true, each final aggregation result is additionally passed through
+   *     {@code AggregationFunctionUtils.formatValue}
+   * @throws IllegalStateException wrapping any failure raised while building the indexed table
+   */
+  private void setSQLGroupByOrderByResults(@Nonnull BrokerResponseNative brokerResponseNative,
+      @Nonnull DataSchema dataSchema, @Nonnull List<AggregationInfo> aggregationInfos, @Nonnull GroupBy groupBy,
+      @Nonnull List<SelectionSort> orderBy, @Nonnull Map<ServerInstance, DataTable> dataTableMap,
+      boolean preserveType) {
+
+    List<String> columns = new ArrayList<>(dataSchema.size());
+    for (int i = 0; i < dataSchema.size(); i++) {
+      columns.add(dataSchema.getColumnName(i));
+    }
+
+    int numGroupBy = groupBy.getExpressionsSize();
+    int numAggregations = aggregationInfos.size();
+
+    IndexedTable indexedTable;
+    try {
+      indexedTable =
+          getIndexedTable(numGroupBy, numAggregations, groupBy, aggregationInfos, orderBy, dataSchema, dataTableMap);
+    } catch (Throwable throwable) {
+      // getIndexedTable declares Throwable; surface it as an unchecked exception with the cause preserved
+      throw new IllegalStateException(throwable);
+    }
+
+    List<AggregationFunction> aggregationFunctions = new ArrayList<>(numAggregations);
+    for (AggregationInfo aggregationInfo : aggregationInfos) {
+      aggregationFunctions.add(
+          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+    }
+
+    List<Serializable[]> rows = new ArrayList<>();
+    int numColumns = columns.size();
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    while (rows.size() < groupBy.getTopN() && sortedIterator.hasNext()) {
+
+      Record nextRecord = sortedIterator.next();
+      Serializable[] row = new Serializable[numColumns];
+      int index = 0;
+      for (Object keyColumn : nextRecord.getKey().getColumns()) {
+        row[index++] = getSerializableValue(keyColumn);
+      }
+      int aggNum = 0;
+      for (Object valueColumn : nextRecord.getValues()) {
+        // Advance aggNum with every value column so that each intermediate result is finalized by its
+        // own aggregation function rather than always by the first one.
+        row[index] = getSerializableValue(aggregationFunctions.get(aggNum++).extractFinalResult(valueColumn));
+        // NOTE(review): formatting is applied when preserveType is true; confirm this condition is not
+        // inverted relative to the other reduce paths.
+        if (preserveType) {
+          row[index] = AggregationFunctionUtils.formatValue(row[index]);
+        }
+        index++;
+      }
+      rows.add(row);
+    }
+
+    brokerResponseNative.setResultTable(new ResultTable(columns, rows));
+  }
+
+  /**
+   * Builds an {@link IndexedTable} by upserting every row of every server's data table.
+   *
+   * <p>For each row, the first {@code numGroupBy} columns form the record key and the next
+   * {@code numAggregations} columns form the record values. Column values are read with the data table
+   * accessor that matches the column's data type in the schema. {@code finish()} is called on the table
+   * before it is returned.
+   *
+   * @param numGroupBy number of leading key columns in each row
+   * @param numAggregations number of value columns following the key columns
+   * @param groupBy group by info; its top N feeds the table capacity
+   * @param aggregationInfos aggregations info, passed to the table's {@code init}
+   * @param orderBy order by info, passed to the table's {@code init}
+   * @param dataSchema schema describing the columns of every data table
+   * @param dataTableMap map from server to data table
+   * @return the populated indexed table
+   * @throws Throwable anything raised while reading a column value or updating the table
+   */
+  private IndexedTable getIndexedTable(int numGroupBy, int numAggregations, GroupBy groupBy,
+      List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, DataSchema dataSchema,
+      Map<ServerInstance, DataTable> dataTableMap)
+      throws Throwable {
+
+    IndexedTable mergedTable = new ConcurrentIndexedTable();
+    // use a floor of 10000 on the capacity to avoid frequent resizing
+    int capacity = (int) Math.max(groupBy.getTopN(), 10000);
+    mergedTable.init(dataSchema, aggregationInfos, orderBy, capacity, true);
+
+    int numColumns = dataSchema.size();
+    for (DataTable dataTable : dataTableMap.values()) {
+      // One reader per column, bound to this data table and chosen by the column's data type
+      CheckedFunction2[] readers = new CheckedFunction2[numColumns];
+      for (int colId = 0; colId < numColumns; colId++) {
+        CheckedFunction2<Integer, Integer, Object> reader;
+        switch (dataSchema.getColumnDataType(colId)) {
+          case INT:
+            reader = (CheckedFunction2<Integer, Integer, Object>) dataTable::getInt;
+            break;
+          case LONG:
+            reader = (CheckedFunction2<Integer, Integer, Object>) dataTable::getLong;
+            break;
+          case FLOAT:
+            reader = (CheckedFunction2<Integer, Integer, Object>) dataTable::getFloat;
+            break;
+          case DOUBLE:
+            reader = (CheckedFunction2<Integer, Integer, Object>) dataTable::getDouble;
+            break;
+          case STRING:
+            reader = (CheckedFunction2<Integer, Integer, Object>) dataTable::getString;
+            break;
+          default:
+            reader = (CheckedFunction2<Integer, Integer, Object>) dataTable::getObject;
+        }
+        readers[colId] = reader;
+      }
+
+      int numRows = dataTable.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        // Key columns occupy positions [0, numGroupBy)
+        Object[] keyColumns = new Object[numGroupBy];
+        for (int k = 0; k < numGroupBy; k++) {
+          keyColumns[k] = readers[k].apply(rowId, k);
+        }
+        // Value columns follow immediately after the key columns
+        Object[] aggregationValues = new Object[numAggregations];
+        for (int a = 0; a < numAggregations; a++) {
+          int colId = numGroupBy + a;
+          aggregationValues[a] = readers[colId].apply(rowId, colId);
+        }
+        mergedTable.upsert(new Record(new Key(keyColumns), aggregationValues));
+      }
+    }
+    mergedTable.finish();
+    return mergedTable;
+  }
+
+  /**
+   * Extract the results of group by order by into a List of {@link 
AggregationResult}
+   * There will be 1 aggregation result per aggregation. The group by keys 
will be the same across all aggregations
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTableMap map from server to data table
+   */
+  private void setPQLGroupByOrderByResults(@Nonnull BrokerResponseNative 
brokerResponseNative,
 
 Review comment:
   Is this needed because we want to support order-by with PQL?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to