gianm commented on code in PR #18005:
URL: https://github.com/apache/druid/pull/18005#discussion_r2103323794
##########
processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java:
##########
@@ -123,10 +122,9 @@ public boolean isProcessable()
}
@Override
- public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
+ public SegmentMapFunction createSegmentMapFunction(Query query)
{
- final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(query);
- return baseSegment -> new RestrictedSegment(segmentMapFn.apply(baseSegment), policy);
+ return SegmentMapFunction.map(segment -> new RestrictedSegment(segment, policy));
Review Comment:
I understand it's pointless to start from
`base.createSegmentMapFunction(query)` because we "know" it will be identity,
but IMO it's better form to do it anyway. That way all the datasources work the
same way, and there's a degree of future-proofing in case restrictions can later
apply to things other than tables.
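A minimal sketch of the suggested shape, assuming simplified stand-ins: `Segment`, `SegmentProvider`, `RestrictedSegment`, and `RestrictionExample` are hypothetical and not the real Druid classes, and `wrap` is written to match the helper's signature in this PR. Because the restriction is layered on top of the base datasource's function, any mapping the base performs is kept; starting from the provider directly, as `SegmentMapFunction.map` does, would drop it.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for Druid's Segment and ReferenceCountedObjectProvider<Segment>.
interface Segment
{
  String describe();
}

interface SegmentProvider
{
  Optional<Segment> acquireReference();
}

@FunctionalInterface
interface SegmentMapFunction extends Function<SegmentProvider, Optional<Segment>>
{
  // Acquires the leaf segment and applies no further mapping.
  SegmentMapFunction IDENTITY = SegmentProvider::acquireReference;

  // Same signature as the PR's helper: run segmentMapFn, then mapFn on the segment, if present.
  static SegmentMapFunction wrap(SegmentMapFunction segmentMapFn, Function<Segment, Segment> mapFn)
  {
    return provider -> segmentMapFn.apply(provider).map(mapFn);
  }
}

// Hypothetical stand-in for RestrictedSegment: labels the delegate with a policy name.
class RestrictedSegment implements Segment
{
  private final Segment delegate;
  private final String policy;

  RestrictedSegment(Segment delegate, String policy)
  {
    this.delegate = delegate;
    this.policy = policy;
  }

  @Override
  public String describe()
  {
    return "restricted[" + policy + "](" + delegate.describe() + ")";
  }
}

class RestrictionExample
{
  // Builds on whatever the base datasource's function does, instead of assuming it is IDENTITY.
  static SegmentMapFunction restrictOnTopOf(SegmentMapFunction baseMapFn, String policy)
  {
    return SegmentMapFunction.wrap(baseMapFn, segment -> new RestrictedSegment(segment, policy));
  }
}
```

For example, if the base function already wraps the segment in an `inner` layer, `restrictOnTopOf(base, "outer")` describes the result as `restricted[outer](restricted[inner](table))`, so both layers survive.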
##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Functional interface that captures the process of acquiring a {@link Segment} from a
+ * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
+ * before it is available to query processing engines.
+ * <p>
+ * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
+ * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ */
+@FunctionalInterface
+public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+{
+ /**
+ * {@link SegmentMapFunction} that acuires a {@link Segment} from the {@link ReferenceCountedObjectProvider<Segment>}
Review Comment:
`acuires` should be `acquires` (spelling).
##########
processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java:
##########
@@ -254,8 +254,11 @@ public void close()
}
@Override
- public Optional<Closeable> acquireReferences()
+ public Optional<Closeable> acquireReference()
{
- return Optional.empty();
+ // this is wrong, we should have a reference to the reference counted segment instead of the direct segment so that
Review Comment:
Was it wrong prior to this patch? What issues will it cause?
##########
processing/src/main/java/org/apache/druid/query/ReferenceCountingSegmentQueryRunner.java:
##########
@@ -22,42 +22,26 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.Segment;
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunnerFactory<T, Query<T>> factory;
- private final SegmentReference segment;
- private final SegmentDescriptor descriptor;
+ private final Segment segment;
public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
- SegmentReference segment,
- SegmentDescriptor descriptor
+ Segment segment
)
{
this.factory = factory;
this.segment = segment;
- this.descriptor = descriptor;
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
- return segment.acquireReferences().map(closeable -> {
- try {
- final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);
- return Sequences.withBaggage(baseSequence, closeable);
- }
- catch (Throwable t) {
- try {
- closeable.close();
- }
- catch (Exception e) {
- t.addSuppressed(e);
- }
- throw t;
- }
- }).orElseGet(() -> new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext));
+ final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);
+ return Sequences.withBaggage(baseSequence, segment);
Review Comment:
I think this leaks a reference if the call to
`factory.createRunner(segment).run(queryPlus, responseContext)` throws an
exception. The prior code handled that case with a try/catch that closed the reference before rethrowing.
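A minimal sketch of keeping the old try/catch around the simplified body. `ResultWithBaggage` is a hypothetical stand-in for `Sequences.withBaggage`, and the `Supplier` stands in for `factory.createRunner(segment).run(queryPlus, responseContext)`; the close-and-rethrow logic mirrors the prior code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for Sequences.withBaggage: a result that owns a resource to release later.
final class ResultWithBaggage<T> implements Closeable
{
  private final T value;
  private final Closeable baggage;

  ResultWithBaggage(T value, Closeable baggage)
  {
    this.value = value;
    this.baggage = baggage;
  }

  T get()
  {
    return value;
  }

  // Releases the baggage (e.g. the segment reference) once the caller is done with the result.
  @Override
  public void close()
  {
    try {
      baggage.close();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

final class LeakSafeRunner
{
  // On success the segment travels with the result as baggage. If creating or starting the
  // run throws, the segment is closed right away so its reference is not leaked.
  static <T> ResultWithBaggage<T> runWithBaggage(Closeable segment, Supplier<T> createAndRun)
  {
    try {
      return new ResultWithBaggage<>(createAndRun.get(), segment);
    }
    catch (Throwable t) {
      try {
        segment.close();
      }
      catch (Exception e) {
        t.addSuppressed(e);
      }
      throw t;
    }
  }
}
```

On success the segment stays open until the result is closed; if the supplier throws, the segment is closed immediately and any failure from `close()` is attached as a suppressed exception.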
##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -0,0 +1,58 @@
+package org.apache.druid.segment;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Functional interface that captures the process of acquiring a {@link Segment} from a
+ * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
+ * before it is available to query processing engines.
+ * <p>
+ * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
+ * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ */
+@FunctionalInterface
+public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+{
+ /**
+ * {@link SegmentMapFunction} that acuires a {@link Segment} from the {@link ReferenceCountedObjectProvider<Segment>}
+ */
+ SegmentMapFunction IDENTITY = ReferenceCountedObjectProvider::acquireReference;
+
+ /**
+ * Returns a {@link SegmentMapFunction} that calls {@link ReferenceCountedObjectProvider#acquireReference()} and then
+ * applies the function to the resulting {@link Segment}, if present
+ */
+ static SegmentMapFunction map(Function<Segment, Segment> mapFn)
Review Comment:
I think we can remove this method. It's only used by the "restricted" stuff
currently, but IMO those should be adjusted to use `andThen` anyway.
##########
processing/src/main/java/org/apache/druid/segment/SegmentMapFunction.java:
##########
@@ -0,0 +1,58 @@
+package org.apache.druid.segment;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Functional interface that captures the process of acquiring a {@link Segment} from a
+ * {@link ReferenceCountedObjectProvider<Segment>} and performing any transformations done on top of this leaf {@link Segment}
+ * before it is available to query processing engines.
+ * <p>
+ * The {@link Segment} returned by this method, if present, must always be closed to return the reference to the base
+ * {@link ReferenceCountedObjectProvider<Segment>} that it came from.
+ */
+@FunctionalInterface
+public interface SegmentMapFunction extends Function<ReferenceCountedObjectProvider<Segment>, Optional<Segment>>
+{
+ /**
+ * {@link SegmentMapFunction} that acuires a {@link Segment} from the {@link ReferenceCountedObjectProvider<Segment>}
+ */
+ SegmentMapFunction IDENTITY = ReferenceCountedObjectProvider::acquireReference;
+
+ /**
+ * Returns a {@link SegmentMapFunction} that calls {@link ReferenceCountedObjectProvider#acquireReference()} and then
+ * applies the function to the resulting {@link Segment}, if present
+ */
+ static SegmentMapFunction map(Function<Segment, Segment> mapFn)
+ {
+ return segmentReference -> segmentReference.acquireReference().map(mapFn);
+ }
+
+ /**
+ * Returns a {@link SegmentMapFunction} which first applies the supplied {@link SegmentMapFunction} and then applies
+ * the function to the resulting {@link Segment}, if present
+ */
+ static SegmentMapFunction wrap(SegmentMapFunction segmentMapFn, Function<Segment, Segment> mapFn)
Review Comment:
Move this into a non-static method, `SegmentMapFunction andThen(mapFn)`, so it's
more fluent? Then this pattern:
```
final SegmentMapFunction mapFn = base.createSegmentMapFunction(query);
return SegmentMapFunction.wrap(mapFn, segment -> new FilteredSegment(segment, filter));
```
Would instead be:
```
return base.createSegmentMapFunction(query)
.andThen(segment -> new FilteredSegment(segment, filter));
```
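A sketch of the fluent version, under the same caveat that `Segment`, `SegmentProvider`, and `FilteredSegment` are simplified hypothetical stand-ins. One wrinkle: because `SegmentMapFunction` extends `java.util.function.Function`, a method literally named `andThen(Function<Segment, Segment>)` would likely be rejected by javac as a name clash with the inherited generic `Function#andThen` (same erasure, but neither overrides the other), so this sketch uses the hypothetical name `thenMap`.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for Druid's Segment and ReferenceCountedObjectProvider<Segment>.
interface Segment
{
  String describe();
}

interface SegmentProvider
{
  Optional<Segment> acquireReference();
}

@FunctionalInterface
interface SegmentMapFunction extends Function<SegmentProvider, Optional<Segment>>
{
  SegmentMapFunction IDENTITY = SegmentProvider::acquireReference;

  // Fluent, non-static replacement for wrap(segmentMapFn, mapFn): apply this function first,
  // then mapFn to the acquired segment, if present. Named thenMap (hypothetical) because
  // andThen(Function<Segment, Segment>) would clash with the inherited Function#andThen.
  default SegmentMapFunction thenMap(Function<Segment, Segment> mapFn)
  {
    return provider -> apply(provider).map(mapFn);
  }
}

// Hypothetical stand-in for FilteredSegment: records the filter applied on top of a delegate.
class FilteredSegment implements Segment
{
  private final Segment delegate;
  private final String filter;

  FilteredSegment(Segment delegate, String filter)
  {
    this.delegate = delegate;
    this.filter = filter;
  }

  @Override
  public String describe()
  {
    return "filtered[" + filter + "](" + delegate.describe() + ")";
  }
}
```

With a method like this, the pattern above becomes `base.createSegmentMapFunction(query).thenMap(segment -> new FilteredSegment(segment, filter))`, and calls chain left to right in the order the layers are applied.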
##########
processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java:
##########
@@ -393,7 +393,7 @@ private boolean isDirectAccess(final ColumnInspector inspector)
{
if (parsedExpression.get().isIdentifier()) {
final ColumnCapabilities baseCapabilities =
- inspector.getColumnCapabilities(parsedExpression.get().getBindingIfIdentifier());
+ inspector.getColumnCapabilities(parsedExpression.get().getIdentifierIfIdentifier());
Review Comment:
Was this related to this patch somehow?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -130,8 +130,10 @@ protected abstract ReturnOrAwait<Unit> runWithInputChannel(
* Helper intended to be used by subclasses. Applies {@link #segmentMapFn}, which applies broadcast joins
* if applicable to this query.
*/
- protected SegmentReference mapSegment(final Segment segment)
+ protected Optional<Segment> mapSegment(final Segment segment)
{
- return segmentMapFn.apply(ReferenceCountingSegment.wrapRootGenerationSegment(segment));
+ // we use wrapUnmanaged here because segment reference tracking and lifecycle management happens elsewhere, so all
+ // we need to be able to do here is apply a segment map function since we don't care about the provider
+ return segmentMapFn.apply(ReferenceCountedSegmentProvider.wrapUnmanaged(segment));
Review Comment:
Is this ok in situations where the `segmentMapFn` acquires its own
references to things, such as joinables? (There might have been a problem with
this even prior to this patch.)
##########
server/src/main/java/org/apache/druid/server/coordination/ServerManager.java:
##########
@@ -258,21 +259,23 @@ protected <T> QueryRunner<T> buildQueryRunnerForSegment(
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
- final ReferenceCountingSegment segment = chunk.getObject();
- return buildAndDecorateQueryRunner(
+ final ReferenceCountedSegmentProvider referenceCounter = chunk.getObject();
+
+ final Optional<Segment> maybeSegment = segmentMapFn.apply(referenceCounter);
Review Comment:
I think with this change, we're now acquiring references prior to creation
of the `QueryRunner` for a segment, whereas previously they were acquired on the
call to `QueryRunner#run`. It means we could leak references in two new cases:
- if an exception occurs in `buildAndDecorateQueryRunner`
- if the query runner is never actually run
The first case can be fixed by wrapping the call to
`buildAndDecorateQueryRunner` in a `catch (Throwable e)` and closing the `seg`
if it fails.
The second is trickier. I think in some cases it does make sense for query
runners to never be run. For example, an unordered `scan` query with `limit`
could exit early before processing all segments.
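One possible way to avoid both leaks, sketched under assumptions: keep the provider and the map function in the runner and apply the map function only inside `run()`, which is roughly how references were acquired before this change. `CountingProvider`, `LazySegmentRunner`, and the eager `processSegment` function are hypothetical stand-ins, not Druid APIs; a real runner returns a lazy `Sequence` and would attach the segment as baggage rather than releasing it in `finally`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for a Segment: closing it returns the reference to its provider.
interface Segment extends Closeable
{
}

// Hypothetical stand-in for ReferenceCountedSegmentProvider that just counts outstanding references.
final class CountingProvider
{
  private int references = 0;

  Optional<Segment> acquireReference()
  {
    references++;
    final Segment segment = () -> references--;
    return Optional.of(segment);
  }

  int references()
  {
    return references;
  }
}

// Holds the provider and map function and only acquires the segment inside run(), so a runner
// that is built but never run holds no reference.
final class LazySegmentRunner<T>
{
  private final Function<CountingProvider, Optional<Segment>> segmentMapFn;
  private final CountingProvider provider;
  private final Function<Segment, T> processSegment;

  LazySegmentRunner(
      Function<CountingProvider, Optional<Segment>> segmentMapFn,
      CountingProvider provider,
      Function<Segment, T> processSegment
  )
  {
    this.segmentMapFn = segmentMapFn;
    this.provider = provider;
    this.processSegment = processSegment;
  }

  // An empty result stands in for ReportTimelineMissingSegmentQueryRunner. The reference is
  // released in finally because this sketch processes eagerly.
  Optional<T> run()
  {
    final Optional<Segment> maybeSegment = segmentMapFn.apply(provider);
    if (!maybeSegment.isPresent()) {
      return Optional.empty();
    }
    final Segment segment = maybeSegment.get();
    try {
      return Optional.of(processSegment.apply(segment));
    }
    finally {
      release(segment);
    }
  }

  private static void release(Segment segment)
  {
    try {
      segment.close();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this shape, a runner that is built but never run (for example, one skipped by an early-exiting `limit` scan) never touches the provider, and a failure while building the runner cannot leak a reference because none has been taken yet.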
--
This is an automated message from the Apache Git Service.