This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 107255a  Acquire and release index buffers in Segment level operator (#7295)
107255a is described below

commit 107255a9e681c8c563be211e4c2b6c9787534723
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Mon Aug 16 21:00:14 2021 -0700

    Acquire and release index buffers in Segment level operator (#7295)
    
    * Acquire and release index buffers
    
    * Review comments
    
    * Final review comments
    
    * spotless apply
---
 .../AcquireReleaseColumnsSegmentOperator.java      | 68 ++++++++++++++++++++++
 .../plan/AcquireReleaseColumnsSegmentPlanNode.java | 45 ++++++++++++++
 .../core/plan/maker/InstancePlanMakerImplV2.java   | 47 +++++++++++----
 .../org/apache/pinot/core/util/QueryOptions.java   |  4 ++
 .../immutable/ImmutableSegmentImpl.java            | 10 ++++
 .../org/apache/pinot/segment/spi/IndexSegment.java | 20 ++++++-
 .../segment/spi/store/ColumnIndexDirectory.java    | 14 ++++-
 .../pinot/segment/spi/store/SegmentDirectory.java  | 19 +++++-
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 9 files changed, 214 insertions(+), 14 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
new file mode 100644
index 0000000..f67b42a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator;
+
+import java.util.Set;
+import org.apache.pinot.core.common.Block;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * A common wrapper around the segment-level operator.
+ * Provides an opportunity to acquire and release column buffers before reading data
+ */
+public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
+  private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
+
+  private final Operator _childOperator;
+  private final IndexSegment _indexSegment;
+  private final Set<String> _columns;
+
+  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment, Set<String> columns) {
+    _childOperator = childOperator;
+    _indexSegment = indexSegment;
+    _columns = columns;
+  }
+
+  /**
+   * Makes a call to acquire column buffers from {@link IndexSegment} before getting nextBlock from childOperator,
+   * and
+   * a call to release the column buffers from {@link IndexSegment} after.
+   */
+  @Override
+  protected Block getNextBlock() {
+    _indexSegment.acquire(_columns);
+    try {
+      return _childOperator.nextBlock();
+    } finally {
+      _indexSegment.release(_columns);
+    }
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return _childOperator.getExecutionStatistics();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
new file mode 100644
index 0000000..9662549
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import java.util.Set;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * A common wrapper for the segment-level plan node.
+ */
+public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
+
+  private final PlanNode _childPlanNode;
+  private final IndexSegment _indexSegment;
+  private final Set<String> _columns;
+
+  public AcquireReleaseColumnsSegmentPlanNode(PlanNode childPlanNode, IndexSegment indexSegment, Set<String> columns) {
+    _childPlanNode = childPlanNode;
+    _indexSegment = indexSegment;
+    _columns = columns;
+  }
+
+  @Override
+  public AcquireReleaseColumnsSegmentOperator run() {
+    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _columns);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 748e9a2..52a9a24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -24,10 +24,12 @@ import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.core.plan.AcquireReleaseColumnsSegmentPlanNode;
 import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
 import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
 import org.apache.pinot.core.plan.AggregationPlanNode;
@@ -80,6 +82,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+  public static final String ENABLE_BUFFER_ACQUIRE_RELEASE = "enable.buffer.acquire.release";
+  public static final boolean DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE = false;
 
   private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
@@ -89,6 +93,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   private final int _minSegmentGroupTrimSize;
   private final int _minServerGroupTrimSize;
   private final int _groupByTrimThreshold;
+  private final boolean _enableBufferAcquireRelease;
 
   @VisibleForTesting
   public InstancePlanMakerImplV2() {
@@ -97,6 +102,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
     _minServerGroupTrimSize = DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
     _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+    _enableBufferAcquireRelease = DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE;
   }
 
   @VisibleForTesting
@@ -107,6 +113,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
     _minServerGroupTrimSize = minServerGroupTrimSize;
     _groupByTrimThreshold = groupByTrimThreshold;
+    _enableBufferAcquireRelease = DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE;
   }
 
   /**
@@ -132,18 +139,42 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     Preconditions
         .checkState(_groupByTrimThreshold > 0, "Invalid configurable: groupByTrimThreshold: %d must be positive",
             _groupByTrimThreshold);
-    LOGGER.info(
-        "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}",
-        _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentGroupTrimSize, _minServerGroupTrimSize);
+    _enableBufferAcquireRelease = Boolean.parseBoolean(config.getProperty(ENABLE_BUFFER_ACQUIRE_RELEASE));
+    LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, "
+            + "minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}, enableBufferAcquireRelease: {}",
+        _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentGroupTrimSize, _minServerGroupTrimSize,
+        _enableBufferAcquireRelease);
   }
 
   @Override
   public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs) {
     List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
-    for (IndexSegment indexSegment : indexSegments) {
-      planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
+
+    if (_enableBufferAcquireRelease) {
+      boolean prefetch =
+          queryContext.getQueryOptions() != null && QueryOptions.isPrefetchBuffers(queryContext.getQueryOptions());
+      List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
+      for (IndexSegment indexSegment : indexSegments) {
+        Set<String> columns;
+        if (selectExpressions.size() == 1 && "*".equals(selectExpressions.get(0).getIdentifier())) {
+          columns = indexSegment.getPhysicalColumnNames();
+        } else {
+          columns = queryContext.getColumns();
+        }
+        if (prefetch) {
+          indexSegment.prefetch(columns);
+        }
+        planNodes.add(
+            new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment,
+                columns));
+      }
+    } else {
+      for (IndexSegment indexSegment : indexSegments) {
+        planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
+      }
     }
+
     CombinePlanNode combinePlanNode =
         new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit,
             _minServerGroupTrimSize, _groupByTrimThreshold, null);
@@ -152,12 +183,6 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
 
   @Override
   public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
-    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
-    if (selectExpressions.size() == 1 && "*".equals(selectExpressions.get(0).getIdentifier())) {
-      indexSegment.prefetch(indexSegment.getPhysicalColumnNames());
-    } else {
-      indexSegment.prefetch(queryContext.getColumns());
-    }
     if (QueryContextUtils.isAggregationQuery(queryContext)) {
       List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
       if (groupByExpressions != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java
index 9246dac..2804cab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptions.java
@@ -98,4 +98,8 @@ public class QueryOptions {
     String minServerGroupTrimSizeString = queryOptions.get(Request.QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
     return minServerGroupTrimSizeString != null ? Integer.parseInt(minServerGroupTrimSizeString) : null;
   }
+
+  public static boolean isPrefetchBuffers(Map<String, String> queryOptions) {
+    return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.PREFETCH_BUFFERS));
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 143c464..33fe26f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -133,6 +133,16 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
   }
 
   @Override
+  public void acquire(Set<String> columns) {
+    _segmentDirectory.acquire(columns);
+  }
+
+  @Override
+  public void release(Set<String> columns) {
+    _segmentDirectory.release(columns);
+  }
+
+  @Override
   public void destroy() {
     String segmentName = getSegmentName();
     LOGGER.info("Trying to destroy segment : {}", segmentName);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index da2517d..5085c10 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -87,13 +87,31 @@ public interface IndexSegment {
   GenericRow getRecord(int docId, GenericRow reuse);
 
   /**
-   * This is a hint to the the implementation, to prefetch buffers for specified columns
+   * Hints the segment to begin prefetching buffers for specified columns.
+   * Typically, this should be an async call made in the planning phase,
+   * in preparation for reading the data in the execution phase
    * @param columns columns to prefetch
    */
   default void prefetch(Set<String> columns) {
   }
 
   /**
+   * Instructs the segment to fetch buffers for specified columns.
+   * Typically, this should be a blocking call made before the data is read
+   * @param columns columns to acquire
+   */
+  default void acquire(Set<String> columns) {
+  }
+
+  /**
+   * Instructs the segment to release buffers for specified columns.
+   * Typically, this should be a call made after the data is read
+   * @param columns columns to release
+   */
+  default void release(Set<String> columns) {
+  }
+
+  /**
    * Destroys segment in memory and closes file handlers if in MMAP mode.
    */
   void destroy();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
index 460151a..ad5a760 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
@@ -93,8 +93,20 @@ public abstract class ColumnIndexDirectory implements Closeable {
   public abstract Set<String> getColumnsWithIndex(ColumnIndexType type);
 
   /**
-   * Fetch the buffer for this column
+   * Hint to prefetch the buffer for this column
    */
   public void prefetchBuffer(String columns) {
   }
+
+  /**
+   * Fetch the buffer for this column
+   */
+  public void acquireBuffer(String column) {
+  }
+
+  /**
+   * Release the buffer for this column
+   */
+  public void releaseBuffer(String column) {
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index 71825da..b8fcc92 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -115,13 +115,30 @@ public abstract class SegmentDirectory implements Closeable {
   public abstract Set<String> getColumnsWithIndex(ColumnIndexType type);
 
   /**
-   * This is a hint to the the implementation, to prefetch buffers for specified columns
+   * This is a hint to the segment directory, to begin prefetching buffers for specified columns.
+   * Typically, this should be an async call hooked up from the planning phase,
+   * in preparation for reading data in execution phase
    * @param columns columns to prefetch
    */
   public void prefetch(Set<String> columns) {
   }
 
   /**
+   * This is an instruction to the segment directory, to fetch buffers for specified column.
+   * Typically this should be a blocking call made before the data is read
+   * @param columns columns to acquire
+   */
+  public void acquire(Set<String> columns) {
+  }
+
+  /**
+   * This is an instruction to the segment directory to release the fetched buffers for the specified column.
+   * @param columns columns to release
+   */
+  public void release(Set<String> columns) {
+  }
+
+  /**
    * Reader for columnar index buffers from segment directory
    */
   public abstract class Reader implements Closeable {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index cc03c65..b784ec1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -229,6 +229,7 @@ public class CommonConstants {
         public static final String SKIP_UPSERT = "skipUpsert";
         public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "minSegmentGroupTrimSize";
         public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize";
+        public static final String PREFETCH_BUFFERS = "prefetchBuffers";
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to