clintropolis commented on code in PR #19460:
URL: https://github.com/apache/druid/pull/19460#discussion_r3352551122


##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java:
##########
@@ -110,9 +145,354 @@ public List<AggregatorFactory> 
getAggregatorsForPreAggregated()
     };
   }
 
+  private CursorHolder makeClusteredCursorHolder(CursorBuildSpec spec, 
ClusteredValueGroupsBaseTableSchema clusterSummary)
+  {
+    final ClusterGroupQueryPlan plan = Projections.planClusterGroupQuery(
+        new ArrayList<>(index.getClusterGroupSchemas()),
+        spec
+    );
+
+    if (plan.survivingGroups().isEmpty()) {
+      return EmptyClusteredCursorHolder.INSTANCE;
+    }
+
+    if (plan.survivingGroups().size() == 1) {
+      return makeSingleGroupClusteredCursorHolder(spec, plan, 
plan.survivingGroups().get(0));
+    }
+    return makeMultiGroupClusteredCursorHolder(spec, plan);
+  }
+
+  /**
+   * Rebuild {@code spec} for the per-group cursor holder of {@code 
valueGroup}, swapping in the plan's per-group
+   * filter rewrite: clustering-column leaves become {@link 
org.apache.druid.segment.filter.TrueFilter} /
+   * {@link org.apache.druid.segment.filter.FalseFilter} per the group's 
constant clustering tuple and fold through
+   * AND / OR / NOT, so the per-group {@link QueryableIndex}'s filter 
machinery never tries to look up indexes for
+   * clustering columns it doesn't physically carry. Selector-side access to 
clustering columns (SELECT / GROUP BY)
+   * is still served by {@link ClusteringColumnSelectorFactory} below.
+   */
+  private static CursorBuildSpec rebuildSpecForGroup(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    if (spec.getFilter() == null) {
+      return spec;
+    }
+    final Filter rewritten = plan.rewriteFor(valueGroup);
+    if (rewritten == spec.getFilter()) {
+      return spec;
+    }
+    return CursorBuildSpec.builder(spec).setFilter(rewritten).build();
+  }
+
+  private CursorHolder makeSingleGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan,
+      TableClusterGroupSpec valueGroup
+  )
+  {
+    final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+    if (groupIndex == null) {
+      throw DruidException.defensive(
+          "No cluster-group sub-index resolvable for clustering values "
+          + Arrays.toString(valueGroup.lookupClusteringValues())
+      );
+    }
+
+    return new QueryableIndexCursorHolder(
+        groupIndex,
+        rebuildSpecForGroup(spec, plan, valueGroup),
+        QueryableIndexTimeBoundaryInspector.create(groupIndex)
+    )
+    {
+      @Override
+      protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          Offset baseOffset
+      )
+      {
+        return new ClusteringColumnSelectorFactory(
+            super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+
+      @Override
+      protected VectorColumnSelectorFactory 
makeVectorColumnSelectorFactoryForOffset(
+          ColumnCache columnCache,
+          VectorOffset baseOffset
+      )
+      {
+        return new ClusteringVectorColumnSelectorFactory(
+            super.makeVectorColumnSelectorFactoryForOffset(columnCache, 
baseOffset),
+            valueGroup.getSummary().getClusteringColumns(),
+            valueGroup.lookupClusteringValues()
+        );
+      }
+    };
+  }
+
+  /**
+   * Build a cursor holder that walks multiple matching cluster groups 
back-to-back via
+   * {@link ConcatenatingCursor}. Each per-group {@link CursorHolder} is built 
lazily inside the cursor's group
+   * transition, so a query that finishes early (e.g., LIMIT-bounded) doesn't 
open every group's offset.
+   */
+  private CursorHolder makeMultiGroupClusteredCursorHolder(
+      CursorBuildSpec spec,
+      ClusterGroupQueryPlan plan
+  )
+  {
+    final List<TableClusterGroupSpec> matching = plan.survivingGroups();
+    // All matching specs share the same parent summary (they came out of one 
segment); grab a reference for
+    // getOrdering() and clusteringColumns below.
+    final ClusteredValueGroupsBaseTableSchema clusterSummary = 
matching.get(0).getSummary();
+    final RowSignature clusteringColumns = 
clusterSummary.getClusteringColumns();
+    final List<Object[]> clusteringValuesByGroup = new 
ArrayList<>(matching.size());
+    final List<Supplier<CursorHolder>> holderSuppliers = new 
ArrayList<>(matching.size());
+    // lifecycle management closer for per-group CursorHolders
+    final Closer closer = Closer.create();
+    for (TableClusterGroupSpec valueGroup : matching) {
+      clusteringValuesByGroup.add(valueGroup.lookupClusteringValues());
+      final QueryableIndex groupIndex = 
index.getClusterGroupQueryableIndex(valueGroup);
+      if (groupIndex == null) {
+        throw DruidException.defensive(
+            "No cluster-group sub-index resolvable for clustering values "
+            + Arrays.toString(valueGroup.lookupClusteringValues())
+        );
+      }
+      final CursorBuildSpec groupSpec = rebuildSpecForGroup(spec, plan, 
valueGroup);
+      holderSuppliers.add(
+          Suppliers.memoize(
+              () -> closer.register(
+                  new QueryableIndexCursorHolder(
+                      groupIndex,
+                      groupSpec,
+                      QueryableIndexTimeBoundaryInspector.create(groupIndex)
+                  )
+              )
+          )
+      );
+    }
+
+    // Initial wrapper state uses the first group's clustering values + a 
throwing placeholder delegate. The
+    // ConcatenatingCursor immediately calls setDelegate on init (before any 
selector is exposed). The vector
+    // wrapper carries the query-level max vector size from the build spec, 
the placeholder delegate can't be
+    // queried for sizing, and the value is constant across groups anyway.
+    final int vectorSize = spec.getQueryContext().getVectorSize();
+    final ClusteringColumnSelectorFactory wrapperFactory = new 
ClusteringColumnSelectorFactory(
+        UNINITIALIZED_DELEGATE,
+        clusteringColumns,
+        clusteringValuesByGroup.get(0)
+    );

Review Comment:
   As mentioned in the other comment, this is because the delegate is also 
used for metadata-related calls, so it more or less always needs to work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to