Jackie-Jiang commented on a change in pull request #6044:
URL: https://github.com/apache/incubator-pinot/pull/6044#discussion_r502726246



##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -102,7 +102,7 @@
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
   protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
-  protected final BrokerReduceService _brokerReduceService = new BrokerReduceService();
+  protected BrokerReduceService _brokerReduceService;

Review comment:
       This can still be `final`?
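   For instance, a sketch (the constructor signature here is an assumption, not the PR's actual code) that keeps the field `final` by assigning it once in the constructor:

```java
// Sketch only: the field stays final because it is assigned exactly once.
protected final BrokerReduceService _brokerReduceService;

public BaseBrokerRequestHandler(PinotConfiguration config) {
  _brokerReduceService = new BrokerReduceService(config);
}
```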

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -166,6 +166,11 @@
     public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
     public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";
 
+    // Config for number of threads to use for Broker reduce-phase.
+    public static final String CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY = "pinot.broker.max.reduce.threads";

Review comment:
       `pinot.broker.max.reduce.threads.per.query` for clarity?
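   i.e., a sketch of the suggested key (constant name unchanged):

```java
public static final String CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY =
    "pinot.broker.max.reduce.threads.per.query";
```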

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
##########
@@ -231,58 +239,130 @@ private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
     return new DataSchema(columnNames, columnDataTypes);
   }
 
-  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTables) {
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext) {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
     int capacity = GroupByUtils.getTableCapacity(_queryContext);
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+    IndexedTable indexedTable =
+        (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, _queryContext, capacity)
+            : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+
+    Future[] futures = new Future[numDataTables];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    int cnt = 0;
     ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    for (DataTable dataTable : dataTables) {
-      int numRows = dataTable.getNumberOfRows();
-      for (int rowId = 0; rowId < numRows; rowId++) {
-        Object[] values = new Object[_numColumns];
-        for (int colId = 0; colId < _numColumns; colId++) {
-          switch (columnDataTypes[colId]) {
-            case INT:
-              values[colId] = dataTable.getInt(rowId, colId);
-              break;
-            case LONG:
-              values[colId] = dataTable.getLong(rowId, colId);
-              break;
-            case FLOAT:
-              values[colId] = dataTable.getFloat(rowId, colId);
-              break;
-            case DOUBLE:
-              values[colId] = dataTable.getDouble(rowId, colId);
-              break;
-            case STRING:
-              values[colId] = dataTable.getString(rowId, colId);
-              break;
-            case BYTES:
-              values[colId] = dataTable.getBytes(rowId, colId);
-              break;
-            case OBJECT:
-              values[colId] = dataTable.getObject(rowId, colId);
-              break;
-            // Add other aggregation intermediate result / group-by column type supports here
-            default:
-              throw new IllegalStateException();
+    for (List<DataTable> reduceGroup : reduceGroups) {
+      futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          for (DataTable dataTable : reduceGroup) {
+            int numRows = dataTable.getNumberOfRows();
+
+            try {
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                Object[] values = new Object[_numColumns];
+                for (int colId = 0; colId < _numColumns; colId++) {
+                  switch (columnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      values[colId] = dataTable.getObject(rowId, colId);
+                      break;
+                    // Add other aggregation intermediate result / group-by column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
+                }
+                indexedTable.upsert(new Record(values));
+              }
+            } finally {
+              countDownLatch.countDown();
+            }
           }
         }
-        indexedTable.upsert(new Record(values));
+      });
+    }
+
+    try {
+      long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
       }
     }
+
     indexedTable.finish(true);
     return indexedTable;
   }
 
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.

Review comment:
       Return 1? You need at least one thread.
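   A minimal sketch of the fix (assuming the zero-table case is fine to handle on the single calling thread):

```java
private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
  if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
    return 1; // Always at least one thread, even when numDataTables is zero.
  }
  // Per the javadoc: min of max allowed reduce threads and number of data tables.
  return Math.min(maxReduceThreadsPerQuery, numDataTables);
}
```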

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
##########
@@ -231,58 +239,130 @@ private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
     return new DataSchema(columnNames, columnDataTypes);
   }
 
-  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTables) {
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext) {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
     int capacity = GroupByUtils.getTableCapacity(_queryContext);
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+    IndexedTable indexedTable =
+        (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, _queryContext, capacity)
+            : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+
+    Future[] futures = new Future[numDataTables];

Review comment:
       Don't use the executor service for the single-threaded case. There is overhead in using it instead of the current thread, which might cause performance degradation.
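   Something along these lines (a sketch; `reduceIntoIndexedTable` is a hypothetical helper wrapping the row-upsert loop from the diff):

```java
if (numReduceThreadsToUse == 1) {
  // Run on the calling thread: no executor hand-off, no latch, no Future bookkeeping.
  for (DataTable dataTable : dataTables) {
    reduceIntoIndexedTable(dataTable, indexedTable, columnDataTypes); // hypothetical helper
  }
} else {
  // Split into reduce groups and submit them to the executor service, as in the diff above.
}
```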

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
##########
@@ -47,8 +57,32 @@
 @ThreadSafe
 public class BrokerReduceService {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
+
+  // brw -> Shorthand for broker reduce worker threads.
+  private static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
+  protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;

Review comment:
       Any specific reason for this priority? Some comments would be appreciated.
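   For example, a comment along these lines would capture the intent (the rationale below is an assumption, not the author's):

```java
// Assumed rationale: reduce threads sit between Thread.NORM_PRIORITY (5) and
// Thread.MAX_PRIORITY (10) because broker reduce is on the query latency path
// and should not be starved by lower-priority housekeeping threads.
protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
```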

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -166,6 +166,11 @@
     public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
     public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";
 
+    // Config for number of threads to use for Broker reduce-phase.
+    public static final String CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY = "pinot.broker.max.reduce.threads";
+    public static final int MAX_REDUCE_THREADS_PER_QUERY =

Review comment:
       ```suggestion
       public static final int DEFAULT_MAX_REDUCE_THREADS_PER_QUERY =
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
##########
@@ -47,8 +57,32 @@
 @ThreadSafe
 public class BrokerReduceService {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
+
+  // brw -> Shorthand for broker reduce worker threads.
+  private static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
+  protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
+
+  private final ListeningExecutorService _reduceExecutorService;
+  private final int _maxReduceThreadsPerQuery;
+
+  public BrokerReduceService(PinotConfiguration config) {
+    _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
+        CommonConstants.Broker.MAX_REDUCE_THREADS_PER_QUERY);
+    LOGGER.info("Initializing BrokerReduceService with {} reduce threads.", _maxReduceThreadsPerQuery);

Review comment:
       Log both the number of worker threads and the max reduce threads per query?
   Also, if it is single-threaded, there is no need to launch the executor service.
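   A sketch of both suggestions (assuming the field becomes a plain `ExecutorService`, per the comment further down):

```java
int numWorkerThreads = Runtime.getRuntime().availableProcessors();
LOGGER.info("Initializing BrokerReduceService with {} worker threads, max {} reduce threads per query.",
    numWorkerThreads, _maxReduceThreadsPerQuery);
// Skip the executor entirely when reduce is effectively single-threaded.
_reduceExecutorService = (_maxReduceThreadsPerQuery > 1)
    ? Executors.newFixedThreadPool(numWorkerThreads, reduceThreadFactory)
    : null;
```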

##########
File path: pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
##########
@@ -319,13 +321,13 @@ private void testDistinctInnerSegmentHelper(String[] queries, boolean isPql)
   @Test
   public void testDistinctInnerSegment()
       throws Exception {
-    testDistinctInnerSegmentHelper(new String[]{
-        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
-        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
-        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
-        "SELECT DISTINCT(ADD ( intColumn,  floatColumn  ), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
-        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10"
-    }, true);
+    testDistinctInnerSegmentHelper(

Review comment:
       I feel the original formatting is better. You can skip the reformatting by adding `//@formatter:off`, see `AggregationFunctionUtils.isFitForDictionaryBasedComputation()` for details.
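   i.e., a sketch of where the markers would go (trimmed to two of the queries):

```java
// @formatter:off
testDistinctInnerSegmentHelper(new String[]{
    "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
    "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10"
}, true);
// @formatter:on
```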

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
##########
@@ -47,8 +57,32 @@
 @ThreadSafe
 public class BrokerReduceService {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
+
+  // brw -> Shorthand for broker reduce worker threads.
+  private static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
+  protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
+
+  private final ListeningExecutorService _reduceExecutorService;
+  private final int _maxReduceThreadsPerQuery;
+
+  public BrokerReduceService(PinotConfiguration config) {
+    _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
+        CommonConstants.Broker.MAX_REDUCE_THREADS_PER_QUERY);
+    LOGGER.info("Initializing BrokerReduceService with {} reduce threads.", _maxReduceThreadsPerQuery);
+
+    ThreadFactory reduceThreadFactory =
+        new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY)
+            .setNameFormat(REDUCE_THREAD_NAME_FORMAT).build();
+
+    // ExecutorService is initialized with numThreads same as availableProcessors.
+    ExecutorService delegate =
+        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), reduceThreadFactory);
+    _reduceExecutorService = MoreExecutors.listeningDecorator(delegate);

Review comment:
       I don't think we need to use the `ListeningExecutorService` here; `ExecutorService` should be enough, with lower overhead.
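   e.g., a sketch:

```java
// Plain JDK ExecutorService; nothing consumes ListenableFuture callbacks,
// so the Guava listening decorator only adds overhead.
private final ExecutorService _reduceExecutorService;

// ... in the constructor:
// _reduceExecutorService =
//     Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), reduceThreadFactory);
```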

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
##########
@@ -231,58 +239,130 @@ private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
     return new DataSchema(columnNames, columnDataTypes);
   }
 
-  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTables) {
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext) {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
     int capacity = GroupByUtils.getTableCapacity(_queryContext);
-    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+    IndexedTable indexedTable =
+        (numReduceThreadsToUse > 1) ? new ConcurrentIndexedTable(dataSchema, _queryContext, capacity)
+            : new SimpleIndexedTable(dataSchema, _queryContext, capacity);
+
+    Future[] futures = new Future[numDataTables];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    int cnt = 0;
     ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    for (DataTable dataTable : dataTables) {
-      int numRows = dataTable.getNumberOfRows();
-      for (int rowId = 0; rowId < numRows; rowId++) {
-        Object[] values = new Object[_numColumns];
-        for (int colId = 0; colId < _numColumns; colId++) {
-          switch (columnDataTypes[colId]) {
-            case INT:
-              values[colId] = dataTable.getInt(rowId, colId);
-              break;
-            case LONG:
-              values[colId] = dataTable.getLong(rowId, colId);
-              break;
-            case FLOAT:
-              values[colId] = dataTable.getFloat(rowId, colId);
-              break;
-            case DOUBLE:
-              values[colId] = dataTable.getDouble(rowId, colId);
-              break;
-            case STRING:
-              values[colId] = dataTable.getString(rowId, colId);
-              break;
-            case BYTES:
-              values[colId] = dataTable.getBytes(rowId, colId);
-              break;
-            case OBJECT:
-              values[colId] = dataTable.getObject(rowId, colId);
-              break;
-            // Add other aggregation intermediate result / group-by column type supports here
-            default:
-              throw new IllegalStateException();
+    for (List<DataTable> reduceGroup : reduceGroups) {
+      futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          for (DataTable dataTable : reduceGroup) {
+            int numRows = dataTable.getNumberOfRows();
+
+            try {
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                Object[] values = new Object[_numColumns];
+                for (int colId = 0; colId < _numColumns; colId++) {
+                  switch (columnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      values[colId] = dataTable.getObject(rowId, colId);
+                      break;
+                    // Add other aggregation intermediate result / group-by column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
+                }
+                indexedTable.upsert(new Record(values));
+              }
+            } finally {
+              countDownLatch.countDown();
+            }
          }
        }
-        indexedTable.upsert(new Record(values));
+      });
+    }
+
+    try {
+      long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      for (Future future : futures) {

Review comment:
       (Critical) You need to put the timeout exception into the query response, or the response will be wrong and there is no way to detect that.



