Jackie-Jiang commented on code in PR #10660:
URL: https://github.com/apache/pinot/pull/10660#discussion_r1178430942


##########
pinot-core/src/main/java/org/apache/pinot/core/util/TaskUtils.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * The term `task` and `thread` are used interchangeably in the logic to parallelize CombinePlanNode and
+ * BaseCombineOperator. This class provides common methods used to set up the parallel processing.
+ */
+public class TaskUtils {

Review Comment:
   This class name can cause confusion because it might be mixed up with minion 
tasks. How about calling it `QueryMultiThreadingUtils` or something more specific?



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java:
##########
@@ -105,78 +100,31 @@ private BaseCombineOperator getCombineOperator() {
       // Large number of plan nodes, run them in parallel
      // NOTE: Even if we get single executor thread, still run it using a separate thread so that the timeout can be
      //       honored
-
-      int maxExecutionThreads = _queryContext.getMaxExecutionThreads();
-      if (maxExecutionThreads <= 0) {
-        maxExecutionThreads = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY;
-      }
       int numTasks =
-          Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / TARGET_NUM_PLANS_PER_THREAD, maxExecutionThreads);
+          TaskUtils.getNumTasks(numPlanNodes, TARGET_NUM_PLANS_PER_THREAD, _queryContext.getMaxExecutionThreads());
       recording.setNumTasks(numTasks);
-
-      // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-      // returns. We need to ensure no execution left before the main thread returning because the main thread holds the
-      // reference to the segments, and if the segments are deleted/refreshed, the segments can be released after the
-      // main thread returns, which would lead to undefined behavior (even JVM crash) when executing queries against
-      // them.
-      Phaser phaser = new Phaser(1);
-
-      // Submit all jobs
-      Future[] futures = new Future[numTasks];
-      for (int i = 0; i < numTasks; i++) {
-        int index = i;
-        futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
-          @Override
-          public List<Operator> callJob() {
-            try {
-              // Register the thread to the phaser.
-              // If the phaser is terminated (returning negative value) when trying to register the thread, that means
-              // the query execution has timed out, and the main thread has deregistered itself and returned the result.
-              // Directly return as no execution result will be taken.
-              if (phaser.register() < 0) {
-                return Collections.emptyList();
-              }
-
-              List<Operator> operators = new ArrayList<>();
-              for (int i = index; i < numPlanNodes; i += numTasks) {
-                operators.add(_planNodes.get(i).run());
-              }
-              return operators;
-            } finally {
-              phaser.arriveAndDeregister();
-            }
-          }
-        });
-      }
-
-      // Get all results
-      try {
-        for (Future future : futures) {
-          List<Operator> ops = (List<Operator>) future.get(_queryContext.getEndTimeMs() - System.currentTimeMillis(),
-              TimeUnit.MILLISECONDS);
-          operators.addAll(ops);
+      TaskUtils.runTasksWithDeadline(numTasks, index -> {
+        List<Operator> ops = new ArrayList<>();
+        for (int i = index; i < numPlanNodes; i += numTasks) {
+          ops.add(_planNodes.get(i).run());
+        }
+        return ops;
+      }, taskRes -> {
+        if (taskRes != null && !taskRes.isEmpty()) {

Review Comment:
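   (minor) This check seems redundant: the task function above always returns a 
non-null list, and an empty list needs no special handling if the body only 
appends `taskRes` to the combined result.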
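
   To illustrate the point about the check flagged above, here is a minimal, 
self-contained sketch. It is not the PR's code: `RedundantCheckSketch`, 
`runTask` and `combine` are hypothetical names, plan nodes are replaced by 
plain strings, and the tasks run sequentially instead of on an executor. It 
assumes the result consumer only appends each task's list to the combined list.

```java
import java.util.ArrayList;
import java.util.List;

class RedundantCheckSketch {
  // Hypothetical stand-in for the task body in the diff: task `index` handles
  // items index, index + numTasks, index + 2 * numTasks, ...
  static List<String> runTask(List<String> planNodes, int index, int numTasks) {
    List<String> ops = new ArrayList<>();
    for (int i = index; i < planNodes.size(); i += numTasks) {
      ops.add("op(" + planNodes.get(i) + ")");
    }
    // Always a freshly created list: never null, possibly empty.
    return ops;
  }

  // Hypothetical stand-in for the result consumer: no null/empty guard.
  static List<String> combine(List<String> planNodes, int numTasks) {
    List<String> operators = new ArrayList<>();
    for (int index = 0; index < numTasks; index++) {
      // addAll with an empty list is a no-op, so no isEmpty() check is needed.
      operators.addAll(runTask(planNodes, index, numTasks));
    }
    return operators;
  }

  public static void main(String[] args) {
    // More tasks than plan nodes, so tasks 3 and 4 return empty lists.
    System.out.println(combine(List.of("a", "b", "c"), 5));
  }
}
```

   If the real consumer does more than append (for example, records 
per-task statistics), the guard may still be needed, so treat this only as a 
sanity check of the reasoning.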



##########
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java:
##########
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.local.segment.index.readers.bloom.GuavaBloomFilterReaderUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+
+
+/**
+ * The {@code ValueBasedSegmentPruner} prunes segments based on values inside the filter and segment metadata and data.
+ */
+@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+abstract public class ValueBasedSegmentPruner implements SegmentPruner {
+  public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
+  protected int _inPredicateThreshold;
+
+  @Override
+  public void init(PinotConfiguration config) {
+    _inPredicateThreshold =
+        config.getProperty(IN_PREDICATE_THRESHOLD, Server.DEFAULT_VALUE_PRUNER_IN_PREDICATE_THRESHOLD);
+  }
+
+  @Override
+  public boolean isApplicableTo(QueryContext query) {

Review Comment:
   Suggest removing this so that each concrete implementation must override it.
   - For `ColumnValueSegmentPruner`, we can prune segments when there is an EQ, 
IN (with fewer values than the threshold), or RANGE predicate, and there is no NOT
   - For `BloomFilterSegmentPruner`, we can prune segments when there is an EQ 
or IN (with fewer values than the threshold) predicate, and there is no NOT
   - For both pruners, if there is an OR and one of its children cannot be 
pruned, the whole query is not prunable
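
   The rules above could be sketched roughly as follows. This is only an 
illustration on a hypothetical, simplified filter tree (`Filter` and `Kind` 
are made up here and are not Pinot's `FilterContext` or `Predicate`). A 
`supportsRange` flag distinguishes the two pruners. Two points are 
assumptions rather than something stated above: that an AND is applicable as 
soon as one child is, and that the IN threshold comparison is strict.

```java
import java.util.List;

class PrunerApplicabilitySketch {
  enum Kind { AND, OR, NOT, EQ, IN, RANGE, OTHER }

  // Hypothetical, simplified filter node; not Pinot's FilterContext.
  static final class Filter {
    final Kind kind;
    final List<Filter> children; // used by AND / OR / NOT
    final int numInValues;       // used by IN only

    Filter(Kind kind, List<Filter> children, int numInValues) {
      this.kind = kind;
      this.children = children;
      this.numInValues = numInValues;
    }

    static Filter leaf(Kind kind) {
      return new Filter(kind, List.of(), 0);
    }

    static Filter in(int numValues) {
      return new Filter(Kind.IN, List.of(), numValues);
    }

    static Filter node(Kind kind, Filter... children) {
      return new Filter(kind, List.of(children), 0);
    }
  }

  // supportsRange = true models the column-value pruner, false the bloom-filter pruner.
  static boolean isApplicable(Filter filter, boolean supportsRange, int inThreshold) {
    switch (filter.kind) {
      case AND:
        // Assumption: one prunable child is enough to make the AND prunable.
        for (Filter child : filter.children) {
          if (isApplicable(child, supportsRange, inThreshold)) {
            return true;
          }
        }
        return false;
      case OR:
        // One unprunable child makes the whole OR unprunable.
        for (Filter child : filter.children) {
          if (!isApplicable(child, supportsRange, inThreshold)) {
            return false;
          }
        }
        return true;
      case NOT:
        return false;
      case EQ:
        return true;
      case IN:
        // Assumption: strict comparison; the real boundary may be inclusive.
        return filter.numInValues < inThreshold;
      case RANGE:
        return supportsRange;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    Filter orEqRange = Filter.node(Kind.OR, Filter.leaf(Kind.EQ), Filter.leaf(Kind.RANGE));
    System.out.println(isApplicable(orEqRange, true, 10));  // column-value style pruner
    System.out.println(isApplicable(orEqRange, false, 10)); // bloom-filter style pruner
  }
}
```

   With each pruner owning its own `isApplicableTo`, the RANGE difference no 
longer needs a flag like `supportsRange`; the flag is used here only to keep 
the sketch in one method.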



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

