This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f5f072c  Optimize IndexedTable (#7373)
f5f072c is described below

commit f5f072ceeb151e2694f77987f56b88334d6fe79a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Aug 30 13:55:02 2021 -0700

    Optimize IndexedTable (#7373)
    
    - When resizing the records map, do not recreate the map but clear it to 
avoid growing the map again. This can also reduce the GC required
    - When calculating the final results on server side, do not recreate the 
map but directly create a list to return the results
    - Move the common logic to the base class `IndexedTable`
---
 .../core/data/table/ConcurrentIndexedTable.java    |  94 ++--------------
 .../apache/pinot/core/data/table/IndexedTable.java |  56 +++++++++-
 .../pinot/core/data/table/IntermediateRecord.java  |  19 ++--
 .../pinot/core/data/table/SimpleIndexedTable.java  |  82 ++------------
 .../apache/pinot/core/data/table/TableResizer.java | 123 ++++++++++-----------
 .../table/UnboundedConcurrentIndexedTable.java     |   8 +-
 .../combine/GroupByOrderByCombineOperator.java     |   7 +-
 .../pinot/core/data/table/TableResizerTest.java    |  36 +++---
 8 files changed, 163 insertions(+), 262 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 4e80e09..3e54e92 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,37 +19,23 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
-import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Thread safe {@link Table} implementation for aggregating Records based on 
combination of keys
  */
+@SuppressWarnings("unchecked")
 public class ConcurrentIndexedTable extends IndexedTable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConcurrentIndexedTable.class);
-
-  protected volatile ConcurrentMap<Key, Record> _lookupMap;
-  protected final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
-  private Iterator<Record> _iterator;
-  private final ReentrantReadWriteLock _readWriteLock;
-  private final AtomicInteger _numResizes = new AtomicInteger();
-  private final AtomicLong _resizeTimeMs = new AtomicLong();
+  private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+  private final ReentrantReadWriteLock _readWriteLock = new 
ReentrantReadWriteLock();
 
   public ConcurrentIndexedTable(DataSchema dataSchema, QueryContext 
queryContext, int trimSize, int trimThreshold) {
-    super(dataSchema, queryContext, trimSize, trimThreshold);
-    _lookupMap = new ConcurrentHashMap<>();
-    _readWriteLock = new ReentrantReadWriteLock();
+    super(dataSchema, queryContext, trimSize, trimThreshold, new 
ConcurrentHashMap<>());
   }
 
   /**
@@ -58,7 +44,8 @@ public class ConcurrentIndexedTable extends IndexedTable {
   @Override
   public boolean upsert(Key key, Record newRecord) {
     Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
-    if (_noMoreNewRecords.get()) { // allow only existing record updates
+    if (_noMoreNewRecords.get()) {
+      // allow only existing record updates
       _lookupMap.computeIfPresent(key, (k, v) -> {
         Object[] existingValues = v.getValues();
         Object[] newValues = newRecord.getValues();
@@ -68,8 +55,8 @@ public class ConcurrentIndexedTable extends IndexedTable {
         }
         return v;
       });
-    } else { // allow all records
-
+    } else {
+      // allow all records
       _readWriteLock.readLock().lock();
       try {
         _lookupMap.compute(key, (k, v) -> {
@@ -96,7 +83,7 @@ public class ConcurrentIndexedTable extends IndexedTable {
           _readWriteLock.writeLock().lock();
           try {
             if (_lookupMap.size() >= _trimThreshold) {
-              resize(_trimSize);
+              resize();
             }
           } finally {
             _readWriteLock.writeLock().unlock();
@@ -109,67 +96,4 @@ public class ConcurrentIndexedTable extends IndexedTable {
     }
     return true;
   }
-
-  @Override
-  public int size() {
-    return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
-  }
-
-  @Override
-  public Iterator<Record> iterator() {
-    return _iterator;
-  }
-
-  private void resize(int trimToSize) {
-    long startTime = System.currentTimeMillis();
-    // when the resizer trims using a PQ, it will return a new trimmed map.
-    // the reference held by the indexed table needs to be updated. this is 
also
-    // the reason why it is volatile since the thread doing the resize will 
result in
-    // a new reference
-    _lookupMap = (ConcurrentMap) _tableResizer.resizeRecordsMap(_lookupMap, 
trimToSize);
-    long endTime = System.currentTimeMillis();
-    long timeElapsed = endTime - startTime;
-    _numResizes.incrementAndGet();
-    _resizeTimeMs.addAndGet(timeElapsed);
-  }
-
-  private List<Record> resizeAndSort(int trimToSize) {
-    long startTime = System.currentTimeMillis();
-    List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap, 
trimToSize);
-    long endTime = System.currentTimeMillis();
-    long timeElapsed = endTime - startTime;
-    _numResizes.incrementAndGet();
-    _resizeTimeMs.addAndGet(timeElapsed);
-    return sortedRecords;
-  }
-
-  @Override
-  public void finish(boolean sort) {
-    if (_hasOrderBy) {
-      if (sort) {
-        _sortedRecords = resizeAndSort(_trimSize);
-        _iterator = _sortedRecords.iterator();
-      } else {
-        resize(_trimSize);
-      }
-      int numResizes = _numResizes.get();
-      long resizeTime = _resizeTimeMs.get();
-      LOGGER.debug(
-          "Num resizes : {}, Total time spent in resizing : {}, Avg resize 
time : {}, trimSize: {}, trimThreshold: {}",
-          numResizes, resizeTime, numResizes == 0 ? 0 : resizeTime / 
numResizes, _trimSize, _trimThreshold);
-    }
-    if (_iterator == null) {
-      _iterator = _lookupMap.values().iterator();
-    }
-  }
-
-  @Override
-  public int getNumResizes() {
-    return _numResizes.get();
-  }
-
-  @Override
-  public long getResizeTimeMs() {
-    return _resizeTimeMs.get();
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index c145706..2ac82e7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -19,7 +19,11 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
@@ -32,18 +36,24 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
  */
 @SuppressWarnings("rawtypes")
 public abstract class IndexedTable extends BaseTable {
+  protected final Map<Key, Record> _lookupMap;
   protected final int _numKeyColumns;
   protected final AggregationFunction[] _aggregationFunctions;
   protected final boolean _hasOrderBy;
   protected final TableResizer _tableResizer;
-  protected List<Record> _sortedRecords;
   // The size we need to trim to
   protected final int _trimSize;
   // The size with added buffer, in order to collect more records than 
capacity for better precision
   protected final int _trimThreshold;
 
-  protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int 
trimSize, int trimThreshold) {
+  protected Collection<Record> _topRecords;
+  private int _numResizes;
+  private long _resizeTimeNs;
+
+  protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int 
trimSize, int trimThreshold,
+      Map<Key, Record> lookupMap) {
     super(dataSchema);
+    _lookupMap = lookupMap;
     List<ExpressionContext> groupByExpressions = 
queryContext.getGroupByExpressions();
     assert groupByExpressions != null;
     _numKeyColumns = groupByExpressions.size();
@@ -84,7 +94,45 @@ public abstract class IndexedTable extends BaseTable {
     return upsert(new Key(keyValues), record);
   }
 
-  public abstract int getNumResizes();
+  @Override
+  public int size() {
+    return _topRecords != null ? _topRecords.size() : _lookupMap.size();
+  }
+
+  /**
+   * Resizes the lookup map based on the trim size.
+   */
+  protected void resize() {
+    long startTimeNs = System.nanoTime();
+    _tableResizer.resizeRecordsMap(_lookupMap, _trimSize);
+    long resizeTimeNs = System.nanoTime() - startTimeNs;
+    _numResizes++;
+    _resizeTimeNs += resizeTimeNs;
+  }
+
+  @Override
+  public void finish(boolean sort) {
+    if (_hasOrderBy) {
+      long startTimeNs = System.nanoTime();
+      _topRecords = _tableResizer.getTopRecords(_lookupMap, _trimSize, sort);
+      long resizeTimeNs = System.nanoTime() - startTimeNs;
+      _numResizes++;
+      _resizeTimeNs += resizeTimeNs;
+    } else {
+      _topRecords = _lookupMap.values();
+    }
+  }
+
+  @Override
+  public Iterator<Record> iterator() {
+    return _topRecords.iterator();
+  }
+
+  public int getNumResizes() {
+    return _numResizes;
+  }
 
-  public abstract long getResizeTimeMs();
+  public long getResizeTimeMs() {
+    return TimeUnit.NANOSECONDS.toMillis(_resizeTimeNs);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
index 520c3e8..85288d4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
@@ -19,24 +19,19 @@
 package org.apache.pinot.core.data.table;
 
 /**
- * Helper class to store a subset of Record fields
- * IntermediateRecord is derived from a Record
- * Some of the main properties of an IntermediateRecord are:
- *
- * 1. Key in IntermediateRecord is expected to be identical to the one in the 
Record
- * 2. For values, IntermediateRecord should only have the columns needed for 
order by
- * 3. Inside the values, the columns should be ordered by the order by sequence
- * 4. For order by on aggregations, final results are extracted
- * 5. There is a mandatory field to store the original record to prevent from 
duplicate looking up
+ * Helper class to store the values to be ordered. It also wraps the Key and 
Record of the record.
+ * - When ordering on an aggregation, stores the final result of the 
aggregation
+ * - When ordering on a column/transform, stores the actual value of the 
expression
  */
+@SuppressWarnings("rawtypes")
 public class IntermediateRecord {
   public final Key _key;
-  public final Comparable[] _values;
   public final Record _record;
+  public final Comparable[] _values;
 
-  IntermediateRecord(Key key, Comparable[] values, Record record) {
+  IntermediateRecord(Key key, Record record, Comparable[] values) {
     _key = key;
-    _values = values;
     _record = record;
+    _values = values;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 41c63c8..d9ec976 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -20,33 +20,21 @@ package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * {@link Table} implementation for aggregating TableRecords based on 
combination of keys
  */
+@SuppressWarnings("unchecked")
 @NotThreadSafe
 public class SimpleIndexedTable extends IndexedTable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleIndexedTable.class);
-
-  private Map<Key, Record> _lookupMap;
-  private Iterator<Record> _iterator;
-
   private boolean _noMoreNewRecords = false;
-  private int _numResizes = 0;
-  private long _resizeTimeMs = 0;
 
   public SimpleIndexedTable(DataSchema dataSchema, QueryContext queryContext, 
int trimSize, int trimThreshold) {
-    super(dataSchema, queryContext, trimSize, trimThreshold);
-    _lookupMap = new HashMap<>();
+    super(dataSchema, queryContext, trimSize, trimThreshold, new HashMap<>());
   }
 
   /**
@@ -55,7 +43,8 @@ public class SimpleIndexedTable extends IndexedTable {
   @Override
   public boolean upsert(Key key, Record newRecord) {
     Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
-    if (_noMoreNewRecords) { // allow only existing record updates
+    if (_noMoreNewRecords) {
+      // allow only existing record updates
       _lookupMap.computeIfPresent(key, (k, v) -> {
         Object[] existingValues = v.getValues();
         Object[] newValues = newRecord.getValues();
@@ -65,8 +54,8 @@ public class SimpleIndexedTable extends IndexedTable {
         }
         return v;
       });
-    } else { // allow all records
-
+    } else {
+      // allow all records
       _lookupMap.compute(key, (k, v) -> {
         if (v == null) {
           return newRecord;
@@ -84,7 +73,7 @@ public class SimpleIndexedTable extends IndexedTable {
       if (_lookupMap.size() >= _trimThreshold) {
         if (_hasOrderBy) {
           // reached max capacity, resize
-          resize(_trimSize);
+          resize();
         } else {
           // reached max capacity and no order by. No more new records will be 
accepted
           _noMoreNewRecords = true;
@@ -93,61 +82,4 @@ public class SimpleIndexedTable extends IndexedTable {
     }
     return true;
   }
-
-  private void resize(int trimToSize) {
-    long startTime = System.currentTimeMillis();
-    _lookupMap = _tableResizer.resizeRecordsMap(_lookupMap, trimToSize);
-    long endTime = System.currentTimeMillis();
-    long timeElapsed = endTime - startTime;
-    _numResizes++;
-    _resizeTimeMs += timeElapsed;
-  }
-
-  private List<Record> resizeAndSort(int trimToSize) {
-    long startTime = System.currentTimeMillis();
-    List<Record> sortedRecords = _tableResizer.sortRecordsMap(_lookupMap, 
trimToSize);
-    long endTime = System.currentTimeMillis();
-    long timeElapsed = endTime - startTime;
-    _numResizes++;
-    _resizeTimeMs += timeElapsed;
-    return sortedRecords;
-  }
-
-  @Override
-  public int size() {
-    return _sortedRecords == null ? _lookupMap.size() : _sortedRecords.size();
-  }
-
-  @Override
-  public Iterator<Record> iterator() {
-    return _iterator;
-  }
-
-  @Override
-  public void finish(boolean sort) {
-    if (_hasOrderBy) {
-      if (sort) {
-        _sortedRecords = resizeAndSort(_trimSize);
-        _iterator = _sortedRecords.iterator();
-      } else {
-        resize(_trimSize);
-      }
-      LOGGER.debug(
-          "Num resizes : {}, Total time spent in resizing : {}, Avg resize 
time : {}, trimSize: {}, trimThreshold: {}",
-          _numResizes, _resizeTimeMs, _numResizes == 0 ? 0 : _resizeTimeMs / 
_numResizes, _trimSize, _trimThreshold);
-    }
-    if (_iterator == null) {
-      _iterator = _lookupMap.values().iterator();
-    }
-  }
-
-  @Override
-  public int getNumResizes() {
-    return _numResizes;
-  }
-
-  @Override
-  public long getResizeTimeMs() {
-    return _resizeTimeMs;
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index 3e98f6a..d1cd71d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.core.data.table;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -27,8 +29,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
@@ -109,8 +109,8 @@ public class TableResizer {
       return new GroupByExpressionExtractor(groupByExpressionIndex);
     }
     FunctionContext function = expression.getFunction();
-    Preconditions
-        .checkState(function != null, "Failed to find ORDER-BY expression: %s 
in the GROUP-BY clause", expression);
+    Preconditions.checkState(function != null, "Failed to find ORDER-BY 
expression: %s in the GROUP-BY clause",
+        expression);
     if (function.getType() == FunctionContext.Type.AGGREGATION) {
       // Aggregation function
       return new 
AggregationFunctionExtractor(_aggregationFunctionIndexMap.get(function));
@@ -121,62 +121,40 @@ public class TableResizer {
   }
 
   /**
-   * Constructs an IntermediateRecord from Record
-   * The IntermediateRecord::key is the same Record::key
-   * The IntermediateRecord::values contains only the order by columns, in the 
query's sort sequence
-   * For aggregation values in the order by, the final result is extracted if 
the intermediate result is non-comparable
+   * Constructs an IntermediateRecord by extracting the order-by values from 
the record.
    */
   private IntermediateRecord getIntermediateRecord(Key key, Record record) {
-    Comparable[] intermediateRecordValues = new 
Comparable[_numOrderByExpressions];
+    Comparable[] orderByValues = new Comparable[_numOrderByExpressions];
     for (int i = 0; i < _numOrderByExpressions; i++) {
-      intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
+      orderByValues[i] = _orderByValueExtractors[i].extract(record);
     }
-    return new IntermediateRecord(key, intermediateRecordValues, record);
+    return new IntermediateRecord(key, record, orderByValues);
   }
 
   /**
-   * Trim recordsMap to trimToSize, based on order by information
-   * Resize only if number of records is greater than trimToSize
-   * The resizer smartly chooses to create PQ of records to evict or records 
to retain, based on the number of
-   * records and the number of records to evict
+   * Resizes the recordsMap to the given size.
    */
-  public Map<Key, Record> resizeRecordsMap(Map<Key, Record> recordsMap, int 
trimToSize) {
-    int numRecordsToEvict = recordsMap.size() - trimToSize;
-    if (numRecordsToEvict > 0) {
-      // TODO: compare the performance of converting to IntermediateRecord vs 
keeping Record, in cases where we do
-      //  not need to extract final results
-      if (numRecordsToEvict < trimToSize) {
-        // num records to evict is smaller than num records to retain
-        // make PQ of records to evict
-        PriorityQueue<IntermediateRecord> priorityQueue =
-            convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, 
_intermediateRecordComparator);
-        for (IntermediateRecord evictRecord : priorityQueue) {
-          recordsMap.remove(evictRecord._key);
-        }
-        return recordsMap;
-      } else {
-        // num records to retain is smaller than num records to evict
-        // make PQ of records to retain
-        // TODO - Consider reusing the same map by removing record from the map
-        // at the time it is evicted from PQ
-        Map<Key, Record> trimmedRecordsMap;
-        if (recordsMap instanceof ConcurrentMap) {
-          // invoked by ConcurrentIndexedTable
-          trimmedRecordsMap = new ConcurrentHashMap<>();
-        } else {
-          // invoked by SimpleIndexedTable
-          trimmedRecordsMap = new HashMap<>();
-        }
-        Comparator<IntermediateRecord> comparator = 
_intermediateRecordComparator.reversed();
-        PriorityQueue<IntermediateRecord> priorityQueue =
-            convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
-        for (IntermediateRecord recordToRetain : priorityQueue) {
-          trimmedRecordsMap.put(recordToRetain._key, recordToRetain._record);
-        }
-        return trimmedRecordsMap;
+  public void resizeRecordsMap(Map<Key, Record> recordsMap, int size) {
+    int numRecordsToEvict = recordsMap.size() - size;
+    if (numRecordsToEvict <= 0) {
+      return;
+    }
+    if (numRecordsToEvict <= size) {
+      // Fewer records to evict than retain, make PQ of records to evict
+      PriorityQueue<IntermediateRecord> priorityQueue =
+          convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, 
_intermediateRecordComparator);
+      for (IntermediateRecord recordToEvict : priorityQueue) {
+        recordsMap.remove(recordToEvict._key);
+      }
+    } else {
+      // Fewer records to retain than evict, make PQ of records to retain
+      PriorityQueue<IntermediateRecord> priorityQueue =
+          convertToIntermediateRecordsPQ(recordsMap, size, 
_intermediateRecordComparator.reversed());
+      recordsMap.clear();
+      for (IntermediateRecord recordToRetain : priorityQueue) {
+        recordsMap.put(recordToRetain._key, recordToRetain._record);
       }
     }
-    return recordsMap;
   }
 
   private PriorityQueue<IntermediateRecord> 
convertToIntermediateRecordsPQ(Map<Key, Record> recordsMap, int size,
@@ -198,25 +176,44 @@ public class TableResizer {
   }
 
   /**
-   * Sorts the recordsMap using a priority queue and returns a sorted list of 
records
-   * This method is to be called from IndexedTable::finish, if both resize and 
sort is needed
+   * Returns the top records from the recordsMap.
    */
-  public List<Record> sortRecordsMap(Map<Key, Record> recordsMap, int 
trimToSize) {
+  public Collection<Record> getTopRecords(Map<Key, Record> recordsMap, int 
size, boolean sort) {
+    return sort ? getSortedTopRecords(recordsMap, size) : 
getUnsortedTopRecords(recordsMap, size);
+  }
+
+  @VisibleForTesting
+  List<Record> getSortedTopRecords(Map<Key, Record> recordsMap, int size) {
     int numRecords = recordsMap.size();
     if (numRecords == 0) {
       return Collections.emptyList();
     }
-    int numRecordsToRetain = Math.min(numRecords, trimToSize);
-    // make PQ of sorted records to retain
-    PriorityQueue<IntermediateRecord> priorityQueue =
-        convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, 
_intermediateRecordComparator.reversed());
-    Record[] sortedArray = new Record[numRecordsToRetain];
-    while (!priorityQueue.isEmpty()) {
-      IntermediateRecord intermediateRecord = priorityQueue.poll();
-      sortedArray[--numRecordsToRetain] = intermediateRecord._record;
-      ;
+    size = Math.min(numRecords, size);
+    PriorityQueue<IntermediateRecord> topRecords =
+        convertToIntermediateRecordsPQ(recordsMap, size, 
_intermediateRecordComparator.reversed());
+    Record[] sortedTopRecords = new Record[size];
+    while (size > 0) {
+      IntermediateRecord intermediateRecord = topRecords.poll();
+      assert intermediateRecord != null;
+      sortedTopRecords[--size] = intermediateRecord._record;
+    }
+    return Arrays.asList(sortedTopRecords);
+  }
+
+  private Collection<Record> getUnsortedTopRecords(Map<Key, Record> 
recordsMap, int size) {
+    int numRecords = recordsMap.size();
+    if (numRecords <= size) {
+      return recordsMap.values();
+    } else {
+      PriorityQueue<IntermediateRecord> topRecords =
+          convertToIntermediateRecordsPQ(recordsMap, size, 
_intermediateRecordComparator.reversed());
+      Record[] unsortedTopRecords = new Record[size];
+      int index = 0;
+      for (IntermediateRecord topRecord : topRecords) {
+        unsortedTopRecords[index++] = topRecord._record;
+      }
+      return Arrays.asList(unsortedTopRecords);
     }
-    return Arrays.asList(sortedArray);
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
index aa1bcb0..62accae 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.data.table;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
@@ -33,11 +35,13 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
  * noticed that load-unlock overhead was > 1sec and this specialized concurrent
  * indexed table avoids that by overriding just the upsert method
  */
-public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
+@SuppressWarnings("unchecked")
+public class UnboundedConcurrentIndexedTable extends IndexedTable {
+  private final AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
 
   public UnboundedConcurrentIndexedTable(DataSchema dataSchema, QueryContext 
queryContext, int trimSize,
       int trimThreshold) {
-    super(dataSchema, queryContext, trimSize, trimThreshold);
+    super(dataSchema, queryContext, trimSize, trimThreshold, new 
ConcurrentHashMap<>());
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 8826b58..579b8ce 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
@@ -73,7 +74,7 @@ public class GroupByOrderByCombineOperator extends 
BaseCombineOperator {
   // _futures (try to interrupt the execution if it already started).
   private final CountDownLatch _operatorLatch;
   private DataSchema _dataSchema;
-  private ConcurrentIndexedTable _indexedTable;
+  private IndexedTable _indexedTable;
 
   public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
       ExecutorService executorService, long endTimeMs, int minTrimSize, int 
trimThreshold) {
@@ -210,8 +211,8 @@ public class GroupByOrderByCombineOperator extends 
BaseCombineOperator {
     boolean opCompleted = _operatorLatch.await(timeoutMs, 
TimeUnit.MILLISECONDS);
     if (!opCompleted) {
       // If this happens, the broker side should already timed out, just log 
the error and return
-      String errorMessage = String
-          .format("Timed out while combining group-by order-by results after 
%dms, queryContext = %s", timeoutMs,
+      String errorMessage =
+          String.format("Timed out while combining group-by order-by results 
after %dms, queryContext = %s", timeoutMs,
               _queryContext);
       LOGGER.error(errorMessage);
       return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index 91fcb81..7bad89f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -77,8 +77,8 @@ public class TableResizerTest {
     _keys = Arrays.asList(new Key(new Object[]{"a", 10, 1.0}), new Key(new 
Object[]{"b", 10, 2.0}),
         new Key(new Object[]{"c", 200, 3.0}), new Key(new Object[]{"c", 50, 
4.0}),
         new Key(new Object[]{"c", 300, 5.0}));
-    List<Object[]> objectArray = Arrays
-        .asList(new Object[]{"a", 10, 1.0}, new Object[]{"b", 10, 2.0}, new 
Object[]{"c", 200, 3.0},
+    List<Object[]> objectArray =
+        Arrays.asList(new Object[]{"a", 10, 1.0}, new Object[]{"b", 10, 2.0}, 
new Object[]{"c", 200, 3.0},
             new Object[]{"c", 50, 4.0}, new Object[]{"c", 300, 5.0});
 
     // Use _keys for _groupKeys
@@ -212,7 +212,7 @@ public class TableResizerTest {
     tableResizer =
         new TableResizer(DATA_SCHEMA, 
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
     recordsMap = new HashMap<>(_recordsMap);
-    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(0))); // a, b
     assertTrue(recordsMap.containsKey(_keys.get(1)));
@@ -221,7 +221,7 @@ public class TableResizerTest {
     tableResizer =
         new TableResizer(DATA_SCHEMA, 
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
     recordsMap = new HashMap<>(_recordsMap);
-    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(4))); // 2, 3
     assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -230,7 +230,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + 
"DISTINCTCOUNT(m3) DESC, d1"));
     recordsMap = new HashMap<>(_recordsMap);
-    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(4))); // 4, 3
     assertTrue(recordsMap.containsKey(_keys.get(3)));
@@ -239,7 +239,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / 
(DISTINCTCOUNT(m3) + 1), d1 DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    recordsMap = tableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    tableResizer.resizeRecordsMap(recordsMap, trimToSize);
     assertEquals(recordsMap.size(), trimToSize);
     assertTrue(recordsMap.containsKey(_keys.get(1))); // 3.33, 12.5
     assertTrue(recordsMap.containsKey(_keys.get(0)));
@@ -249,26 +249,26 @@ public class TableResizerTest {
    * Tests the sort function for ordered resizer
    */
   @Test
-  public void testResizeAndSortRecordsMap() {
+  public void testSortTopRecords() {
     // d1 asc
     TableResizer tableResizer =
         new TableResizer(DATA_SCHEMA, 
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1"));
     Map<Key, Record> recordsMap = new HashMap<>(_recordsMap);
-    List<Record> sortedRecords = tableResizer.sortRecordsMap(recordsMap, 
TRIM_TO_SIZE);
+    List<Record> sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 
TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a, b
     assertEquals(sortedRecords.get(1), _records.get(1));
 
     // d1 asc - trim to 1
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 1);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a
 
     // d1 asc, d3 desc (tie breaking with 2nd comparator)
     tableResizer =
         new TableResizer(DATA_SCHEMA, 
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, d3 
DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a, b, c (300)
     assertEquals(sortedRecords.get(1), _records.get(1));
@@ -276,7 +276,7 @@ public class TableResizerTest {
 
     // d1 asc, d3 desc (tie breaking with 2nd comparator) - trim to 1
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 1);
     assertEquals(sortedRecords.size(), 1);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a
 
@@ -284,7 +284,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d1, 
SUM(m1) DESC, max(m2) DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a, b, c (30, 300)
     assertEquals(sortedRecords.get(1), _records.get(1));
@@ -292,7 +292,7 @@ public class TableResizerTest {
 
     // d1 asc, sum(m1) desc, max(m2) desc - trim to 1
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, 1);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, 1);
     assertEquals(sortedRecords.size(), 1);
     assertEquals(sortedRecords.get(0), _records.get(0));  // a
 
@@ -300,7 +300,7 @@ public class TableResizerTest {
     tableResizer =
         new TableResizer(DATA_SCHEMA, 
QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "AVG(m4)"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(4));  // 2, 3, 3.33
     assertEquals(sortedRecords.get(1), _records.get(3));
@@ -310,7 +310,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + 
"DISTINCTCOUNT(m3) DESC, d1"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(4));  // 4, 3, 2 (b)
     assertEquals(sortedRecords.get(1), _records.get(3));
@@ -320,7 +320,7 @@ public class TableResizerTest {
     tableResizer = new TableResizer(DATA_SCHEMA,
         QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d2 / 
(DISTINCTCOUNT(m3) + 1), d1 DESC"));
     recordsMap = new HashMap<>(_recordsMap);
-    sortedRecords = tableResizer.sortRecordsMap(recordsMap, TRIM_TO_SIZE);
+    sortedRecords = tableResizer.getSortedTopRecords(recordsMap, TRIM_TO_SIZE);
     assertEquals(sortedRecords.size(), TRIM_TO_SIZE);
     assertEquals(sortedRecords.get(0), _records.get(1));  // 3.33, 12.5, 5
     assertEquals(sortedRecords.get(1), _records.get(0));
@@ -347,8 +347,8 @@ public class TableResizerTest {
     assertEquals(resultArray[1]._record, _records.get(3));
     assertEquals(resultArray[2]._record, _records.get(4));
 
-    tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils
-        .getQueryContextFromSQL(QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC, 
DISTINCTCOUNT(m3) DESC"));
+    tableResizer = new TableResizer(DATA_SCHEMA, 
QueryContextConverterUtils.getQueryContextFromSQL(
+        QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC, DISTINCTCOUNT(m3) DESC"));
     results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(), 
_groupByResultHolders, TRIM_TO_SIZE);
     assertEquals(results.size(), TRIM_TO_SIZE);
     for (int i = 0; i < TRIM_TO_SIZE; ++i) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to