This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 107255a Acquire and release index buffers in Segment level operator (#7295) 107255a is described below commit 107255a9e681c8c563be211e4c2b6c9787534723 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Mon Aug 16 21:00:14 2021 -0700 Acquire and release index buffers in Segment level operator (#7295) * Acquire and release index buffers * Review comments * Final review comments * spotless apply --- .../AcquireReleaseColumnsSegmentOperator.java | 68 ++++++++++++++++++++++ .../plan/AcquireReleaseColumnsSegmentPlanNode.java | 45 ++++++++++++++ .../core/plan/maker/InstancePlanMakerImplV2.java | 47 +++++++++++---- .../org/apache/pinot/core/util/QueryOptions.java | 4 ++ .../immutable/ImmutableSegmentImpl.java | 10 ++++ .../org/apache/pinot/segment/spi/IndexSegment.java | 20 ++++++- .../segment/spi/store/ColumnIndexDirectory.java | 14 ++++- .../pinot/segment/spi/store/SegmentDirectory.java | 19 +++++- .../apache/pinot/spi/utils/CommonConstants.java | 1 + 9 files changed, 214 insertions(+), 14 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java new file mode 100644 index 0000000..f67b42a --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator; + +import java.util.Set; +import org.apache.pinot.core.common.Block; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.segment.spi.IndexSegment; + + +/** + * A common wrapper around the segment-level operator. + * Provides an opportunity to acquire and release column buffers before reading data + */ +public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { + private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator"; + + private final Operator _childOperator; + private final IndexSegment _indexSegment; + private final Set<String> _columns; + + public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment, Set<String> columns) { + _childOperator = childOperator; + _indexSegment = indexSegment; + _columns = columns; + } + + /** + * Makes a call to acquire column buffers from {@link IndexSegment} before getting nextBlock from childOperator, + * and + * a call to release the column buffers from {@link IndexSegment} after. 
+ */ + @Override + protected Block getNextBlock() { + _indexSegment.acquire(_columns); + try { + return _childOperator.nextBlock(); + } finally { + _indexSegment.release(_columns); + } + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + return _childOperator.getExecutionStatistics(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java new file mode 100644 index 0000000..9662549 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.plan; + +import java.util.Set; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.segment.spi.IndexSegment; + + +/** + * A common wrapper for the segment-level plan node. 
+ */ +public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode { + + private final PlanNode _childPlanNode; + private final IndexSegment _indexSegment; + private final Set<String> _columns; + + public AcquireReleaseColumnsSegmentPlanNode(PlanNode childPlanNode, IndexSegment indexSegment, Set<String> columns) { + _childPlanNode = childPlanNode; + _indexSegment = indexSegment; + _columns = columns; + } + + @Override + public AcquireReleaseColumnsSegmentOperator run() { + return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _columns); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 748e9a2..52a9a24 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -24,10 +24,12 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.core.plan.AcquireReleaseColumnsSegmentPlanNode; import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode; import org.apache.pinot.core.plan.AggregationGroupByPlanNode; import org.apache.pinot.core.plan.AggregationPlanNode; @@ -80,6 +82,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker { // set as pinot.server.query.executor.groupby.trim.threshold public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold"; public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; + public static final String ENABLE_BUFFER_ACQUIRE_RELEASE = 
"enable.buffer.acquire.release"; + public static final boolean DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE = false; private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); private final int _maxInitialResultHolderCapacity; @@ -89,6 +93,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { private final int _minSegmentGroupTrimSize; private final int _minServerGroupTrimSize; private final int _groupByTrimThreshold; + private final boolean _enableBufferAcquireRelease; @VisibleForTesting public InstancePlanMakerImplV2() { @@ -97,6 +102,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE; _minServerGroupTrimSize = DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE; _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD; + _enableBufferAcquireRelease = DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE; } @VisibleForTesting @@ -107,6 +113,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { _minSegmentGroupTrimSize = minSegmentGroupTrimSize; _minServerGroupTrimSize = minServerGroupTrimSize; _groupByTrimThreshold = groupByTrimThreshold; + _enableBufferAcquireRelease = DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE; } /** @@ -132,18 +139,42 @@ public class InstancePlanMakerImplV2 implements PlanMaker { Preconditions .checkState(_groupByTrimThreshold > 0, "Invalid configurable: groupByTrimThreshold: %d must be positive", _groupByTrimThreshold); - LOGGER.info( - "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}", - _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentGroupTrimSize, _minServerGroupTrimSize); + _enableBufferAcquireRelease = Boolean.parseBoolean(config.getProperty(ENABLE_BUFFER_ACQUIRE_RELEASE)); + LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, " + + "minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}, 
enableBufferAcquireRelease: {}", + _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentGroupTrimSize, _minServerGroupTrimSize, + _enableBufferAcquireRelease); } @Override public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, ExecutorService executorService, long endTimeMs) { List<PlanNode> planNodes = new ArrayList<>(indexSegments.size()); - for (IndexSegment indexSegment : indexSegments) { - planNodes.add(makeSegmentPlanNode(indexSegment, queryContext)); + + if (_enableBufferAcquireRelease) { + boolean prefetch = + queryContext.getQueryOptions() != null && QueryOptions.isPrefetchBuffers(queryContext.getQueryOptions()); + List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions(); + for (IndexSegment indexSegment : indexSegments) { + Set<String> columns; + if (selectExpressions.size() == 1 && "*".equals(selectExpressions.get(0).getIdentifier())) { + columns = indexSegment.getPhysicalColumnNames(); + } else { + columns = queryContext.getColumns(); + } + if (prefetch) { + indexSegment.prefetch(columns); + } + planNodes.add( + new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment, + columns)); + } + } else { + for (IndexSegment indexSegment : indexSegments) { + planNodes.add(makeSegmentPlanNode(indexSegment, queryContext)); + } } + CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, _minServerGroupTrimSize, _groupByTrimThreshold, null); @@ -152,12 +183,6 @@ public class InstancePlanMakerImplV2 implements PlanMaker { @Override public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) { - List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions(); - if (selectExpressions.size() == 1 && "*".equals(selectExpressions.get(0).getIdentifier())) { - indexSegment.prefetch(indexSegment.getPhysicalColumnNames()); - } else { - 
indexSegment.prefetch(queryContext.getColumns()); - } if (QueryContextUtils.isAggregationQuery(queryContext)) { List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions(); if (groupByExpressions != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java index 9246dac..2804cab 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java @@ -98,4 +98,8 @@ public class QueryOptions { String minServerGroupTrimSizeString = queryOptions.get(Request.QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE); return minServerGroupTrimSizeString != null ? Integer.parseInt(minServerGroupTrimSizeString) : null; } + + public static boolean isPrefetchBuffers(Map<String, String> queryOptions) { + return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.PREFETCH_BUFFERS)); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java index 143c464..33fe26f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java @@ -133,6 +133,16 @@ public class ImmutableSegmentImpl implements ImmutableSegment { } @Override + public void acquire(Set<String> columns) { + _segmentDirectory.acquire(columns); + } + + @Override + public void release(Set<String> columns) { + _segmentDirectory.release(columns); + } + + @Override public void destroy() { String segmentName = getSegmentName(); LOGGER.info("Trying to destroy segment : {}", segmentName); diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java index da2517d..5085c10 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java @@ -87,13 +87,31 @@ public interface IndexSegment { GenericRow getRecord(int docId, GenericRow reuse); /** - * This is a hint to the the implementation, to prefetch buffers for specified columns + * Hints the segment to begin prefetching buffers for specified columns. + * Typically, this should be an async call made in the planning phase, + * in preparation for reading the data in the execution phase * @param columns columns to prefetch */ default void prefetch(Set<String> columns) { } /** + * Instructs the segment to fetch buffers for specified columns. + * Typically, this should be a blocking call made before the data is read + * @param columns columns to acquire + */ + default void acquire(Set<String> columns) { + } + + /** + * Instructs the segment to release buffers for specified columns. + * Typically, this should be a call made after the data is read + * @param columns columns to release + */ + default void release(Set<String> columns) { + } + + /** * Destroys segment in memory and closes file handlers if in MMAP mode. 
*/ void destroy(); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java index 460151a..ad5a760 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java @@ -93,8 +93,20 @@ public abstract class ColumnIndexDirectory implements Closeable { public abstract Set<String> getColumnsWithIndex(ColumnIndexType type); /** - * Fetch the buffer for this column + * Hint to prefetch the buffer for this column */ public void prefetchBuffer(String columns) { } + + /** + * Fetch the buffer for this column + */ + public void acquireBuffer(String column) { + } + + /** + * Release the buffer for this column + */ + public void releaseBuffer(String column) { + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java index 71825da..b8fcc92 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java @@ -115,13 +115,30 @@ public abstract class SegmentDirectory implements Closeable { public abstract Set<String> getColumnsWithIndex(ColumnIndexType type); /** - * This is a hint to the the implementation, to prefetch buffers for specified columns + * This is a hint to the segment directory, to begin prefetching buffers for specified columns. 
+ * Typically, this should be an async call hooked up from the planning phase, + * in preparation for reading data in the execution phase * @param columns columns to prefetch */ public void prefetch(Set<String> columns) { } /** + * This is an instruction to the segment directory, to fetch buffers for the specified columns. + * Typically, this should be a blocking call made before the data is read + * @param columns columns to acquire + */ + public void acquire(Set<String> columns) { + } + + /** + * This is an instruction to the segment directory to release the fetched buffers for the specified columns. + * @param columns columns to release + */ + public void release(Set<String> columns) { + } + + /** * Reader for columnar index buffers from segment directory */ public abstract class Reader implements Closeable { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index cc03c65..b784ec1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -229,6 +229,7 @@ public class CommonConstants { public static final String SKIP_UPSERT = "skipUpsert"; public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize"; public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize"; + public static final String PREFETCH_BUFFERS = "prefetchBuffers"; } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org