mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r332618635
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java ########## @@ -19,37 +19,68 @@ package org.apache.pinot.core.data.table; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nonnull; +import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.request.AggregationInfo; import org.apache.pinot.common.request.SelectionSort; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.order.OrderByUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Thread safe {@link Table} implementation for aggregating TableRecords based on combination of keys */ public class ConcurrentIndexedTable extends IndexedTable { + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class); + private ConcurrentMap<Key, Record> _lookupMap; - private Comparator<Record> _minHeapComparator; private ReentrantReadWriteLock _readWriteLock; + private boolean _isOrderBy; + private Comparator<Record> _orderByComparator; + private Comparator<Record> _finalOrderByComparator; + private List<Integer> _aggregationIndexes; + + private AtomicBoolean _noMoreNewRecords = new AtomicBoolean(); + private LongAdder _numResizes = new LongAdder(); + private LongAccumulator _resizeTime = new LongAccumulator(Long::sum, 0); + + /** + * Initializes the data structures and comparators needed for this Table + * @param dataSchema data schema of the record's keys and values + * @param aggregationInfos aggregation infors 
for the aggregations in record'd values + * @param orderBy list of {@link SelectionSort} defining the order by + * @param maxCapacity the max number of records to hold + * @param sort does final result need to be sorted + */ @Override public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, - int maxCapacity) { - super.init(dataSchema, aggregationInfos, orderBy, maxCapacity); + int maxCapacity, boolean sort) { + super.init(dataSchema, aggregationInfos, orderBy, maxCapacity, sort); - _minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos).reversed(); _lookupMap = new ConcurrentHashMap<>(); _readWriteLock = new ReentrantReadWriteLock(); + _isOrderBy = CollectionUtils.isNotEmpty(orderBy); + if (_isOrderBy) { + _orderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, false); Review comment: Maybe add a TODO here in the code as well? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org