This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bbc62dcea09 add query metrics for vsf mode (#18727)
bbc62dcea09 is described below
commit bbc62dcea09931e686a5017c1602871d6fa9ef86
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Nov 12 21:31:54 2025 -0800
add query metrics for vsf mode (#18727)
changes:
* added `canLoadSegmentsOnDemand()` method to `SegmentManager` and
`SegmentCacheManager`
* added 'query/load/batch/time' metric for 'wall time' spent waiting on all
segments to load for a query
* added 'query/load/time/avg' metric for average per segment load time for
a query
* added 'query/load/time/max' metric for max per segment load time for a
query
* added 'query/load/wait/avg' metric for average per segment wait time to
begin loading for a query
* added 'query/load/wait/max' metric for max per segment wait time to begin
loading for a query
* added 'query/load/count' metric for number of segments loaded on demand
for a query
* added 'query/load/bytes/total' metric for amount of data loaded on demand
for a query
---
.../embedded/query/QueryVirtualStorageTest.java | 72 ++++++++++++
.../druid/query/DataSegmentAndDescriptor.java | 5 +
.../apache/druid/query/DefaultQueryMetrics.java | 58 +++++++++-
.../java/org/apache/druid/query/QueryMetrics.java | 46 ++++++++
.../query/search/DefaultSearchQueryMetrics.java | 42 +++++++
.../segment/loading/AcquireSegmentAction.java | 9 +-
.../segment/loading/AcquireSegmentResult.java | 94 +++++++++++++++
.../druid/segment/loading/SegmentCacheManager.java | 7 +-
.../segment/loading/SegmentLocalCacheManager.java | 33 ++++--
.../org/apache/druid/server/SegmentManager.java | 5 +
.../org/apache/druid/server/ServerManager.java | 127 +++++++++++++++++----
.../segment/loading/NoopSegmentCacheManager.java | 12 ++
.../SegmentLocalCacheManagerConcurrencyTest.java | 7 +-
.../loading/SegmentLocalCacheManagerTest.java | 31 +++--
.../apache/druid/server/SegmentManagerTest.java | 9 +-
.../druid/test/utils/TestSegmentCacheManager.java | 3 +-
16 files changed, 492 insertions(+), 68 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index 8ea58968f81..2c53b80ef4d 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -24,8 +24,11 @@ import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -43,10 +46,12 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.io.ByteStreams;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -140,6 +145,7 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
// "2015-09-12T08:00:00Z/2025-09-12T14:00:00Z"
// "2015-09-12T14:00:00Z/2025-09-12T19:00:00Z"
// "2015-09-12T19:00:00Z/2025-09-13T00:00:00Z"
+
final String[] queries = new String[]{
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12
00:00:00' and __time < TIMESTAMP '2015-09-12 08:00:00'",
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12
08:00:00' and __time < TIMESTAMP '2015-09-12 14:00:00'",
@@ -154,13 +160,79 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
};
Assertions.assertEquals(expectedResults[0],
Long.parseLong(cluster.runSql(queries[0], dataSource)));
+ assertMetrics(1, 8L);
Assertions.assertEquals(expectedResults[1],
Long.parseLong(cluster.runSql(queries[1], dataSource)));
+ assertMetrics(2, 6L);
Assertions.assertEquals(expectedResults[2],
Long.parseLong(cluster.runSql(queries[2], dataSource)));
+ assertMetrics(3, 5L);
Assertions.assertEquals(expectedResults[3],
Long.parseLong(cluster.runSql(queries[3], dataSource)));
+ assertMetrics(4, 5L);
for (int i = 0; i < 1000; i++) {
int nextQuery = ThreadLocalRandom.current().nextInt(queries.length);
Assertions.assertEquals(expectedResults[nextQuery],
Long.parseLong(cluster.runSql(queries[nextQuery], dataSource)));
+ assertMetrics(i + 5, null);
+ }
+ }
+
+ private void assertMetrics(int expectedEventCount, @Nullable Long
expectedLoadCount)
+ {
+ LatchableEmitter emitter = historical.latchableEmitter();
+ final int lastIndex = expectedEventCount - 1;
+
+ List<ServiceMetricEvent> countEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT);
+ Assertions.assertEquals(expectedEventCount, countEvents.size());
+ if (expectedLoadCount != null) {
+ Assertions.assertEquals(expectedLoadCount,
countEvents.get(lastIndex).getValue());
+ }
+ boolean hasLoads = countEvents.get(lastIndex).getValue().longValue() > 0;
+
+ List<ServiceMetricEvent> timeEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME);
+ Assertions.assertEquals(expectedEventCount, timeEvents.size());
+ if (hasLoads) {
+ Assertions.assertTrue(timeEvents.get(lastIndex).getValue().longValue() >
0);
+ } else {
+ Assertions.assertEquals(0,
timeEvents.get(lastIndex).getValue().longValue());
+ }
+
+ List<ServiceMetricEvent> timeMaxEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX);
+ Assertions.assertEquals(expectedEventCount, timeMaxEvents.size());
+ if (hasLoads) {
+
Assertions.assertTrue(timeMaxEvents.get(lastIndex).getValue().longValue() > 0);
+ } else {
+ Assertions.assertEquals(0,
timeMaxEvents.get(lastIndex).getValue().longValue());
+ }
+
+ List<ServiceMetricEvent> timeAvgEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG);
+ Assertions.assertEquals(expectedEventCount, timeAvgEvents.size());
+ if (hasLoads) {
+
Assertions.assertTrue(timeAvgEvents.get(lastIndex).getValue().longValue() > 0);
+ } else {
+ Assertions.assertEquals(0,
timeAvgEvents.get(lastIndex).getValue().longValue());
+ }
+
+ List<ServiceMetricEvent> waitMaxEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX);
+ Assertions.assertEquals(expectedEventCount, waitMaxEvents.size());
+ if (hasLoads) {
+
Assertions.assertTrue(waitMaxEvents.get(lastIndex).getValue().longValue() >= 0);
+ } else {
+ Assertions.assertEquals(0,
waitMaxEvents.get(lastIndex).getValue().longValue());
+ }
+
+ List<ServiceMetricEvent> waitAvgEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG);
+ Assertions.assertEquals(expectedEventCount, waitAvgEvents.size());
+ if (hasLoads) {
+
Assertions.assertTrue(waitAvgEvents.get(lastIndex).getValue().longValue() >= 0);
+ } else {
+ Assertions.assertEquals(0,
waitAvgEvents.get(lastIndex).getValue().longValue());
+ }
+
+ List<ServiceMetricEvent> loadSizeEvents =
emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES);
+ Assertions.assertEquals(expectedEventCount, loadSizeEvents.size());
+ if (hasLoads) {
+
Assertions.assertTrue(loadSizeEvents.get(lastIndex).getValue().longValue() > 0);
+ } else {
+ Assertions.assertEquals(0,
loadSizeEvents.get(lastIndex).getValue().longValue());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/DataSegmentAndDescriptor.java
b/processing/src/main/java/org/apache/druid/query/DataSegmentAndDescriptor.java
index 30e346d3301..a1061df6d72 100644
---
a/processing/src/main/java/org/apache/druid/query/DataSegmentAndDescriptor.java
+++
b/processing/src/main/java/org/apache/druid/query/DataSegmentAndDescriptor.java
@@ -25,6 +25,11 @@ import javax.annotation.Nullable;
public class DataSegmentAndDescriptor
{
+ public static DataSegmentAndDescriptor missing(SegmentDescriptor descriptor)
+ {
+ return new DataSegmentAndDescriptor(null, descriptor);
+ }
+
@Nullable
private final DataSegment dataSegment;
private final SegmentDescriptor descriptor;
diff --git
a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index 91cdf387815..f3a4539f780 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -42,10 +42,20 @@ import java.util.stream.Collectors;
*/
public class DefaultQueryMetrics<QueryType extends Query<?>> implements
QueryMetrics<QueryType>
{
+ public static final String QUERY_TIME = "query/time";
+ public static final String QUERY_BYTES = "query/bytes";
+ public static final String QUERY_CPU_TIME = "query/cpu/time";
public static final String QUERY_WAIT_TIME = "query/wait/time";
public static final String QUERY_SEGMENT_TIME = "query/segment/time";
public static final String QUERY_SEGMENT_AND_CACHE_TIME =
"query/segmentAndCache/time";
public static final String QUERY_RESULT_CACHE_HIT = "query/resultCache/hit";
+ public static final String QUERY_ON_DEMAND_LOAD_BATCH_TIME =
"query/load/batch/time";
+ public static final String QUERY_ON_DEMAND_LOAD_TIME_AVG =
"query/load/time/avg";
+ public static final String QUERY_ON_DEMAND_LOAD_TIME_MAX =
"query/load/time/max";
+ public static final String QUERY_ON_DEMAND_WAIT_TIME_AVG =
"query/load/wait/avg";
+ public static final String QUERY_ON_DEMAND_WAIT_TIME_MAX =
"query/load/wait/max";
+ public static final String QUERY_ON_DEMAND_LOAD_COUNT = "query/load/count";
+ public static final String QUERY_ON_DEMAND_LOAD_BYTES =
"query/load/bytes/total";
protected final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();
@@ -240,13 +250,13 @@ public class DefaultQueryMetrics<QueryType extends
Query<?>> implements QueryMet
@Override
public QueryMetrics<QueryType> reportQueryTime(long timeNs)
{
- return reportMillisTimeMetric("query/time", timeNs);
+ return reportMillisTimeMetric(QUERY_TIME, timeNs);
}
@Override
public QueryMetrics<QueryType> reportQueryBytes(long byteCount)
{
- return reportMetric("query/bytes", byteCount);
+ return reportMetric(QUERY_BYTES, byteCount);
}
@Override
@@ -267,6 +277,48 @@ public class DefaultQueryMetrics<QueryType extends
Query<?>> implements QueryMet
return reportMillisTimeMetric(QUERY_SEGMENT_AND_CACHE_TIME, timeNs);
}
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadTime(long timeNs)
+ {
+ return reportMillisTimeMetric(QUERY_ON_DEMAND_LOAD_BATCH_TIME, timeNs);
+ }
+
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeAvg(long timeNs)
+ {
+ return reportMillisTimeMetric(QUERY_ON_DEMAND_LOAD_TIME_AVG, timeNs);
+ }
+
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeMax(long
timeNs)
+ {
+ return reportMillisTimeMetric(QUERY_ON_DEMAND_WAIT_TIME_MAX, timeNs);
+ }
+
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeAvg(long
timeNs)
+ {
+ return reportMillisTimeMetric(QUERY_ON_DEMAND_WAIT_TIME_AVG, timeNs);
+ }
+
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeMax(long timeNs)
+ {
+ return reportMillisTimeMetric(QUERY_ON_DEMAND_LOAD_TIME_MAX, timeNs);
+ }
+
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadBytes(long byteCount)
+ {
+ return reportMetric(QUERY_ON_DEMAND_LOAD_BYTES, byteCount);
+ }
+
+ @Override
+ public QueryMetrics<QueryType> reportSegmentOnDemandLoadCount(long count)
+ {
+ return reportMetric(QUERY_ON_DEMAND_LOAD_COUNT, count);
+ }
+
@Override
public QueryMetrics<QueryType> reportResultCachePoll(boolean hit)
{
@@ -276,7 +328,7 @@ public class DefaultQueryMetrics<QueryType extends
Query<?>> implements QueryMet
@Override
public QueryMetrics<QueryType> reportCpuTime(long timeNs)
{
- return reportMetric("query/cpu/time",
TimeUnit.NANOSECONDS.toMicros(timeNs));
+ return reportMetric(QUERY_CPU_TIME, TimeUnit.NANOSECONDS.toMicros(timeNs));
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index 8f0dff4da98..ebe99476062 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -361,6 +361,52 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/
QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs);
+
+ /**
+ * Registers the apparent time spent loading segments on demand, before
queuing up for processing. This measurement is
+ * wall-clock time to when the last segment is finished loading and ready
for processing.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadTime(long timeNs);
+ /**
+ * Registers the average time spent loading segments on demand across
threads.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeAvg(long timeNs);
+ /**
+ * Registers the maximum time spent waiting for a thread to start loading
segments on demand across all load threads.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeMax(long timeNs);
+ /**
+ * Registers the average time spent waiting for a thread to start loading
segments on demand across threads.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadWaitTimeAvg(long timeNs);
+ /**
+ * Registers the maximum time spent loading segments on demand across all
load threads.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadTimeMax(long timeNs);
+ /**
+ * Registers the total number of bytes added to the cache when loading
segments on demand, summing the sizes loaded by
+ * individual segment load threads.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadBytes(long byteCount);
+ /**
+ * Registers the total number of segments loaded on demand.
+ * <p>
+ * Emitted once per query
+ */
+ QueryMetrics<QueryType> reportSegmentOnDemandLoadCount(long count);
+
/**
* Emits iff a given query polled the result-level cache and the success of
that operation.
*/
diff --git
a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 0548cc0aa45..c498a67bf4d 100644
---
a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++
b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -223,6 +223,48 @@ public class DefaultSearchQueryMetrics implements
SearchQueryMetrics
return delegateQueryMetrics.reportSegmentAndCacheTime(timeNs);
}
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadTime(long timeNs)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadTime(timeNs);
+ }
+
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadTimeAvg(long timeNs)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadTimeAvg(timeNs);
+ }
+
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadWaitTimeMax(long timeNs)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadWaitTimeMax(timeNs);
+ }
+
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadWaitTimeAvg(long timeNs)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadWaitTimeAvg(timeNs);
+ }
+
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadTimeMax(long timeNs)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadTimeMax(timeNs);
+ }
+
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadBytes(long byteCount)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadBytes(byteCount);
+ }
+
+ @Override
+ public QueryMetrics reportSegmentOnDemandLoadCount(long count)
+ {
+ return delegateQueryMetrics.reportSegmentOnDemandLoadCount(count);
+ }
+
@Override
public QueryMetrics reportResultCachePoll(boolean hit)
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java
b/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java
index 5f02c1437c3..4e489b23d59 100644
---
a/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java
+++
b/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentAction.java
@@ -28,7 +28,6 @@ import org.apache.druid.segment.Segment;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -53,16 +52,16 @@ public class AcquireSegmentAction implements Closeable
{
public static AcquireSegmentAction missingSegment()
{
- return new AcquireSegmentAction(() ->
Futures.immediateFuture(Optional::empty), null);
+ return new AcquireSegmentAction(() ->
Futures.immediateFuture(AcquireSegmentResult.empty()), null);
}
- private final
Supplier<ListenableFuture<ReferenceCountedObjectProvider<Segment>>>
segmentFutureSupplier;
+ private final Supplier<ListenableFuture<AcquireSegmentResult>>
segmentFutureSupplier;
@Nullable
private final Closeable loadCleanup;
private final AtomicBoolean closed = new AtomicBoolean(false);
public AcquireSegmentAction(
- Supplier<ListenableFuture<ReferenceCountedObjectProvider<Segment>>>
segmentFutureSupplier,
+ Supplier<ListenableFuture<AcquireSegmentResult>> segmentFutureSupplier,
@Nullable Closeable loadCleanup
)
{
@@ -76,7 +75,7 @@ public class AcquireSegmentAction implements Closeable
* either as an immediate future if the segment already exists in cache. The
'action' to fetch the segment and return
* the reference provider is not initiated until this method is called.
*/
- public ListenableFuture<ReferenceCountedObjectProvider<Segment>>
getSegmentFuture()
+ public ListenableFuture<AcquireSegmentResult> getSegmentFuture()
{
return segmentFutureSupplier.get();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentResult.java
b/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentResult.java
new file mode 100644
index 00000000000..36b0b178b47
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/loading/AcquireSegmentResult.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import org.apache.druid.segment.ReferenceCountedObjectProvider;
+import org.apache.druid.segment.Segment;
+
+import java.util.Optional;
+
+/**
+ * Wraps a {@link ReferenceCountedObjectProvider<Segment>} with additional
measurements about segment loading, if it
+ * was required
+ */
+public class AcquireSegmentResult
+{
+ private static final AcquireSegmentResult EMPTY = new
AcquireSegmentResult(Optional::empty, 0L, 0L, 0L);
+
+ public static AcquireSegmentResult empty()
+ {
+ return EMPTY;
+ }
+
+ public static AcquireSegmentResult
cached(ReferenceCountedObjectProvider<Segment> provider)
+ {
+ return new AcquireSegmentResult(provider, 0L, 0L, 0L);
+ }
+
+ private final ReferenceCountedObjectProvider<Segment> referenceProvider;
+ private final long loadSizeBytes;
+ private final long waitTimeNanos;
+ private final long loadTimeNanos;
+
+ public AcquireSegmentResult(
+ ReferenceCountedObjectProvider<Segment> referenceProvider,
+ long loadSizeBytes,
+ long waitTimeNanos,
+ long loadTimeNanos
+ )
+ {
+ this.referenceProvider = referenceProvider;
+ this.loadSizeBytes = loadSizeBytes;
+ this.waitTimeNanos = waitTimeNanos;
+ this.loadTimeNanos = loadTimeNanos;
+ }
+
+ /**
+ * Segment reference provider for loaded segment
+ */
+ public ReferenceCountedObjectProvider<Segment> getReferenceProvider()
+ {
+ return referenceProvider;
+ }
+
+ /**
+ * Amount of data loaded into the cache, or 0 if it was already available in
the cache
+ */
+ public long getLoadSizeBytes()
+ {
+ return loadSizeBytes;
+ }
+
+ /**
+ * Amount of time spent waiting before actually loading the segment (e.g. if
loads are done on a shared thread pool)
+ */
+ public long getWaitTimeNanos()
+ {
+ return waitTimeNanos;
+ }
+
+ /**
+ * Amount of time spent loading a segment, or 0 if the segment was already
available in the cache
+ */
+ public long getLoadTimeNanos()
+ {
+ return loadTimeNanos;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index 201e653ec28..f75b7ec6d8b 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -43,10 +43,9 @@ public interface SegmentCacheManager
*/
boolean canHandleSegments();
- default boolean canLoadSegmentOnDemand(DataSegment segment)
- {
- return false;
- }
+ boolean canLoadSegmentsOnDemand();
+
+ boolean canLoadSegmentOnDemand(DataSegment segment);
/**
* Return a list of cached segments from local disk, if any. This should be
called only when
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 8b29b5237a1..3cdf2f1bf64 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.ReferenceCountedObjectProvider;
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
@@ -184,6 +183,12 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
return isLocationsValid || isLocationsConfigValid;
}
+ @Override
+ public boolean canLoadSegmentsOnDemand()
+ {
+ return config.isVirtualStorage();
+ }
+
@Override
public boolean canLoadSegmentOnDemand(DataSegment dataSegment)
{
@@ -460,7 +465,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
if (hold != null) {
if (hold.getEntry().isMounted()) {
return new AcquireSegmentAction(
- () ->
Futures.immediateFuture(hold.getEntry().referenceProvider),
+ () ->
Futures.immediateFuture(AcquireSegmentResult.cached(hold.getEntry().referenceProvider)),
hold
);
} else {
@@ -673,18 +678,28 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
return infoDir;
}
- private Supplier<ListenableFuture<ReferenceCountedObjectProvider<Segment>>>
makeOnDemandLoadSupplier(
+ private Supplier<ListenableFuture<AcquireSegmentResult>>
makeOnDemandLoadSupplier(
final SegmentCacheEntry entry,
final StorageLocation location
)
{
return Suppliers.memoize(
- () -> virtualStorageLoadOnDemandExec.submit(
- () -> {
- entry.mount(location);
- return entry.referenceProvider;
- }
- )
+ () -> {
+ final long startTime = System.nanoTime();
+ return virtualStorageLoadOnDemandExec.submit(
+ () -> {
+ final long execStartTime = System.nanoTime();
+ final long waitTime = execStartTime - startTime;
+ entry.mount(location);
+ return new AcquireSegmentResult(
+ entry.referenceProvider,
+ entry.dataSegment.getSize(),
+ waitTime,
+ System.nanoTime() - startTime
+ );
+ }
+ );
+ }
);
}
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 9550bd2a821..cf8314088ec 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -426,6 +426,11 @@ public class SegmentManager
return cacheManager.canHandleSegments();
}
+ public boolean canLoadSegmentsOnDemand()
+ {
+ return cacheManager.canLoadSegmentsOnDemand();
+ }
+
public boolean canLoadSegmentOnDemand(DataSegment dataSegment)
{
return cacheManager.canLoadSegmentOnDemand(dataSegment);
diff --git a/server/src/main/java/org/apache/druid/server/ServerManager.java
b/server/src/main/java/org/apache/druid/server/ServerManager.java
index cea656bedae..b4a37dbeeed 100644
--- a/server/src/main/java/org/apache/druid/server/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/ServerManager.java
@@ -66,12 +66,12 @@ import org.apache.druid.query.planning.ExecutionVertex;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
-import org.apache.druid.segment.ReferenceCountedObjectProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
import org.apache.druid.segment.loading.VirtualPlaceholderSegment;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.DataSegment;
@@ -82,6 +82,7 @@ import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -221,15 +222,19 @@ public class ServerManager implements QuerySegmentWalker
if (chunk != null) {
segmentsToMap.add(new DataSegmentAndDescriptor(chunk.getObject(),
descriptor));
} else {
- segmentsToMap.add(new DataSegmentAndDescriptor(null, descriptor));
+ segmentsToMap.add(DataSegmentAndDescriptor.missing(descriptor));
}
}
return segmentManager.getSegmentsBundle(segmentsToMap, segmentMapFunction);
}
+ /**
+ * Combines {@link LeafSegmentsBundle#cachedSegments} with {@link
LeafSegmentsBundle#loadableSegments}, loading the
+ * latter set into the cache
+ */
protected ArrayList<SegmentReference> getOrLoadBundleSegments(
- Query<?> query,
+ QueryPlus<?> queryPlus,
LeafSegmentsBundle segmentsBundle,
SegmentMapFunction segmentMapFunction
)
@@ -243,7 +248,7 @@ public class ServerManager implements QuerySegmentWalker
// so we use placeholders instead. this is kind of gross, but otherwise
we're going to be loading weak assignments
// more or less as soon as they are assigned instead of on demand at query
time due to the broker issuing metadata
// queries to build the SQL schema
- if (query instanceof SegmentMetadataQuery) {
+ if (queryPlus.getQuery() instanceof SegmentMetadataQuery) {
for (DataSegmentAndDescriptor segment :
segmentsBundle.getLoadableSegments()) {
segmentReferences.add(
new SegmentReference(
@@ -256,13 +261,15 @@ public class ServerManager implements QuerySegmentWalker
} else {
// load the remaining segments
try {
- segmentReferences.addAll(
- getSegmentReferences(
- segmentsBundle.getLoadableSegments(),
- segmentMapFunction,
- query.context().getTimeout()
- )
+ final LoadSegmentsResult result = getOrLoadSegmentReferences(
+ segmentsBundle.getLoadableSegments(),
+ segmentMapFunction,
+ queryPlus.getQuery().context().getTimeout()
);
+ segmentReferences.addAll(result.getSegmentReferences());
+ if (segmentManager.canLoadSegmentsOnDemand()) {
+ result.reportMetrics(queryPlus.getQueryMetrics());
+ }
}
catch (Throwable t) {
throw CloseableUtils.closeAndWrapInCatch(t,
segmentsBundle::closeCachedReferences);
@@ -274,18 +281,19 @@ public class ServerManager implements QuerySegmentWalker
/**
* Given a list of {@link DataSegmentAndDescriptor}, uses {@link
SegmentManager#acquireSegment(DataSegment)} for each
- * to obtain a 'reference' to segments in the cache (or loaded from deep
storage if necessary/supported by the
+ * to obtain a 'reference' to segments in the cache (or load from deep
storage if necessary/supported by the
* storage layer).
* <p>
* For each of these segments, we then apply a {@link SegmentMapFunction} to
prepare for processing. The returned
* {@link SegmentReference} MUST BE CLOSED to release the reference.
*/
- protected ArrayList<SegmentReference> getSegmentReferences(
+ protected LoadSegmentsResult getOrLoadSegmentReferences(
List<DataSegmentAndDescriptor> segmentsToMap,
SegmentMapFunction segmentMapFunction,
long timeout
)
{
+ final long startLoadTime = System.nanoTime();
// closer to collect everything that needs cleaned up in the event of
failure, if we make it out of this function,
// closing the segment reference handles everything and it is the callers
responsibility
final Closer safetyNet = Closer.create();
@@ -319,7 +327,7 @@ public class ServerManager implements QuerySegmentWalker
Throwable failure = null;
// getting the future kicks off any background action, so materialize them
all to a list to get things started
- final List<ListenableFuture<ReferenceCountedObjectProvider<Segment>>>
futures = new ArrayList<>(actions.size());
+ final List<ListenableFuture<AcquireSegmentResult>> futures = new
ArrayList<>(actions.size());
for (AcquireSegmentAction acquireSegmentAction : actions) {
// if we haven't failed yet, keep collecting futures
if (failure == null) {
@@ -330,7 +338,7 @@ public class ServerManager implements QuerySegmentWalker
failure = t;
}
} else {
- futures.add(Futures.immediateFuture(Optional::empty));
+ futures.add(Futures.immediateFuture(AcquireSegmentResult.empty()));
}
}
@@ -346,21 +354,30 @@ public class ServerManager implements QuerySegmentWalker
}
final ArrayList<SegmentReference> segmentReferences = new
ArrayList<>(actions.size());
+ long totalSegmentsLoadTime = 0;
+ long totalSegmentsLoadWaitTime = 0;
+ long maxSegmentLoadTime = 0;
+ long maxSegmentWaitTime = 0;
+ long bytesLoaded = 0;
boolean timedOut = false;
boolean interrupted = false;
for (int i = 0; i < actions.size(); i++) {
try {
final DataSegmentAndDescriptor segmentAndDescriptor =
segmentsToMap.get(i);
final AcquireSegmentAction action = actions.get(i);
- final ListenableFuture<ReferenceCountedObjectProvider<Segment>> future
= futures.get(i);
- final ReferenceCountedObjectProvider<Segment> referenceProvider =
- future.get(timeoutAt - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- if (referenceProvider == null) {
+ final ListenableFuture<AcquireSegmentResult> future = futures.get(i);
+ final AcquireSegmentResult result = future.get(timeoutAt -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ if (result == null) {
segmentReferences.add(
new SegmentReference(segmentAndDescriptor.getDescriptor(),
Optional.empty(), action)
);
} else {
- final Optional<Segment> segment =
referenceProvider.acquireReference();
+ totalSegmentsLoadTime += result.getLoadTimeNanos();
+ totalSegmentsLoadWaitTime += result.getWaitTimeNanos();
+ maxSegmentLoadTime = Math.max(maxSegmentLoadTime,
result.getLoadTimeNanos());
+ maxSegmentWaitTime = Math.max(maxSegmentWaitTime,
result.getWaitTimeNanos());
+ bytesLoaded += result.getLoadSizeBytes();
+ final Optional<Segment> segment =
result.getReferenceProvider().acquireReference();
try {
final Optional<Segment> mappedSegment =
segmentMapFunction.apply(segment).map(safetyNet::register);
segmentReferences.add(
@@ -415,7 +432,18 @@ public class ServerManager implements QuerySegmentWalker
}
throw CloseableUtils.closeInCatch(toThrow, safetyNet);
}
- return segmentReferences;
+ final long loadTime = System.nanoTime() - startLoadTime;
+ final long count = actions.size();
+ return new LoadSegmentsResult(
+ segmentReferences,
+ loadTime,
+ maxSegmentLoadTime,
+ count == 0 ? 0 : totalSegmentsLoadTime / count,
+ maxSegmentWaitTime,
+ count == 0 ? 0 : totalSegmentsLoadWaitTime / count,
+ bytesLoaded,
+ count
+ );
}
protected <T> FunctionalIterable<QueryRunner<T>> getQueryRunnersForSegments(
@@ -588,7 +616,7 @@ public class ServerManager implements QuerySegmentWalker
{
queryPlus = queryPlus.withQuery(
ResourceIdPopulatingQueryRunner.populateResourceId(queryPlus.getQuery())
- );
+ ).withQueryMetrics(toolChest);
final Query<T> query = queryPlus.getQuery();
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
final SegmentMapFunction segmentMapFn =
JvmUtils.safeAccumulateThreadCpuTime(
@@ -606,7 +634,11 @@ public class ServerManager implements QuerySegmentWalker
responseContext.addMissingSegments(segmentsBundle.getMissingSegments());
- final List<SegmentReference> segmentReferences =
getOrLoadBundleSegments(query, segmentsBundle, segmentMapFn);
+ final List<SegmentReference> segmentReferences =
getOrLoadBundleSegments(
+ queryPlus,
+ segmentsBundle,
+ segmentMapFn
+ );
closer.registerAll(segmentReferences);
final FunctionalIterable<QueryRunner<T>> queryRunners =
getQueryRunnersForSegments(
@@ -641,4 +673,55 @@ public class ServerManager implements QuerySegmentWalker
return ServerManager.this.getSegmentsBundle(timeline, specs,
segmentMapFunction);
}
}
+
+ public static class LoadSegmentsResult // segment references plus the load statistics that reportMetrics forwards to QueryMetrics
+ {
+ private final ArrayList<SegmentReference> segmentReferences; // handed back as-is by getSegmentReferences()
+ private final long wallTimeNanos; // reported via reportSegmentOnDemandLoadTime
+ private final long maxTimeNanos; // reported via reportSegmentOnDemandLoadTimeMax
+ private final long avgTimeNanos; // reported via reportSegmentOnDemandLoadTimeAvg
+ private final long maxWaitNanos; // reported via reportSegmentOnDemandLoadWaitTimeMax
+ private final long avgWaitNanos; // reported via reportSegmentOnDemandLoadWaitTimeAvg
+ private final long totalBytes; // reported via reportSegmentOnDemandLoadBytes
+ private final long count; // reported via reportSegmentOnDemandLoadCount
+
+ public LoadSegmentsResult( // plain value holder: every argument is stored unchanged, no validation or copying
+ ArrayList<SegmentReference> segmentReferences,
+ long wallTimeNanos,
+ long maxTimeNanos,
+ long avgTimeNanos,
+ long maxWaitNanos,
+ long avgWaitNanos,
+ long totalBytes,
+ long count
+ )
+ {
+ this.segmentReferences = segmentReferences;
+ this.wallTimeNanos = wallTimeNanos;
+ this.maxTimeNanos = maxTimeNanos;
+ this.avgTimeNanos = avgTimeNanos;
+ this.maxWaitNanos = maxWaitNanos;
+ this.avgWaitNanos = avgWaitNanos;
+ this.totalBytes = totalBytes;
+ this.count = count;
+ }
+
+ public List<SegmentReference> getSegmentReferences()
+ {
+ return segmentReferences; // the list passed to the constructor, not a defensive copy
+ }
+
+ public void reportMetrics(@Nullable QueryMetrics<?> queryMetrics) // forwards each stored statistic to the matching reportSegmentOnDemand* call
+ {
+ if (queryMetrics != null) { // a null queryMetrics makes this method a no-op
+ queryMetrics.reportSegmentOnDemandLoadTime(wallTimeNanos);
+ queryMetrics.reportSegmentOnDemandLoadTimeMax(maxTimeNanos);
+ queryMetrics.reportSegmentOnDemandLoadTimeAvg(avgTimeNanos);
+ queryMetrics.reportSegmentOnDemandLoadWaitTimeMax(maxWaitNanos);
+ queryMetrics.reportSegmentOnDemandLoadWaitTimeAvg(avgWaitNanos);
+ queryMetrics.reportSegmentOnDemandLoadBytes(totalBytes);
+ queryMetrics.reportSegmentOnDemandLoadCount(count);
+ }
+ }
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
index 3e5af5c8cfb..9ee49251f11 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/NoopSegmentCacheManager.java
@@ -39,6 +39,18 @@ public class NoopSegmentCacheManager implements
SegmentCacheManager
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean canLoadSegmentsOnDemand()
+ {
+ return false; // this no-op test manager always reports that on-demand loading is unavailable
+ }
+
+ @Override
+ public boolean canLoadSegmentOnDemand(DataSegment segment)
+ {
+ return false; // always false; the segment argument is not inspected
+ }
+
@Override
public List<DataSegment> getCachedSegments()
{
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index e6da9b2e6cf..4af946982f3 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -35,7 +35,6 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.PhysicalSegmentInspector;
-import org.apache.druid.segment.ReferenceCountedObjectProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
@@ -856,12 +855,12 @@ class SegmentLocalCacheManagerConcurrencyTest
segmentManager.acquireSegment(segment)
);
try {
- final ReferenceCountedObjectProvider<Segment> referenceProvider =
+ final AcquireSegmentResult result =
action.getSegmentFuture().get(timeout, TimeUnit.MILLISECONDS);
- if (referenceProvider == null) {
+ if (result == null) {
Assertions.fail("this shouldn't happen");
}
- final Optional<Segment> segment =
referenceProvider.acquireReference().map(closer::register);
+ final Optional<Segment> segment =
result.getReferenceProvider().acquireReference().map(closer::register);
if (segment.isPresent()) {
PhysicalSegmentInspector gadget =
segment.get().as(PhysicalSegmentInspector.class);
if (delayMin >= 0 && delayMax > 0) {
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 290de1934f2..2e7146b5f47 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.ReferenceCountedObjectProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestHelper;
@@ -921,8 +920,8 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
Assert.assertNull(manager.getSegmentFiles(segmentToLoad));
Assert.assertFalse(manager.acquireCachedSegment(segmentToLoad).isPresent());
AcquireSegmentAction segmentAction = manager.acquireSegment(segmentToLoad);
- ReferenceCountedObjectProvider<Segment> referenceProvider =
segmentAction.getSegmentFuture().get();
- Optional<Segment> theSegment = referenceProvider.acquireReference();
+ AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
+ Optional<Segment> theSegment =
result.getReferenceProvider().acquireReference();
Assert.assertTrue(theSegment.isPresent());
Assert.assertNotNull(manager.getSegmentFiles(segmentToLoad));
Assert.assertEquals(segmentToLoad.getId(), theSegment.get().getId());
@@ -936,8 +935,8 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
// can actually load them again because load doesn't really do anything
AcquireSegmentAction segmentActionAfterDrop =
manager.acquireSegment(segmentToLoad);
- ReferenceCountedObjectProvider<Segment> referenceProviderAfterDrop =
segmentActionAfterDrop.getSegmentFuture().get();
- Optional<Segment> theSegmentAfterDrop =
referenceProviderAfterDrop.acquireReference();
+ AcquireSegmentResult resultAfterDrop =
segmentActionAfterDrop.getSegmentFuture().get();
+ Optional<Segment> theSegmentAfterDrop =
resultAfterDrop.getReferenceProvider().acquireReference();
Assert.assertTrue(theSegmentAfterDrop.isPresent());
Assert.assertNotNull(manager.getSegmentFiles(segmentToLoad));
Assert.assertEquals(segmentToLoad.getId(),
theSegmentAfterDrop.get().getId());
@@ -991,8 +990,8 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
Assert.assertNull(manager.getSegmentFiles(segmentToBootstrap));
Assert.assertFalse(manager.acquireCachedSegment(segmentToBootstrap).isPresent());
AcquireSegmentAction segmentAction =
manager.acquireSegment(segmentToBootstrap);
- ReferenceCountedObjectProvider<Segment> referenceProvider =
segmentAction.getSegmentFuture().get();
- Optional<Segment> theSegment = referenceProvider.acquireReference();
+ AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
+ Optional<Segment> theSegment =
result.getReferenceProvider().acquireReference();
Assert.assertTrue(theSegment.isPresent());
Assert.assertNotNull(manager.getSegmentFiles(segmentToBootstrap));
Assert.assertEquals(segmentToBootstrap.getId(), theSegment.get().getId());
@@ -1008,8 +1007,8 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
// can actually load them again because bootstrap doesn't really do
anything unless the segment is already
// present in the cache
AcquireSegmentAction segmentActionAfterDrop =
manager.acquireSegment(segmentToBootstrap);
- ReferenceCountedObjectProvider<Segment> referenceProviderDrop =
segmentActionAfterDrop.getSegmentFuture().get();
- Optional<Segment> theSegmentAfterDrop =
referenceProviderDrop.acquireReference();
+ AcquireSegmentResult resultAfterDrop =
segmentActionAfterDrop.getSegmentFuture().get();
+ Optional<Segment> theSegmentAfterDrop =
resultAfterDrop.getReferenceProvider().acquireReference();
Assert.assertTrue(theSegmentAfterDrop.isPresent());
Assert.assertNotNull(manager.getSegmentFiles(segmentToBootstrap));
Assert.assertEquals(segmentToBootstrap.getId(),
theSegmentAfterDrop.get().getId());
@@ -1088,8 +1087,8 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
// reference which was originally cached and then dropped before
attempting to acquire a segment. if virtual storage
// is not enabled, this should return a missing segment instead of
downloading
AcquireSegmentAction segmentAction = manager.acquireSegment(segmentToLoad);
- ReferenceCountedObjectProvider<Segment> referenceProvider =
segmentAction.getSegmentFuture().get();
- Optional<Segment> theSegment = referenceProvider.acquireReference();
+ AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
+ Optional<Segment> theSegment =
result.getReferenceProvider().acquireReference();
Assert.assertFalse(theSegment.isPresent());
segmentAction.close();
@@ -1151,9 +1150,9 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
Assert.assertThrows(DruidException.class, () ->
manager.acquireSegment(cannotLoad));
// and we can still mount and use the segment we are holding
- ReferenceCountedObjectProvider<Segment> referenceProvider =
segmentAction.getSegmentFuture().get();
- Assert.assertNotNull(referenceProvider);
- Optional<Segment> theSegment = referenceProvider.acquireReference();
+ AcquireSegmentResult result = segmentAction.getSegmentFuture().get();
+ Assert.assertNotNull(result);
+ Optional<Segment> theSegment =
result.getReferenceProvider().acquireReference();
Assert.assertTrue(theSegment.isPresent());
Assert.assertNotNull(manager.getSegmentFiles(segmentToLoad));
Assert.assertEquals(segmentToLoad.getId(), theSegment.get().getId());
@@ -1166,8 +1165,8 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
createSegmentZipInLocation(segmentDeepStorageDir,
TEST_DATA_RELATIVE_PATH_2);
manager.load(cannotLoad);
AcquireSegmentAction segmentActionAfterDrop =
manager.acquireSegment(cannotLoad);
- ReferenceCountedObjectProvider<Segment> referenceProviderDrop =
segmentActionAfterDrop.getSegmentFuture().get();
- Optional<Segment> theSegmentAfterDrop =
referenceProviderDrop.acquireReference();
+ AcquireSegmentResult resultDrop =
segmentActionAfterDrop.getSegmentFuture().get();
+ Optional<Segment> theSegmentAfterDrop =
resultDrop.getReferenceProvider().acquireReference();
Assert.assertTrue(theSegmentAfterDrop.isPresent());
Assert.assertNotNull(manager.getSegmentFiles(cannotLoad));
Assert.assertEquals(cannotLoad.getId(), theSegmentAfterDrop.get().getId());
diff --git
a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 8f8c8d37e3b..3e05eb04396 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -35,14 +35,13 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.ReferenceCountedObjectProvider;
-import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.TestSegmentUtils;
import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
import
org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
@@ -475,8 +474,10 @@ public class SegmentManagerTest extends
InitializedNullHandlingTest
);
final AcquireSegmentAction action =
virtualSegmentManager.acquireSegment(toLoad);
- ReferenceCountedObjectProvider<Segment> segmentProvider =
action.getSegmentFuture().get();
- Assert.assertNotNull(segmentProvider);
+ AcquireSegmentResult result = action.getSegmentFuture().get();
+ Assert.assertNotNull(result);
+ Assert.assertEquals(1L, result.getLoadSizeBytes());
+ Assert.assertTrue(result.getLoadTimeNanos() > 0);
DataSegmentAndDescriptor d1 = new
DataSegmentAndDescriptor(SEGMENTS.get(0), SEGMENTS.get(0).toDescriptor());
DataSegmentAndDescriptor d2 = new DataSegmentAndDescriptor(toLoad,
toLoad.toDescriptor());
diff --git
a/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
b/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
index 7ae15a6712a..5e550f8095f 100644
---
a/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
+++
b/server/src/test/java/org/apache/druid/test/utils/TestSegmentCacheManager.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestSegmentUtils;
import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
import org.apache.druid.timeline.DataSegment;
@@ -138,7 +139,7 @@ public class TestSegmentCacheManager extends
NoopSegmentCacheManager
return AcquireSegmentAction.missingSegment();
}
return new AcquireSegmentAction(
- () -> Futures.immediateFuture(getSegmentInternal(dataSegment)),
+ () ->
Futures.immediateFuture(AcquireSegmentResult.cached(getSegmentInternal(dataSegment))),
null
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]