This is an automated email from the ASF dual-hosted git repository.
jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 111e7338113 feat: add realtimeSegmentsMode query context param (#19486)
111e7338113 is described below
commit 111e7338113220bc62987062c1c40a55f9365703
Author: jtuglu1 <[email protected]>
AuthorDate: Wed May 20 18:12:12 2026 -0700
feat: add realtimeSegmentsMode query context param (#19486)
This adds the query context parameter realtimeSegmentsMode and deprecates
realtimeSegmentsOnly. realtimeSegmentsOnly=true maps to
realtimeSegmentsMode=exclusive and realtimeSegmentsOnly=false maps to
realtimeSegmentsMode=include.
This is useful for things like blue/green deployments, where you want to
query only the new historical replica ASGs and not touch any "live" nodes
(neither realtime nor historical).
---
docs/querying/query-context-reference.md | 3 +-
.../java/org/apache/druid/query/QueryContext.java | 43 ++++++++++-
.../java/org/apache/druid/query/QueryContexts.java | 42 +++++++++++
.../org/apache/druid/query/QueryContextTest.java | 72 +++++++++++++++++++
.../druid/client/CachingClusteredClient.java | 18 ++++-
.../druid/client/CachingClusteredClientTest.java | 83 +++++++++++++++++++---
.../query-context-completions.ts | 12 +++-
7 files changed, 258 insertions(+), 15 deletions(-)
diff --git a/docs/querying/query-context-reference.md
b/docs/querying/query-context-reference.md
index 511e8b13b69..c485c0231c0 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -71,7 +71,8 @@ Unless otherwise noted, the following parameters apply to all
query types, and t
|`setProcessingThreadNames`|`true`| Whether processing thread names will be
set to `queryType_dataSource_intervals` while processing a query. This aids in
interpreting thread dumps, and is on by default. Query overhead can be reduced
slightly by setting this to `false`. This has a tiny effect in most scenarios,
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge
two Project operators when inlining expressions causes complexity to increase.
Implemented as a workaround to exception `There are not enough rules to produce
a node with desired properties: convention=DRUID, sort=[]` thrown after
rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should
be queried by brokers. Clone servers are created by the `cloneServers`
Coordinator dynamic configuration. Possible values are `excludeClones`,
`includeClones` and `preferClones`. `excludeClones` means that clone
Historicals are not queried by the broker. `preferClones` indicates that when
given a choice between the clone Historical and the original Historical which
is being cloned, the broker chooses the clones [...]
-|`realtimeSegmentsOnly` |`false`| When set to true, only query realtime
segments. Historical segments are excluded. |
+|`realtimeSegmentsMode` |`include`| Controls whether realtime segments are
queried. `include` queries all segments, including realtime. `exclude` skips
realtime segments. `exclusive` queries only realtime segments. |
+|`realtimeSegmentsOnly` |`false`| **Deprecated.** Use
`realtimeSegmentsMode=exclusive` instead. When set to `true`, this is
equivalent to `realtimeSegmentsMode=exclusive`. When set to `false`, this is
equivalent to `realtimeSegmentsMode=include`.|
## Parameters by query type
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 39f325d1089..c2929626002 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
import org.apache.druid.query.QueryContexts.Vectorize;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.TypedInFilter;
@@ -781,8 +782,48 @@ public class QueryContext
return getBoolean(QueryContexts.CTX_PREPLANNED,
QueryContexts.DEFAULT_PREPLANNED);
}
+ /**
+ * Returns the realtime segments mode for this query. If {@link
QueryContexts#REALTIME_SEGMENTS_MODE} is absent
+ * or null, falls back to the deprecated {@code realtimeSegmentsOnly}
boolean: {@code true} maps
+ * to {@link RealtimeSegmentsMode#EXCLUSIVE}; otherwise returns {@link
RealtimeSegmentsMode#INCLUDE}.
+ * Throws {@link BadQueryContextException} if both fields are set
simultaneously.
+ */
+ public RealtimeSegmentsMode getRealtimeSegmentsMode()
+ {
+ RealtimeSegmentsMode mode = getEnum(
+ QueryContexts.REALTIME_SEGMENTS_MODE,
+ RealtimeSegmentsMode.class,
+ null
+ );
+ boolean hasDeprecatedFlag = get(QueryContexts.REALTIME_SEGMENTS_ONLY) !=
null;
+ if (mode != null && hasDeprecatedFlag) {
+ throw new BadQueryContextException(
+ StringUtils.format(
+ "Cannot set both [%s] and deprecated [%s]; use [%s] only.",
+ QueryContexts.REALTIME_SEGMENTS_MODE,
+ QueryContexts.REALTIME_SEGMENTS_ONLY,
+ QueryContexts.REALTIME_SEGMENTS_MODE
+ )
+ );
+ }
+ if (mode != null) {
+ return mode;
+ }
+ if (hasDeprecatedFlag) {
+ // Backward-compat: honour the deprecated realtimeSegmentsOnly flag.
+ return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY,
QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY)
+ ? RealtimeSegmentsMode.EXCLUSIVE
+ : QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
+ }
+ return QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
+ }
+
+ /**
+ * @deprecated Use {@link #getRealtimeSegmentsMode()} instead.
+ */
+ @Deprecated
public boolean isRealtimeSegmentsOnly()
{
- return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY,
QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY);
+ return getRealtimeSegmentsMode() == RealtimeSegmentsMode.EXCLUSIVE;
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 1cb8aa24cf4..44dffc9a427 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -146,9 +146,20 @@ public class QueryContexts
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED =
"COUPLED";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED =
"DECOUPLED";
+ /**
+ * @deprecated Use {@link #REALTIME_SEGMENTS_MODE} instead.
+ */
+ @Deprecated
public static final String REALTIME_SEGMENTS_ONLY = "realtimeSegmentsOnly";
+ /**
+ * @deprecated Use {@link #DEFAULT_REALTIME_SEGMENTS_MODE} instead.
+ */
+ @Deprecated
public static final boolean DEFAULT_REALTIME_SEGMENTS_ONLY = false;
+ public static final String REALTIME_SEGMENTS_MODE = "realtimeSegmentsMode";
+ public static final RealtimeSegmentsMode DEFAULT_REALTIME_SEGMENTS_MODE =
RealtimeSegmentsMode.INCLUDE;
+
public static final String CTX_PREPLANNED = "prePlanned";
public static final boolean DEFAULT_PREPLANNED = true;
@@ -233,6 +244,37 @@ public class QueryContexts
}
}
+ /**
+ * Classifies segments by whether a historical replica exists
+ * (see {@link
org.apache.druid.client.selector.ServerSelector#isRealtimeSegment()}: a segment
is
+ * "realtime" only when it has realtime servers and zero historical servers).
+ */
+ public enum RealtimeSegmentsMode
+ {
+ /** Query all segments, including realtime (default). */
+ INCLUDE,
+ /** Query only realtime segments. */
+ EXCLUSIVE,
+ /** Skip realtime segments; query only historical. */
+ EXCLUDE;
+
+ @JsonCreator
+ public static RealtimeSegmentsMode fromString(String str)
+ {
+ if (str == null) {
+ return null;
+ }
+ return RealtimeSegmentsMode.valueOf(StringUtils.toUpperCase(str));
+ }
+
+ @Override
+ @JsonValue
+ public String toString()
+ {
+ return StringUtils.toLowerCase(name());
+ }
+ }
+
private QueryContexts()
{
}
diff --git
a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
index 0cb931e50d2..d5550bc28dc 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryContextTest.java
@@ -430,6 +430,7 @@ public class QueryContextTest
}
@Test
+ @SuppressWarnings("deprecation")
public void testIsRealtimeSegmentsOnly()
{
assertFalse(QueryContext.empty().isRealtimeSegmentsOnly());
@@ -440,6 +441,77 @@ public class QueryContextTest
);
}
+ @Test
+ public void testGetRealtimeSegmentsMode()
+ {
+ assertEquals(
+ QueryContexts.RealtimeSegmentsMode.INCLUDE,
+ QueryContext.empty().getRealtimeSegmentsMode()
+ );
+ assertEquals(
+ QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
+ QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"exclusive"))
+ .getRealtimeSegmentsMode()
+ );
+ assertEquals(
+ QueryContexts.RealtimeSegmentsMode.EXCLUDE,
+ QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"exclude"))
+ .getRealtimeSegmentsMode()
+ );
+ assertEquals(
+ QueryContexts.RealtimeSegmentsMode.INCLUDE,
+ QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"include"))
+ .getRealtimeSegmentsMode()
+ );
+ }
+
+ @Test
+ public void testGetRealtimeSegmentsModeBackwardCompat()
+ {
+ // realtimeSegmentsOnly=true maps to EXCLUSIVE
+ assertEquals(
+ QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
+ QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY,
true))
+ .getRealtimeSegmentsMode()
+ );
+ // realtimeSegmentsOnly=false maps to INCLUDE (default)
+ assertEquals(
+ QueryContexts.RealtimeSegmentsMode.INCLUDE,
+ QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY,
false))
+ .getRealtimeSegmentsMode()
+ );
+ }
+
+ @Test
+ public void testGetRealtimeSegmentsModeConflictThrows()
+ {
+ BadQueryContextException e = assertThrows(
+ BadQueryContextException.class,
+ () -> QueryContext.of(ImmutableMap.of(
+ QueryContexts.REALTIME_SEGMENTS_ONLY, true,
+ QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"
+ )).getRealtimeSegmentsMode()
+ );
+ assertEquals(
+ "Cannot set both [realtimeSegmentsMode] and deprecated
[realtimeSegmentsOnly]; use [realtimeSegmentsMode] only.",
+ e.getMessage()
+ );
+ }
+
+ @Test
+ public void testGetRealtimeSegmentsModeInvalidValue()
+ {
+ BadQueryContextException e = assertThrows(
+ BadQueryContextException.class,
+ () ->
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"badvalue"))
+ .getRealtimeSegmentsMode()
+ );
+ assertEquals(
+ "Expected key [realtimeSegmentsMode] to be referring to one of the
values [INCLUDE,EXCLUSIVE,EXCLUDE] of enum [RealtimeSegmentsMode], but got
[badvalue]",
+ e.getMessage()
+ );
+ }
+
@Test
public void testSerialization() throws Exception
{
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index eb0a5a83997..9305b1b88e9 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -61,6 +61,7 @@ import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -444,7 +445,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
final SegmentPruner segmentPruner = ev.getSegmentPruner();
- boolean isRealtimeSegmentOnly = query.context().isRealtimeSegmentsOnly();
+ RealtimeSegmentsMode realtimeSegmentsMode =
query.context().getRealtimeSegmentsMode();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder :
serversLookup) {
final Collection<PartitionChunk<ServerSelector>> filteredChunks;
@@ -458,8 +459,19 @@ public class CachingClusteredClient implements
QuerySegmentWalker
}
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
- if (isRealtimeSegmentOnly && !server.isRealtimeSegment()) {
- continue; // Skip historical segments when only realtime segments
are requested
+ switch (realtimeSegmentsMode) {
+ case EXCLUSIVE:
+ if (!server.isRealtimeSegment()) {
+ continue;
+ }
+ break;
+ case EXCLUDE:
+ if (server.isRealtimeSegment()) {
+ continue;
+ }
+ break;
+ case INCLUDE:
+ break;
}
final SegmentDescriptor segment = new SegmentDescriptor(
holder.getInterval(),
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 4f76b7b52c1..94c259314de 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -3132,27 +3132,94 @@ public class CachingClusteredClientTest
selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0],
null), dataSegment);
timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector));
- final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
+ // include (default): historical segment is included
+ final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
- .context(ImmutableMap.of("realtimeSegmentsOnly", false))
+ .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"include"))
.randomQueryId()
.build();
- final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder()
+ // exclusive: only realtime segments — historical segment is excluded
+ final TimeBoundaryQuery queryExclusive =
Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(DATA_SOURCE)
+ .intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+ .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"exclusive"))
+ .randomQueryId()
+ .build();
+
+ // backward compat: realtimeSegmentsOnly=true maps to EXCLUSIVE
+ final TimeBoundaryQuery queryLegacyTrue =
Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
- .context(ImmutableMap.of("realtimeSegmentsOnly", true))
+ .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY,
true))
.randomQueryId()
.build();
final ResponseContext responseContext = initializeResponseContext();
- getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
- getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext);
+ getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
+ getDefaultQueryRunner().run(QueryPlus.wrap(queryExclusive),
responseContext);
+ getDefaultQueryRunner().run(QueryPlus.wrap(queryLegacyTrue),
responseContext);
+
+ final Map<String, Integer> remainingResponseMap = (Map<String, Integer>)
responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
+ Assert.assertEquals(1,
remainingResponseMap.get(queryInclude.getId()).intValue());
+ Assert.assertEquals(0,
remainingResponseMap.get(queryExclusive.getId()).intValue());
+ Assert.assertEquals(0,
remainingResponseMap.get(queryLegacyTrue.getId()).intValue());
+ }
+
+ @Test
+ public void testRealtimeSegmentsModeExclude()
+ {
+ final Interval interval = Intervals.of("2016-01-01/2016-01-02");
+ final Interval queryInterval =
Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00");
+ final DataSegment dataSegment = new DataSegment(
+ "dataSource",
+ interval,
+ "ver",
+ ImmutableMap.of("type", "hdfs", "path", "/tmp"),
+ ImmutableList.of("product"),
+ ImmutableList.of("visited_sum"),
+ NoneShardSpec.instance(),
+ 9,
+ 12334
+ );
+
+ // selector backed only by a realtime server — isRealtimeSegment() == true
+ final DruidServer realtimeServer = new DruidServer(
+ "rt1", "rt1", null, 10, null, ServerType.REALTIME,
DruidServer.DEFAULT_TIER, 0
+ );
+ final ServerSelector realtimeSelector = new ServerSelector(
+ dataSegment,
+ new HighestPriorityTierSelectorStrategy(new
RandomServerSelectorStrategy()),
+ HistoricalFilter.IDENTITY_FILTER
+ );
+ realtimeSelector.addServerAndUpdateSegment(new
QueryableDruidServer(realtimeServer, null), dataSegment);
+ timeline.add(interval, "ver", new
SingleElementPartitionChunk<>(realtimeSelector));
+
+ // exclude: realtime-only segment is skipped
+ final TimeBoundaryQuery queryExclude = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(DATA_SOURCE)
+ .intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+ .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"exclude"))
+ .randomQueryId()
+ .build();
+
+ // include: realtime-only segment is included
+ final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(DATA_SOURCE)
+ .intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+ .context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE,
"include"))
+ .randomQueryId()
+ .build();
+
+ final ResponseContext responseContext = initializeResponseContext();
+ getDefaultQueryRunner().run(QueryPlus.wrap(queryExclude), responseContext);
+ getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
+
final Map<String, Integer> remainingResponseMap = (Map<String, Integer>)
responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
- Assert.assertEquals(1, remainingResponseMap.get(query.getId()).intValue());
- Assert.assertEquals(0,
remainingResponseMap.get(query2.getId()).intValue());
+ Assert.assertEquals(0,
remainingResponseMap.get(queryExclude.getId()).intValue());
+ Assert.assertEquals(1,
remainingResponseMap.get(queryInclude.getId()).intValue());
}
@SuppressWarnings("unchecked")
diff --git
a/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
b/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
index 9e3cb8dca8d..d7e67b7f492 100644
--- a/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
+++ b/web-console/src/dialogs/edit-context-dialog/query-context-completions.ts
@@ -73,8 +73,8 @@ export const QUERY_CONTEXT_COMPLETIONS: JsonCompletionRule[]
= [
documentation: 'Enable vectorized query execution',
},
{
- value: 'realtimeSegmentsOnly',
- documentation: 'Whether to query only realtime segments',
+ value: 'realtimeSegmentsMode',
+ documentation: 'Controls whether realtime segments are queried',
},
{
value: 'vectorSize',
@@ -152,4 +152,12 @@ export const QUERY_CONTEXT_COMPLETIONS:
JsonCompletionRule[] = [
{ value: 'force', documentation: 'Force vectorized execution' },
],
},
+ {
+ path: '$.realtimeSegmentsMode',
+ completions: [
+ { value: 'include', documentation: 'Query all segments, including
realtime (default)' },
+ { value: 'exclude', documentation: 'Skip realtime segments; query only
historical' },
+ { value: 'exclusive', documentation: 'Query only realtime segments' },
+ ],
+ },
];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]