gianm commented on code in PR #18005:
URL: https://github.com/apache/druid/pull/18005#discussion_r2103323794


##########
processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java:
##########
@@ -123,10 +122,9 @@ public boolean isProcessable()
   }
 
   @Override
-  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
+  public SegmentMapFunction createSegmentMapFunction(Query query)
   {
-    final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(query);
-    return baseSegment -> new RestrictedSegment(segmentMapFn.apply(baseSegment), policy);
+    return SegmentMapFunction.map(segment -> new RestrictedSegment(segment, policy));

Review Comment:
   I understand it's pointless to start from 
`base.createSegmentMapFunction(query)` because we "know" it will be identity, 
but IMO it's better form to do it anyway. That way all the datasources work the 
same way, and there's a degree of future-proofing for restrictions being able 
to apply to things other than tables.



##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Functional interface that captures the process of acquiring a {@link Segment} from a
+ * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
+ * before it is available to query processing engines.
+ * <p>
+ * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
+ * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ */
+@FunctionalInterface
+public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+{
+  /**
+   * {@link SegmentMapFunction} that acuires a {@link Segment} from the {@link ReferenceCountedObjectProvider<Segment>}

Review Comment:
   Spelling: `acuires` should be `acquires`.



##########
processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java:
##########
@@ -254,8 +254,11 @@ public void close()
   }
 
   @Override
-  public Optional<Closeable> acquireReferences()
+  public Optional<Closeable> acquireReference()
   {
-    return Optional.empty();
+    // this is wrong, we should have a reference to the reference counted segment instead of the direct segment so that

Review Comment:
   Was it wrong prior to this patch? What issues will the wrongness cause?



##########
processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java:
##########
@@ -22,42 +22,26 @@
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.Segment;
 
 public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
 {
   private final QueryRunnerFactory<T, Query<T>> factory;
-  private final SegmentReference segment;
-  private final SegmentDescriptor descriptor;
+  private final Segment segment;
 
   public ReferenceCountingSegmentQueryRunner(
       QueryRunnerFactory<T, Query<T>> factory,
-      SegmentReference segment,
-      SegmentDescriptor descriptor
+      Segment segment
   )
   {
     this.factory = factory;
     this.segment = segment;
-    this.descriptor = descriptor;
   }
 
   @Override
   public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
   {
-    return segment.acquireReferences().map(closeable -> {
-      try {
-        final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);
-        return Sequences.withBaggage(baseSequence, closeable);
-      }
-      catch (Throwable t) {
-        try {
-          closeable.close();
-        }
-        catch (Exception e) {
-          t.addSuppressed(e);
-        }
-        throw t;
-      }
-    }).orElseGet(() -> new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext));
+    final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);
+    return Sequences.withBaggage(baseSequence, segment);

Review Comment:
   I think this leaks a reference if the call to 
`factory.createRunner(segment).run(queryPlus, responseContext)` throws an 
exception. The prior code handled it with the try/catch.
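A minimal sketch of the close-on-failure pattern the prior code used, factored into a helper so the new `run` could keep it. `CloseOnFailure` and `runOrClose` are hypothetical names, not Druid APIs. In `run`, the resource would be `segment` and the body would be the `factory.createRunner(segment).run(queryPlus, responseContext)` call; on success the reference is deliberately left open, to be released later by `Sequences.withBaggage`.

```java
import java.util.function.Supplier;

public class CloseOnFailure
{
  // Hypothetical helper (not a Druid API): evaluates body; if it throws, closes
  // resource before rethrowing so the failed call does not leak the reference.
  // On success the resource is left open for the caller to release later.
  public static <T> T runOrClose(AutoCloseable resource, Supplier<T> body)
  {
    try {
      return body.get();
    }
    catch (Throwable t) {
      try {
        resource.close();
      }
      catch (Exception e) {
        // Keep the original failure as the primary exception.
        t.addSuppressed(e);
      }
      // Precise rethrow: body.get() can only throw unchecked exceptions here.
      throw t;
    }
  }
}
```

Illustratively, the new body would become something like `Sequences.withBaggage(runOrClose(segment, () -> factory.createRunner(segment).run(queryPlus, responseContext)), segment)`.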



##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -0,0 +1,58 @@
+package org.apache.druid.segment;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Functional interface that captures the process of acquiring a {@link Segment} from a
+ * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
+ * before it is available to query processing engines.
+ * <p>
+ * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
+ * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ */
+@FunctionalInterface
+public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+{
+  /**
+   * {@link SegmentMapFunction} that acuires a {@link Segment} from the {@link ReferenceCountedObjectProvider<Segment>}
+   */
+  SegmentMapFunction IDENTITY = ReferenceCountedObjectProvider::acquireReference;
+
+  /**
+   * Returns a {@link SegmentMapFunction} that calls {@link ReferenceCountedObjectProvider#acquireReference()} and then
+   * applies the function to the resulting {@link Segment}, if present
+   */
+  static SegmentMapFunction map(Function<Segment, Segment> mapFn)

Review Comment:
   I think we can remove this method. It's only used by the "restricted" stuff 
currently, but imo those should be adjusted to use `andThen` anyway.



##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -0,0 +1,58 @@
+package org.apache.druid.segment;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Functional interface that captures the process of acquiring a {@link Segment} from a
+ * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
+ * before it is available to query processing engines.
+ * <p>
+ * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
+ * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ */
+@FunctionalInterface
+public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+{
+  /**
+   * {@link SegmentMapFunction} that acuires a {@link Segment} from the {@link ReferenceCountedObjectProvider<Segment>}
+   */
+  SegmentMapFunction IDENTITY = ReferenceCountedObjectProvider::acquireReference;
+
+  /**
+   * Returns a {@link SegmentMapFunction} that calls {@link ReferenceCountedObjectProvider#acquireReference()} and then
+   * applies the function to the resulting {@link Segment}, if present
+   */
+  static SegmentMapFunction map(Function<Segment, Segment> mapFn)
+  {
+    return segmentReference -> segmentReference.acquireReference().map(mapFn);
+  }
+
+  /**
+   * Returns a {@link SegmentMapFunction} which first applies the supplied {@link SegmentMapFunction} and then applies
+   * the function to the resulting {@link Segment}, if present
+   */
+  static SegmentMapFunction wrap(SegmentMapFunction segmentMapFn, Function<Segment, Segment> mapFn)

Review Comment:
   Move into a non-static method `SegmentMapFunction andThen(mapFn)` so it's
more fluent? Then this pattern:
   
   ```
   final SegmentMapFunction mapFn = base.createSegmentMapFunction(query);
   return SegmentMapFunction.wrap(mapFn, segment -> new FilteredSegment(segment, filter));
   ```
   
   Would instead be:
   
   ```
   return base.createSegmentMapFunction(query)
              .andThen(segment -> new FilteredSegment(segment, filter));
   ```
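A self-contained sketch of the suggested fluent `andThen`, using hypothetical stand-ins (`Segment`, `SegmentProvider`, `leaf`, `wrap`) rather than Druid's real classes. One caveat, assuming the real interface keeps extending `java.util.function.Function`: a default `SegmentMapFunction andThen(Function<Segment, Segment>)` has the same erasure as the inherited generic `Function.andThen` without overriding it, which javac rejects as a name clash. The sketch's interface therefore does not extend `Function`; the real code may need a different method name or supertype.

```java
import java.util.Optional;
import java.util.function.Function;

public class SegmentMapSketch
{
  // Hypothetical stand-in for Druid's Segment.
  public interface Segment extends AutoCloseable
  {
    String describe();

    @Override
    void close();
  }

  // Hypothetical stand-in for ReferenceCountedObjectProvider<Segment>.
  public interface SegmentProvider
  {
    Optional<Segment> acquireReference();
  }

  @FunctionalInterface
  public interface SegmentMapFunction
  {
    Optional<Segment> apply(SegmentProvider provider);

    // Acquires the segment and applies no transformation.
    SegmentMapFunction IDENTITY = SegmentProvider::acquireReference;

    // Applies this function, then mapFn to the resulting segment, if present.
    default SegmentMapFunction andThen(Function<Segment, Segment> mapFn)
    {
      return provider -> apply(provider).map(mapFn);
    }
  }

  // Hypothetical leaf segment with nothing to release.
  public static Segment leaf(String name)
  {
    return new Segment()
    {
      @Override
      public String describe()
      {
        return name;
      }

      @Override
      public void close()
      {
      }
    };
  }

  // Hypothetical wrapper standing in for FilteredSegment / RestrictedSegment;
  // closing the wrapper closes the segment it wraps.
  public static Segment wrap(String label, Segment base)
  {
    return new Segment()
    {
      @Override
      public String describe()
      {
        return label + "(" + base.describe() + ")";
      }

      @Override
      public void close()
      {
        base.close();
      }
    };
  }
}
```

With this shape, the `RestrictedDataSource` change from the first comment could read `base.createSegmentMapFunction(query).andThen(segment -> new RestrictedSegment(segment, policy))`, and the static `map(mapFn)` behaves the same as `IDENTITY.andThen(mapFn)`.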



##########
processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java:
##########
@@ -393,7 +393,7 @@ private boolean isDirectAccess(final ColumnInspector inspector)
   {
     if (parsedExpression.get().isIdentifier()) {
       final ColumnCapabilities baseCapabilities =
-          inspector.getColumnCapabilities(parsedExpression.get().getBindingIfIdentifier());
+          inspector.getColumnCapabilities(parsedExpression.get().getIdentifierIfIdentifier());

Review Comment:
   Was this related to this patch somehow?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -130,8 +130,10 @@ protected abstract ReturnOrAwait<Unit> runWithInputChannel(
    * Helper intended to be used by subclasses. Applies {@link #segmentMapFn}, which applies broadcast joins
    * if applicable to this query.
    */
-  protected SegmentReference mapSegment(final Segment segment)
+  protected Optional<Segment> mapSegment(final Segment segment)
   {
-    return segmentMapFn.apply(ReferenceCountingSegment.wrapRootGenerationSegment(segment));
+    // we use wrapUnmanaged here because segment reference tracking and lifecycle management happens elsewhere, so all
+    // we need to be able to do here is apply a segment map function since we don't care about the provider
+    return segmentMapFn.apply(ReferenceCountedSegmentProvider.wrapUnmanaged(segment));

Review Comment:
   Is this ok in situations where the `segmentMapFn` acquires its own 
references to things, such as joinables? (There might have been a problem with 
this even prior to this patch.)



##########
server/src/main/java/org/apache/druid/server/coordination/ServerManager.java:
##########
@@ -258,21 +259,23 @@ protected <T> QueryRunner<T> buildQueryRunnerForSegment(
       return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
     }
 
-    final ReferenceCountingSegment segment = chunk.getObject();
-    return buildAndDecorateQueryRunner(
+    final ReferenceCountedSegmentProvider referenceCounter = chunk.getObject();
+
+    final Optional<Segment> maybeSegment = segmentMapFn.apply(referenceCounter);

Review Comment:
   I think with this change, we're now acquiring references prior to creation 
of the `QueryRunner` for a segment; where previously they were acquired on the 
call to `QueryRunner#run`. It means we could leak references in two new cases:
   
   - if an exception occurs in `buildAndDecorateQueryRunner`
   - if the query runner is never actually run
   
   The first thing can be fixed by wrapping the call to 
`buildAndDecorateQueryRunner` in a `catch (Throwable e)` and closing the `seg` 
if it fails.
   
   The second is trickier. I think in some cases it does make sense for query 
runners to not ever be run. For example, an unordered `scan` query with `limit` 
could exit early before processing all segments.
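For the second case, a minimal sketch of acquiring on `run` (as the pre-patch code did, per the comment above), so a runner that is built but never run holds no reference. `DeferredAcquire`, `Ref`, `Runner`, and `deferred` are hypothetical, not Druid APIs. The sketch assumes eager results and releases in `finally` via try-with-resources; with Druid's lazy `Sequence`, the release on success would instead be attached as baggage, as the quoted diff does with `Sequences.withBaggage`.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

public class DeferredAcquire
{
  // Hypothetical stand-in for a reference held on a segment.
  public interface Ref extends AutoCloseable
  {
    @Override
    void close();
  }

  // Hypothetical stand-in for a query runner.
  public interface Runner<T>
  {
    T run();
  }

  // Builds a runner that acquires its reference only when run() is called.
  // If nothing can be acquired, onMissing supplies the result (compare
  // ReportTimelineMissingSegmentQueryRunner in the original code).
  public static <T> Runner<T> deferred(
      Supplier<Optional<Ref>> acquire,
      Function<Ref, T> body,
      Supplier<T> onMissing
  )
  {
    return () -> {
      final Optional<Ref> maybeRef = acquire.get();
      if (!maybeRef.isPresent()) {
        return onMissing.get();
      }
      // try-with-resources releases the reference whether body succeeds or throws.
      try (Ref ref = maybeRef.get()) {
        return body.apply(ref);
      }
    };
  }
}
```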



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
